Coverage Report

Created: 2021-04-05 03:19

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/co.c
Line
Count
Source (jump to first uncovered line)
1
#include <assert.h>
2
#include "co.h"
3
4
static void _co_append_task(co_scheduler_t* const scheduler, co_routine_t* const task)
5
26.3k
{
6
26.3k
  if (scheduler->tail)
7
0
  {
8
0
    scheduler->tail->next = task;
9
0
    task->prev = scheduler->tail;
10
26.3k
  } else {
11
26.3k
    scheduler->head = task;
12
26.3k
    task->prev = 0;
13
26.3k
  }
14
26.3k
  scheduler->tail = task;
15
26.3k
  task->next = 0;
16
26.3k
}
17
18
static void _co_delete_task(co_scheduler_t* const scheduler, co_routine_t* const task)
19
26.3k
{
20
26.3k
  if (task->prev)
21
0
    task->prev->next = task->next;
22
26.3k
  else
23
26.3k
    scheduler->head = task->next;
24
26.3k
  if (task->next)
25
0
    task->next->prev = task->prev;
26
26.3k
  else
27
26.3k
    scheduler->tail = task->prev;
28
26.3k
29
26.3k
}
30
31
static co_routine_t* _co_done(co_routine_t* const task)
32
52.6k
{
33
52.6k
  if (task->notify_any)
34
26.3k
  {
35
26.3k
    co_routine_t* const notify_any = task->notify_any;
36
26.3k
    task->notify_any = 0;
37
26.3k
    int i;
38
26.3k
    const int other_size = notify_any->other_size;
39
26.3k
    notify_any->other_size = 0;
40
26.3k
    co_routine_t* const* const others = notify_any->others;
41
26.3k
    for (i = 0; i < other_size; 
i++0
)
42
0
      if (others[i] != task)
43
0
      {
44
0
        assert(others[i]->notify_any == notify_any);
45
0
        others[i]->notify_any = 0;
46
0
      }
47
26.3k
    return notify_any;
48
26.3k
  }
49
26.3k
  return 0;
50
26.3k
}
51
52
void _co_prepend_task(co_scheduler_t* const scheduler, co_routine_t* const task)
53
6
{
54
6
  if (scheduler->head)
55
0
  {
56
0
    scheduler->head->prev = task;
57
0
    task->next = scheduler->head;
58
6
  } else {
59
6
    scheduler->tail = task;
60
6
    task->next = 0;
61
6
  }
62
6
  scheduler->head = task;
63
6
  task->prev = 0;
64
6
}
65
66
void _co_resume(co_routine_t* const self, co_routine_t* const task)
67
4
{
68
4
  assert(!task->done);
69
4
  task->scheduler = self->scheduler;
70
4
  task->caller = self;
71
4
  self->callee = task;
72
4
}
73
74
void _co_apply(co_routine_t* const self, co_routine_t* const task)
75
26.3k
{
76
26.3k
  assert(!task->done);
77
26.3k
  task->scheduler = self->scheduler;
78
26.3k
  self->callee = task;
79
26.3k
  task->caller = 0; // Doesn't automatic resume from this task.
80
26.3k
  _co_await_any(self, &task, 1);
81
26.3k
}
82
83
int _co_await_any(co_routine_t* const self, co_routine_t* const* const tasks, const int task_size)
84
26.3k
{
85
26.3k
  assert(task_size > 0);
86
26.3k
  if (task_size == 1) // Special casing this, no need to add to others list, which has life-cycle requirement for this list.
87
26.3k
  {
88
26.3k
    self->others = 0;
89
26.3k
    self->other_size = 0;
90
26.3k
    if (tasks[0]->done)
91
0
      return 1;
92
26.3k
    tasks[0]->notify_any = self;
93
26.3k
    return 0;
94
26.3k
  }
95
0
  self->others = tasks;
96
0
  self->other_size = task_size;
97
0
  int i;
98
0
  int flag = 0;
99
0
  for (i = 0; !flag && i < task_size; i++)
100
0
  {
101
0
    flag = tasks[i]->done;
102
0
    assert(tasks[i]->notify_any == 0);
103
0
    tasks[i]->notify_any = self;
104
0
  }
105
0
  if (flag)
106
0
  {
107
0
    for (i = 0; i < task_size; i++)
108
0
      tasks[i]->notify_any = 0;
109
0
    return 1;
110
0
  }
111
0
  return 0;
112
0
}
113
114
void co_free(co_routine_t* const task)
115
52.6k
{
116
52.6k
  ccfree(task);
117
52.6k
}
118
119
int co_is_done(const co_routine_t* const task)
120
6
{
121
6
  return task->done;
122
6
}
123
124
static __thread co_scheduler_t* scheduler_per_thread = 0;
125
126
// Second will invoke this blocking variant to schedule task on a newly created thread.
127
static void* _co_main(void* userdata)
128
2
{
129
2
  co_scheduler_t* const scheduler = (co_scheduler_t*)userdata;
130
2
  pthread_mutex_lock(&scheduler->mutex);
131
2
  co_scheduler_t* previous_scheduler = scheduler_per_thread;
132
2
  scheduler_per_thread = scheduler;
133
2
  // By definition, the last task cannot co_free itself. And because this
134
2
  // scheduler is asynchronous, we cannot free it somewhere else. That
135
2
  // left us with the only choice to free the task at the very end when we
136
2
  // are going to put the scheduler to sleep again.
137
2
  for (;;)
138
6
  {
139
6
    if (scheduler->head == 0 && 
scheduler->stream_await_count == 04
)
140
2
    {
141
2
      scheduler->active = 0;
142
2
      pthread_cond_broadcast(&scheduler->notify);
143
2
      pthread_mutex_unlock(&scheduler->mutex);
144
2
      break;
145
2
    }
146
4
    if (scheduler->head == 0)
147
2
    {
148
2
      pthread_cond_wait(&scheduler->wait, &scheduler->mutex);
149
2
      pthread_mutex_unlock(&scheduler->mutex);
150
2
    }
151
4
    co_routine_t* task = scheduler->head;
152
4
    _co_delete_task(scheduler, task);
153
4
    pthread_mutex_unlock(&scheduler->mutex);
154
30
    while (task) {
155
26
      const co_state_t state = task->fn(task, task + 1);
156
26
      task->line = state.line;
157
26
      task->done = state.done;
158
26
      if (task->callee)
159
8
        task = task->callee;
160
18
      else {
161
18
        co_routine_t* const prev_task = task;
162
18
        task = task->caller;
163
18
        prev_task->caller = 0;
164
18
        if (prev_task->done)
165
16
        {
166
16
          co_routine_t* const notify_any = _co_done(prev_task);
167
16
          if (prev_task->root) // Free the task scheduled from co_schedule.
168
2
            co_free(prev_task);
169
16
          if (notify_any)
170
14
          {
171
14
            if (!task)
172
14
              task = notify_any;
173
0
            else {
174
0
              pthread_mutex_lock(&scheduler->mutex);
175
0
              _co_prepend_task(scheduler, notify_any);
176
0
              pthread_mutex_unlock(&scheduler->mutex);
177
0
              // Since we have task, we will resume the inner loop.
178
0
            }
179
14
          }
180
16
        }
181
18
      }
182
26
    }
183
4
    pthread_mutex_lock(&scheduler->mutex);
184
4
  }
185
2
  scheduler_per_thread = previous_scheduler;
186
2
  return 0;
187
2
}
188
189
// First will invoke this non-blocking variant to schedule task.
190
static void _co_try(co_scheduler_t* const scheduler)
191
26.3k
{
192
26.3k
  pthread_mutex_lock(&scheduler->mutex);
193
26.3k
  if (scheduler->active)
194
0
  {
195
0
    pthread_mutex_unlock(&scheduler->mutex);
196
0
    return;
197
0
  }
198
26.3k
  scheduler->active = 1;
199
26.3k
  co_scheduler_t* previous_scheduler = scheduler_per_thread;
200
26.3k
  scheduler_per_thread = scheduler;
201
26.3k
  for (;;)
202
52.6k
  {
203
52.6k
    if (scheduler->head == 0 && 
scheduler->stream_await_count == 026.3k
)
204
26.3k
    {
205
26.3k
      scheduler->active = 0;
206
26.3k
      pthread_mutex_unlock(&scheduler->mutex);
207
26.3k
      break;
208
26.3k
    }
209
26.3k
    if (scheduler->head == 0)
210
2
    {
211
2
      // Launch a thread to continue the execution.
212
2
      pthread_create(&scheduler->thread, 0, _co_main, scheduler);
213
2
      pthread_mutex_unlock(&scheduler->mutex);
214
2
      break;
215
2
    }
216
26.3k
    co_routine_t* task = scheduler->head;
217
26.3k
    _co_delete_task(scheduler, task);
218
26.3k
    pthread_mutex_unlock(&scheduler->mutex);
219
105k
    while (task) {
220
78.9k
      const co_state_t state = task->fn(task, task + 1);
221
78.9k
      task->line = state.line;
222
78.9k
      task->done = state.done;
223
78.9k
      if (task->callee)
224
26.3k
        task = task->callee;
225
52.6k
      else {
226
52.6k
        co_routine_t* const prev_task = task;
227
52.6k
        task = task->caller;
228
52.6k
        prev_task->caller = 0;
229
52.6k
        if (prev_task->done)
230
52.6k
        {
231
52.6k
          co_routine_t* const notify_any = _co_done(prev_task);
232
52.6k
          if (prev_task->root) // Free the task scheduled from co_schedule.
233
26.3k
            co_free(prev_task);
234
52.6k
          if (notify_any)
235
26.3k
          {
236
26.3k
            if (!task)
237
26.3k
              task = notify_any;
238
0
            else {
239
0
              pthread_mutex_lock(&scheduler->mutex);
240
0
              _co_prepend_task(scheduler, notify_any);
241
0
              pthread_mutex_unlock(&scheduler->mutex);
242
0
              // Since we have task, we will resume the inner loop.
243
0
            }
244
26.3k
          }
245
52.6k
        }
246
52.6k
      }
247
78.9k
    }
248
26.3k
    pthread_mutex_lock(&scheduler->mutex);
249
26.3k
  }
250
26.3k
  scheduler_per_thread = previous_scheduler;
251
26.3k
}
252
253
void co_schedule(co_scheduler_t* const scheduler, co_routine_t* const task)
254
26.3k
{
255
26.3k
  task->scheduler = scheduler;
256
26.3k
  task->root = 1; // If this is the root, we will free it ourselves.
257
26.3k
  int activate_scheduler = 0;
258
26.3k
  pthread_mutex_lock(&scheduler->mutex);
259
26.3k
  _co_append_task(scheduler, task);
260
26.3k
  if (!scheduler->active)
261
26.3k
    activate_scheduler = 1;
262
26.3k
  pthread_mutex_unlock(&scheduler->mutex);
263
26.3k
  if (activate_scheduler)
264
26.3k
    _co_try(scheduler);
265
26.3k
}
266
267
int co_is_on_scheduler(co_scheduler_t* const scheduler)
268
25.9k
{
269
25.9k
  if (scheduler_per_thread == 0)
270
25.9k
    return 0;
271
0
  return scheduler_per_thread == scheduler;
272
0
}
273
274
int co_scheduler_is_active(co_scheduler_t* const scheduler)
275
600
{
276
600
  pthread_mutex_lock(&scheduler->mutex);
277
600
  const int active = scheduler->active;
278
600
  pthread_mutex_unlock(&scheduler->mutex);
279
600
  return active;
280
600
}
281
282
co_scheduler_t* co_scheduler_new(void)
283
495
{
284
495
  co_scheduler_t* const scheduler = cccalloc(1, sizeof(co_scheduler_t));
285
495
  pthread_mutex_init(&scheduler->mutex, 0);
286
495
  pthread_cond_init(&scheduler->notify, 0);
287
495
  pthread_cond_init(&scheduler->wait, 0);
288
495
  return scheduler;
289
495
}
290
291
void co_scheduler_free(co_scheduler_t* const scheduler)
292
495
{
293
495
  pthread_mutex_destroy(&scheduler->mutex);
294
495
  pthread_cond_destroy(&scheduler->notify);
295
495
  pthread_cond_destroy(&scheduler->wait);
296
495
  ccfree(scheduler);
297
495
}