/home/liu/actions-runner/_work/ccv/ccv/lib/nnc/co.c
Line | Count | Source (jump to first uncovered line) |
1 | | #include <assert.h> |
2 | | #include "co.h" |
3 | | |
4 | | static void _co_append_task(co_scheduler_t* const scheduler, co_routine_t* const task) |
5 | 26.4k | { |
6 | 26.4k | if (scheduler->tail) |
7 | 0 | { |
8 | 0 | scheduler->tail->next = task; |
9 | 0 | task->prev = scheduler->tail; |
10 | 26.4k | } else { |
11 | 26.4k | scheduler->head = task; |
12 | 26.4k | task->prev = 0; |
13 | 26.4k | } |
14 | 26.4k | scheduler->tail = task; |
15 | 26.4k | task->next = 0; |
16 | 26.4k | } |
17 | | |
18 | | static void _co_delete_task(co_scheduler_t* const scheduler, co_routine_t* const task) |
19 | 26.4k | { |
20 | 26.4k | if (task->prev) |
21 | 0 | task->prev->next = task->next; |
22 | 26.4k | else |
23 | 26.4k | scheduler->head = task->next; |
24 | 26.4k | if (task->next) |
25 | 0 | task->next->prev = task->prev; |
26 | 26.4k | else |
27 | 26.4k | scheduler->tail = task->prev; |
28 | | |
29 | 26.4k | } |
30 | | |
31 | | static co_routine_t* _co_done(co_routine_t* const task) |
32 | 53.0k | { |
33 | 53.0k | if (task->notify_any) |
34 | 26.5k | { |
35 | 26.5k | co_routine_t* const notify_any = task->notify_any; |
36 | 26.5k | task->notify_any = 0; |
37 | 26.5k | int i; |
38 | 26.5k | const int other_size = notify_any->other_size; |
39 | 26.5k | notify_any->other_size = 0; |
40 | 26.5k | co_routine_t* const* const others = notify_any->others; |
41 | 26.5k | for (i = 0; i < other_size; i++0 ) |
42 | 0 | if (others[i] != task) |
43 | 0 | { |
44 | 0 | assert(others[i]->notify_any == notify_any); |
45 | 0 | others[i]->notify_any = 0; |
46 | 0 | } |
47 | 26.5k | return notify_any; |
48 | 26.5k | } |
49 | 26.4k | return 0; |
50 | 53.0k | } |
51 | | |
52 | | void _co_prepend_task(co_scheduler_t* const scheduler, co_routine_t* const task) |
53 | 7 | { |
54 | 7 | if (scheduler->head) |
55 | 0 | { |
56 | 0 | scheduler->head->prev = task; |
57 | 0 | task->next = scheduler->head; |
58 | 7 | } else { |
59 | 7 | scheduler->tail = task; |
60 | 7 | task->next = 0; |
61 | 7 | } |
62 | 7 | scheduler->head = task; |
63 | 7 | task->prev = 0; |
64 | 7 | } |
65 | | |
// Chain task to run next on self's scheduler, with control returning to
// self afterwards (task->caller = self). task must not already be done.
void _co_resume(co_routine_t* const self, co_routine_t* const task)
{
	assert(!task->done);
	task->scheduler = self->scheduler;
	task->caller = self;
	self->callee = task;
}
73 | | |
// Launch task on self's scheduler and block self (via await) until it is
// done. Unlike _co_resume, finishing the task does not automatically
// resume self; the wakeup goes through the notify_any mechanism instead.
void _co_apply(co_routine_t* const self, co_routine_t* const task)
{
	assert(!task->done);
	task->scheduler = self->scheduler;
	self->callee = task;
	task->caller = 0; // Doesn't automatically resume from this task.
	_co_await_any(self, &task, 1);
}
82 | | |
83 | | int _co_await_any(co_routine_t* const self, co_routine_t* const* const tasks, const int task_size) |
84 | 26.5k | { |
85 | 26.5k | assert(task_size > 0); |
86 | 26.5k | if (task_size == 1) // Special casing this, no need to add to others list, which has life-cycle requirement for this list. |
87 | 26.5k | { |
88 | 26.5k | self->others = 0; |
89 | 26.5k | self->other_size = 0; |
90 | 26.5k | if (tasks[0]->done) |
91 | 0 | return 1; |
92 | 26.5k | tasks[0]->notify_any = self; |
93 | 26.5k | return 0; |
94 | 26.5k | } |
95 | 0 | self->others = tasks; |
96 | 0 | self->other_size = task_size; |
97 | 0 | int i; |
98 | 0 | int flag = 0; |
99 | 0 | for (i = 0; !flag && i < task_size; i++) |
100 | 0 | { |
101 | 0 | flag = tasks[i]->done; |
102 | 0 | assert(tasks[i]->notify_any == 0); |
103 | 0 | tasks[i]->notify_any = self; |
104 | 0 | } |
105 | 0 | if (flag) |
106 | 0 | { |
107 | 0 | for (i = 0; i < task_size; i++) |
108 | 0 | tasks[i]->notify_any = 0; |
109 | 0 | return 1; |
110 | 0 | } |
111 | 0 | return 0; |
112 | 0 | } |
113 | | |
// Release a coroutine's storage (thin wrapper over the project allocator).
void co_free(co_routine_t* const task)
{
	ccfree(task);
}
118 | | |
// Returns nonzero once the coroutine has run to completion.
int co_is_done(const co_routine_t* const task)
{
	return task->done;
}
123 | | |
// Scheduler currently being driven by this thread, if any. Read by
// co_is_on_scheduler(); saved and restored by _co_try/_co_main so nested
// scheduler runs unwind correctly.
static __thread co_scheduler_t* scheduler_per_thread = 0;
125 | | |
126 | | // Second will invoke this blocking variant to schedule task on a newly created thread. |
127 | | static void* _co_main(void* userdata) |
128 | 2 | { |
129 | 2 | co_scheduler_t* const scheduler = (co_scheduler_t*)userdata; |
130 | 2 | pthread_mutex_lock(&scheduler->mutex); |
131 | 2 | co_scheduler_t* previous_scheduler = scheduler_per_thread; |
132 | 2 | scheduler_per_thread = scheduler; |
133 | | // By definition, the last task cannot co_free itself. And because this |
134 | | // scheduler is asynchronous, we cannot free it somewhere else. That |
135 | | // left us with the only choice to free the task at the very end when we |
136 | | // are going to put the scheduler to sleep again. |
137 | 2 | for (;;) |
138 | 9 | { |
139 | 9 | if (scheduler->head == 0 && scheduler->stream_await_count == 07 ) |
140 | 2 | { |
141 | 2 | scheduler->active = 0; |
142 | 2 | pthread_cond_broadcast(&scheduler->notify); |
143 | 2 | pthread_mutex_unlock(&scheduler->mutex); |
144 | 2 | break; |
145 | 2 | } |
146 | 7 | if (scheduler->head == 0) |
147 | 5 | { |
148 | 5 | pthread_cond_wait(&scheduler->wait, &scheduler->mutex); |
149 | 5 | pthread_mutex_unlock(&scheduler->mutex); |
150 | 5 | } |
151 | 7 | co_routine_t* task = scheduler->head; |
152 | 7 | _co_delete_task(scheduler, task); |
153 | 7 | pthread_mutex_unlock(&scheduler->mutex); |
154 | 52 | while (task) { |
155 | 45 | const co_state_t state = task->fn(task, task + 1); |
156 | 45 | task->line = state.line; |
157 | 45 | task->done = state.done; |
158 | 45 | if (task->callee) |
159 | 16 | task = task->callee; |
160 | 29 | else { |
161 | 29 | co_routine_t* const prev_task = task; |
162 | 29 | task = task->caller; |
163 | 29 | prev_task->caller = 0; |
164 | 29 | if (prev_task->done) |
165 | 24 | { |
166 | 24 | co_routine_t* const notify_any = _co_done(prev_task); |
167 | 24 | if (prev_task->root) // Free the task scheduled from co_schedule. |
168 | 2 | co_free(prev_task); |
169 | 24 | if (notify_any) |
170 | 22 | { |
171 | 22 | if (!task) |
172 | 22 | task = notify_any; |
173 | 0 | else { |
174 | 0 | pthread_mutex_lock(&scheduler->mutex); |
175 | 0 | _co_prepend_task(scheduler, notify_any); |
176 | 0 | pthread_mutex_unlock(&scheduler->mutex); |
177 | | // Since we have task, we will resume the inner loop. |
178 | 0 | } |
179 | 22 | } |
180 | 24 | } |
181 | 29 | } |
182 | 45 | } |
183 | 7 | pthread_mutex_lock(&scheduler->mutex); |
184 | 7 | } |
185 | 2 | scheduler_per_thread = previous_scheduler; |
186 | 2 | return 0; |
187 | 2 | } |
188 | | |
189 | | // First will invoke this non-blocking variant to schedule task. |
// Non-blocking scheduler loop, run on the caller's thread by co_schedule.
// Drains the run queue inline; if the queue empties while streams are still
// awaited (stream_await_count > 0), hands off to a new thread running the
// blocking _co_main and returns immediately.
static void _co_try(co_scheduler_t* const scheduler)
{
	pthread_mutex_lock(&scheduler->mutex);
	if (scheduler->active)
	{
		// Someone else is already driving this scheduler; nothing to do.
		pthread_mutex_unlock(&scheduler->mutex);
		return;
	}
	scheduler->active = 1;
	co_scheduler_t* previous_scheduler = scheduler_per_thread;
	scheduler_per_thread = scheduler;
	for (;;)
	{
		// Invariant: scheduler->mutex is held at the top of every iteration.
		if (scheduler->head == 0 && scheduler->stream_await_count == 0)
		{
			// Fully drained and nothing awaited: deactivate and return.
			scheduler->active = 0;
			pthread_mutex_unlock(&scheduler->mutex);
			break;
		}
		if (scheduler->head == 0)
		{
			// Launch a thread to continue the execution.
			pthread_create(&scheduler->thread, 0, _co_main, scheduler);
			pthread_mutex_unlock(&scheduler->mutex);
			break;
		}
		co_routine_t* task = scheduler->head;
		_co_delete_task(scheduler, task);
		pthread_mutex_unlock(&scheduler->mutex);
		// Run the coroutine chain without the lock: step the current task,
		// descend into its callee, or pop back to its caller when it yields.
		while (task) {
			const co_state_t state = task->fn(task, task + 1);
			task->line = state.line;
			task->done = state.done;
			if (task->callee)
				task = task->callee;
			else {
				co_routine_t* const prev_task = task;
				task = task->caller;
				prev_task->caller = 0;
				if (prev_task->done)
				{
					co_routine_t* const notify_any = _co_done(prev_task);
					if (prev_task->root) // Free the task scheduled from co_schedule.
						co_free(prev_task);
					if (notify_any)
					{
						if (!task)
							task = notify_any;
						else {
							pthread_mutex_lock(&scheduler->mutex);
							_co_prepend_task(scheduler, notify_any);
							pthread_mutex_unlock(&scheduler->mutex);
							// Since we have task, we will resume the inner loop.
						}
					}
				}
			}
		}
		pthread_mutex_lock(&scheduler->mutex);
	}
	scheduler_per_thread = previous_scheduler;
}
252 | | |
253 | | void co_schedule(co_scheduler_t* const scheduler, co_routine_t* const task) |
254 | 26.4k | { |
255 | 26.4k | task->scheduler = scheduler; |
256 | 26.4k | task->root = 1; // If this is the root, we will free it ourselves. |
257 | 26.4k | int activate_scheduler = 0; |
258 | 26.4k | pthread_mutex_lock(&scheduler->mutex); |
259 | 26.4k | _co_append_task(scheduler, task); |
260 | 26.4k | if (!scheduler->active) |
261 | 26.4k | activate_scheduler = 1; |
262 | 26.4k | pthread_mutex_unlock(&scheduler->mutex); |
263 | 26.4k | if (activate_scheduler) |
264 | 26.4k | _co_try(scheduler); |
265 | 26.4k | } |
266 | | |
267 | | int co_is_on_scheduler(co_scheduler_t* const scheduler) |
268 | 26.0k | { |
269 | 26.0k | if (scheduler_per_thread == 0) |
270 | 26.0k | return 0; |
271 | 0 | return scheduler_per_thread == scheduler; |
272 | 26.0k | } |
273 | | |
// Thread-safe snapshot of the scheduler's active flag (nonzero while some
// thread is inside _co_try/_co_main for this scheduler).
int co_scheduler_is_active(co_scheduler_t* const scheduler)
{
	pthread_mutex_lock(&scheduler->mutex);
	const int active = scheduler->active;
	pthread_mutex_unlock(&scheduler->mutex);
	return active;
}
281 | | |
// Allocate a zero-initialized scheduler and initialize its mutex and the
// two condition variables (notify: drained/idle; wait: new work arrived).
co_scheduler_t* co_scheduler_new(void)
{
	co_scheduler_t* const scheduler = cccalloc(1, sizeof(co_scheduler_t));
	pthread_mutex_init(&scheduler->mutex, 0);
	pthread_cond_init(&scheduler->notify, 0);
	pthread_cond_init(&scheduler->wait, 0);
	return scheduler;
}
290 | | |
// Destroy the scheduler's synchronization primitives and free its storage.
// NOTE(review): assumes the scheduler is idle (no thread still inside
// _co_try/_co_main) — confirm with callers before freeing.
void co_scheduler_free(co_scheduler_t* const scheduler)
{
	pthread_mutex_destroy(&scheduler->mutex);
	pthread_cond_destroy(&scheduler->notify);
	pthread_cond_destroy(&scheduler->wait);
	ccfree(scheduler);
}