Coverage Report

Created: 2024-08-19 11:27

/home/liu/actions-runner/_work/ccv/ccv/lib/nnc/co.c
Line
Count
Source (jump to first uncovered line)
1
#include <assert.h>
2
#include "co.h"
3
4
// Link task at the end of the scheduler's doubly-linked run queue.
// The caller is expected to hold scheduler->mutex (co_schedule does).
static void _co_append_task(co_scheduler_t* const scheduler, co_routine_t* const task)
{
  co_routine_t* const last = scheduler->tail;
  if (last)
    last->next = task;
  else
    scheduler->head = task; // Queue was empty, task is also the new head.
  task->prev = last; // 0 when the queue was empty.
  scheduler->tail = task;
  task->next = 0;
}
17
18
// Unlink task from the scheduler's run queue, patching head / tail when
// task sits at either end. task's own prev / next pointers are left as is.
static void _co_delete_task(co_scheduler_t* const scheduler, co_routine_t* const task)
{
  co_routine_t* const before = task->prev;
  co_routine_t* const after = task->next;
  if (before)
    before->next = after;
  else
    scheduler->head = after; // task was the head.
  if (after)
    after->prev = before;
  else
    scheduler->tail = before; // task was the tail.
}
30
31
// Detach task from the routine that is waiting on it (task->notify_any).
// Returns that waiting routine, or 0 if nobody is waiting. When there is a
// waiter, its other_size is reset to 0 and every other routine in its
// others list stops pointing back at it, so it is returned at most once.
static co_routine_t* _co_done(co_routine_t* const task)
{
  co_routine_t* const waiter = task->notify_any;
  if (!waiter)
    return 0;
  task->notify_any = 0;
  const int count = waiter->other_size;
  co_routine_t* const* const siblings = waiter->others;
  waiter->other_size = 0;
  for (int i = 0; i < count; i++)
  {
    co_routine_t* const sibling = siblings[i];
    if (sibling == task)
      continue; // Already cleared above.
    assert(sibling->notify_any == waiter);
    sibling->notify_any = 0;
  }
  return waiter;
}
51
52
// Link task at the front of the scheduler's doubly-linked run queue.
// Callers in this file hold scheduler->mutex around the call.
void _co_prepend_task(co_scheduler_t* const scheduler, co_routine_t* const task)
{
  co_routine_t* const first = scheduler->head;
  if (first)
    first->prev = task;
  else
    scheduler->tail = task; // Queue was empty, task is also the new tail.
  task->next = first; // 0 when the queue was empty.
  scheduler->head = task;
  task->prev = 0;
}
65
66
// Make task the callee of self: task adopts self's scheduler and records
// self as its caller, so the scheduler loop runs task next and goes back to
// self once task returns control. task must not have finished already.
void _co_resume(co_routine_t* const self, co_routine_t* const task)
{
  assert(!task->done);
  self->callee = task;
  task->caller = self;
  task->scheduler = self->scheduler;
}
73
74
// Make task the callee of self without recording self as its caller, then
// register self as the routine to notify when task is done. task must not
// have finished already.
void _co_apply(co_routine_t* const self, co_routine_t* const task)
{
  assert(!task->done);
  self->callee = task;
  task->scheduler = self->scheduler;
  task->caller = 0; // No automatic resume of self from this task.
  // Single-task await: the address of the local parameter is not retained.
  _co_await_any(self, &task, 1);
}
82
83
// Register self as the routine to notify when any of tasks is done.
// Returns 1 if one of the inspected tasks is already done (nothing stays
// registered in that case), 0 if self now has to wait. For more than one
// task, the tasks array is stored in self->others, so it must stay valid
// while self waits.
int _co_await_any(co_routine_t* const self, co_routine_t* const* const tasks, const int task_size)
{
  assert(task_size > 0);
  if (task_size == 1)
  {
    // Special case: no need to keep the list, which has life-cycle requirement.
    co_routine_t* const only = tasks[0];
    self->others = 0;
    self->other_size = 0;
    if (only->done)
      return 1;
    only->notify_any = self;
    return 0;
  }
  self->others = tasks;
  self->other_size = task_size;
  int any_done = 0;
  int i;
  // Stop registering as soon as a finished task is seen.
  for (i = 0; i < task_size && !any_done; i++)
  {
    any_done = tasks[i]->done;
    assert(tasks[i]->notify_any == 0);
    tasks[i]->notify_any = self;
  }
  if (!any_done)
    return 0;
  // One task already finished: undo the registrations made above.
  for (i = 0; i < task_size; i++)
    tasks[i]->notify_any = 0;
  return 1;
}
113
114
// Release the memory of a routine by handing it to ccfree.
void co_free(co_routine_t* const task)
{
  ccfree(task);
}
118
119
int co_is_done(const co_routine_t* const task)
120
6
{
121
6
  return task->done;
122
6
}
123
124
// Scheduler whose loop (_co_try / _co_main) is currently running on this
// thread; 0 when none. Each loop saves the previous value and restores it on exit.
static __thread co_scheduler_t* scheduler_per_thread = 0;
125
126
// Second, this blocking variant is invoked on a newly created thread to keep scheduling tasks.
127
static void* _co_main(void* userdata)
128
2
{
129
2
  co_scheduler_t* const scheduler = (co_scheduler_t*)userdata;
130
2
  pthread_mutex_lock(&scheduler->mutex);
131
2
  co_scheduler_t* previous_scheduler = scheduler_per_thread;
132
2
  scheduler_per_thread = scheduler;
133
  // By definition, the last task cannot co_free itself. And because this
134
  // scheduler is asynchronous, we cannot free it somewhere else. That
135
  // left us with the only choice to free the task at the very end when we
136
  // are going to put the scheduler to sleep again.
137
2
  for (;;)
138
8
  {
139
8
    if (scheduler->head == 0 && 
scheduler->stream_await_count == 06
)
140
2
    {
141
2
      scheduler->active = 0;
142
2
      pthread_cond_broadcast(&scheduler->notify);
143
2
      pthread_mutex_unlock(&scheduler->mutex);
144
2
      break;
145
2
    }
146
6
    if (scheduler->head == 0)
147
4
    {
148
4
      pthread_cond_wait(&scheduler->wait, &scheduler->mutex);
149
4
      pthread_mutex_unlock(&scheduler->mutex);
150
4
    }
151
6
    co_routine_t* task = scheduler->head;
152
6
    _co_delete_task(scheduler, task);
153
6
    pthread_mutex_unlock(&scheduler->mutex);
154
50
    while (task) {
155
44
      const co_state_t state = task->fn(task, task + 1);
156
44
      task->line = state.line;
157
44
      task->done = state.done;
158
44
      if (task->callee)
159
16
        task = task->callee;
160
28
      else {
161
28
        co_routine_t* const prev_task = task;
162
28
        task = task->caller;
163
28
        prev_task->caller = 0;
164
28
        if (prev_task->done)
165
24
        {
166
24
          co_routine_t* const notify_any = _co_done(prev_task);
167
24
          if (prev_task->root) // Free the task scheduled from co_schedule.
168
2
            co_free(prev_task);
169
24
          if (notify_any)
170
22
          {
171
22
            if (!task)
172
22
              task = notify_any;
173
0
            else {
174
0
              pthread_mutex_lock(&scheduler->mutex);
175
0
              _co_prepend_task(scheduler, notify_any);
176
0
              pthread_mutex_unlock(&scheduler->mutex);
177
              // Since we have task, we will resume the inner loop.
178
0
            }
179
22
          }
180
24
        }
181
28
      }
182
44
    }
183
6
    pthread_mutex_lock(&scheduler->mutex);
184
6
  }
185
2
  scheduler_per_thread = previous_scheduler;
186
2
  return 0;
187
2
}
188
189
// First, this non-blocking variant is invoked to schedule tasks on the calling thread.
190
// Non-blocking scheduler loop, run on the calling thread. Does nothing if
// the scheduler is already active. Otherwise marks it active and runs
// queued routines until the queue is empty. At that point it either marks
// the scheduler inactive (no stream await outstanding) or starts a thread
// running _co_main to take over, and returns in both cases.
static void _co_try(co_scheduler_t* const scheduler)
{
  pthread_mutex_lock(&scheduler->mutex);
  if (scheduler->active)
  {
    pthread_mutex_unlock(&scheduler->mutex);
    return;
  }
  scheduler->active = 1;
  co_scheduler_t* const saved_scheduler = scheduler_per_thread;
  scheduler_per_thread = scheduler;
  for (;;)
  {
    // The mutex is held here on every iteration.
    co_routine_t* current = scheduler->head;
    if (current == 0)
    {
      if (scheduler->stream_await_count == 0)
        scheduler->active = 0;
      else // Launch a thread to continue the execution.
        pthread_create(&scheduler->thread, 0, _co_main, scheduler);
      pthread_mutex_unlock(&scheduler->mutex);
      break;
    }
    _co_delete_task(scheduler, current);
    pthread_mutex_unlock(&scheduler->mutex);
    while (current)
    {
      // The second argument points just past the co_routine_t itself.
      const co_state_t state = current->fn(current, current + 1);
      current->line = state.line;
      current->done = state.done;
      if (current->callee)
      {
        current = current->callee; // Descend into the routine it asked to run.
        continue;
      }
      co_routine_t* const ran = current;
      current = ran->caller; // Go back to the caller, if there is one.
      ran->caller = 0;
      if (!ran->done)
        continue;
      co_routine_t* const waiter = _co_done(ran);
      if (ran->root) // Free the task scheduled from co_schedule.
        co_free(ran);
      if (!waiter)
        continue;
      if (!current)
        current = waiter; // Nothing else to run, continue with the waiter.
      else {
        // There is still a caller to run; queue the waiter at the front.
        pthread_mutex_lock(&scheduler->mutex);
        _co_prepend_task(scheduler, waiter);
        pthread_mutex_unlock(&scheduler->mutex);
      }
    }
    pthread_mutex_lock(&scheduler->mutex);
  }
  scheduler_per_thread = saved_scheduler;
}
252
253
// Queue task on scheduler as a root routine (the scheduler loop frees root
// routines itself once they are done). If the scheduler was not active at
// the time of queuing, try to run it on the calling thread.
void co_schedule(co_scheduler_t* const scheduler, co_routine_t* const task)
{
  task->scheduler = scheduler;
  task->root = 1; // If this is the root, we will free it ourselves.
  pthread_mutex_lock(&scheduler->mutex);
  _co_append_task(scheduler, task);
  const int was_idle = !scheduler->active;
  pthread_mutex_unlock(&scheduler->mutex);
  // _co_try checks the active flag again under the lock.
  if (was_idle)
    _co_try(scheduler);
}
266
267
// Return 1 if the calling thread is currently running the loop of the given
// scheduler, 0 otherwise (including when no scheduler runs on this thread).
int co_is_on_scheduler(co_scheduler_t* const scheduler)
{
  return scheduler_per_thread != 0 && scheduler_per_thread == scheduler;
}
273
274
// Read the scheduler's active flag under its mutex and return it.
int co_scheduler_is_active(co_scheduler_t* const scheduler)
{
  int is_active;
  pthread_mutex_lock(&scheduler->mutex);
  is_active = scheduler->active;
  pthread_mutex_unlock(&scheduler->mutex);
  return is_active;
}
281
282
co_scheduler_t* co_scheduler_new(void)
283
532
{
284
532
  co_scheduler_t* const scheduler = cccalloc(1, sizeof(co_scheduler_t));
285
532
  pthread_mutex_init(&scheduler->mutex, 0);
286
532
  pthread_cond_init(&scheduler->notify, 0);
287
532
  pthread_cond_init(&scheduler->wait, 0);
288
532
  return scheduler;
289
532
}
290
291
// Destroy the scheduler's condition variables and mutex, then free the
// scheduler itself.
void co_scheduler_free(co_scheduler_t* const scheduler)
{
  pthread_cond_destroy(&scheduler->notify);
  pthread_cond_destroy(&scheduler->wait);
  pthread_mutex_destroy(&scheduler->mutex);
  ccfree(scheduler);
}