/home/liu/actions-runner/_work/ccv/ccv/lib/nnc/ccv_nnc_stream.c
Line | Count | Source |
1 | | #include "ccv_nnc.h" |
2 | | #include "ccv_nnc_internal.h" |
3 | | #include "ccv_nnc_easy.h" |
4 | | #include "co.h" |
5 | | #ifdef HAVE_CUDA |
6 | | #include "gpu/ccv_nnc_compat.h" |
7 | | #elif defined(HAVE_MPS) |
8 | | #include "mps/ccv_nnc_mps.h" |
9 | | #endif |
10 | | #ifdef USE_DISPATCH |
11 | | #include <dispatch/dispatch.h> |
12 | | #endif |
13 | | #include "_ccv_nnc_stream.h" |
14 | | |
15 | | typedef struct { |
16 | | ccv_nnc_stream_context_t super; |
17 | | // CPU support for the stream context; a fuller implementation is left for later. |
18 | | size_t workspace_size; |
19 | | void* workspace; |
20 | | } ccv_nnc_stream_cpu_t; |
21 | | |
22 | | typedef struct { |
23 | | ccv_nnc_stream_context_destructor_f destructor_hook; |
24 | | void* context; |
25 | | } ccv_nnc_stream_destructor_hook_t; |
26 | | |
27 | | ccv_nnc_stream_context_t* ccv_nnc_stream_context_new(const int type) |
28 | 1.22k | { |
29 | 1.22k | ccv_nnc_stream_cpu_t* const stream_cpu = (ccv_nnc_stream_cpu_t*)cccalloc(1, sizeof(ccv_nnc_stream_cpu_t)); |
30 | 1.22k | stream_cpu->super.type = type; |
31 | 1.22k | stream_cpu->super.reuse_destructor_hook = -1; |
32 | 1.22k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
33 | 1.22k | if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU) |
34 | 1.05k | return ccv_nnc_init_stream_context((ccv_nnc_stream_context_t*)stream_cpu); |
35 | 179 | #endif |
36 | 179 | return (ccv_nnc_stream_context_t*)stream_cpu; |
37 | 1.22k | } |
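
For orientation, a minimal lifecycle sketch for the constructor above; it assumes only the declarations in ccv_nnc.h and the CCV_STREAM_CONTEXT_CPU flag already used in this file (the wrapper function name is illustrative):

    #include "ccv_nnc.h"

    static void stream_lifecycle_sketch(void)
    {
        // A CPU stream context; a GPU type would take the compat init path above.
        ccv_nnc_stream_context_t* const stream = ccv_nnc_stream_context_new(CCV_STREAM_CONTEXT_CPU);
        // ... queue asynchronous work against the stream ...
        ccv_nnc_stream_context_wait(stream); // block until queued work completes
        ccv_nnc_stream_context_free(stream);
    }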
38 | | |
39 | | CCV_WARN_UNUSED(int) ccv_nnc_stream_context_type(const ccv_nnc_stream_context_t* const stream_context) |
40 | 49.3k | { |
41 | 49.3k | return stream_context->type; |
42 | 49.3k | } |
43 | | |
44 | | static __thread ccv_nnc_stream_cpu_t ccv_nnc_per_thread_stream_cpu = { |
45 | | .super = { |
46 | | .type = CCV_STREAM_CONTEXT_CPU, |
47 | | }, |
48 | | }; |
49 | | |
50 | | void* ccv_nnc_stream_context_get_workspace(ccv_nnc_stream_context_t* const stream_context, const size_t workspace_size, const int mem) |
51 | 15.2k | { |
52 | 15.2k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
53 | 15.2k | return ccv_nnc_stream_compat_get_workspace(stream_context, workspace_size, mem); |
54 | | #else |
55 | | ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context; |
56 | | if (!stream_cpu) |
57 | | stream_cpu = &ccv_nnc_per_thread_stream_cpu; |
58 | | assert(mem == CCV_TENSOR_CPU_MEMORY); |
59 | | if (stream_cpu->workspace_size >= workspace_size) |
60 | | return stream_cpu->workspace; |
61 | | stream_cpu->workspace_size = workspace_size; |
62 | | if (stream_cpu->workspace) |
63 | | ccfree(stream_cpu->workspace); |
64 | | stream_cpu->workspace = 0; |
65 | | ccmemalign(&stream_cpu->workspace, 64, workspace_size); |
66 | | return stream_cpu->workspace; |
67 | | #endif |
68 | 15.2k | } |
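
A small sketch of the workspace/drain pair above, assuming a stream obtained from ccv_nnc_stream_context_new (the 1MiB size and the function name are illustrative):

    #include "ccv_nnc.h"

    static void workspace_sketch(ccv_nnc_stream_context_t* const stream)
    {
        // Request 1MiB of CPU scratch space; the buffer is cached on the stream
        // (or on the per-thread fallback when stream is 0) and reused until drained.
        void* const ws = ccv_nnc_stream_context_get_workspace(stream, 1024 * 1024, CCV_TENSOR_CPU_MEMORY);
        // ... use ws as temporary space ...
        ccv_nnc_stream_context_drain(stream); // frees the cached workspace on the CPU path
        (void)ws;
    }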
69 | | |
70 | | void ccv_nnc_stream_context_drain(ccv_nnc_stream_context_t* const stream_context) |
71 | 110k | { |
72 | 110k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
73 | 110k | ccv_nnc_stream_compat_drain(stream_context); |
74 | | #else |
75 | | ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context; |
76 | | if (!stream_cpu) |
77 | | stream_cpu = &ccv_nnc_per_thread_stream_cpu; |
78 | | if (stream_cpu->workspace) |
79 | | { |
80 | | ccfree(stream_cpu->workspace); |
81 | | stream_cpu->workspace = 0; |
82 | | stream_cpu->workspace_size = 0; |
83 | | } |
84 | | #endif |
85 | 110k | } |
86 | | |
87 | | static void _ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, const ccv_nnc_async_callback_f async_callback, void* const callback_context) |
88 | 204 | { |
89 | 204 | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
90 | 204 | if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU) |
91 | 204 | ccv_nnc_stream_compat_add_callback(stream_context, callback, async_callback, callback_context); |
92 | 0 | else |
93 | 0 | callback(callback_context); |
94 | | #else |
95 | | callback(callback_context); |
96 | | #endif |
97 | 204 | } |
98 | | |
99 | | static void _ccv_nnc_sync_dispatch(ccv_nnc_async_callback_t* const async) |
100 | 201 | { |
101 | 201 | async->fn(async->callback_context); |
102 | 201 | ccfree(async); |
103 | 201 | } |
104 | | |
105 | | #ifndef USE_DISPATCH |
106 | | static void* _ccv_nnc_pthread_dispatch(void* const userdata) |
107 | 201 | { |
108 | 201 | _ccv_nnc_sync_dispatch((ccv_nnc_async_callback_t*)userdata); |
109 | 201 | return 0; |
110 | 201 | } |
111 | | #endif |
112 | | |
113 | | static void _ccv_nnc_async_dispatch(ccv_nnc_async_callback_t* const async) |
114 | 201 | { |
115 | | // This function dispatches to a different thread because the CUDA callback thread cannot operate on CUDA objects. |
116 | | #ifdef USE_DISPATCH |
117 | | dispatch_async_f(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), async, (dispatch_function_t)_ccv_nnc_sync_dispatch); |
118 | | #else |
119 | 201 | pthread_t thread; |
120 | 201 | pthread_create(&thread, 0, _ccv_nnc_pthread_dispatch, async); |
121 | 201 | #endif |
122 | 201 | } |
123 | | |
124 | 0 | static co_decl_task(_ccv_nnc_stream_context_add_callback_async, (ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context), private()) |
125 | 0 | { |
126 | 0 | co_stream_await(CO_P(stream_context)); |
127 | 0 | _ccv_nnc_stream_context_add_callback(CO_P(stream_context), CO_P(callback), _ccv_nnc_async_dispatch, CO_P(callback_context)); |
128 | 0 | } co_end() |
129 | | |
130 | | void ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context) |
131 | 204 | { |
132 | 204 | if (!stream_context) |
133 | 0 | { |
134 | 0 | callback(callback_context); |
135 | 0 | return; |
136 | 0 | } |
137 | 204 | co_scheduler_t* const scheduler = stream_context->scheduler; |
138 | 204 | if (scheduler && co_scheduler_is_active(scheduler)) |
139 | 0 | { |
140 | 0 | co_routine_t* const task = co_new(_ccv_nnc_stream_context_add_callback_async, (stream_context, callback, callback_context)); |
141 | 0 | co_schedule(scheduler, task); |
142 | 0 | } else |
143 | 204 | _ccv_nnc_stream_context_add_callback(stream_context, callback, _ccv_nnc_async_dispatch, callback_context); |
144 | 204 | } |
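
A hedged sketch of the callback path above; my_done and the "checkpoint" context are caller-side placeholders, and the callback signature is inferred from how callback(callback_context) is invoked in this file:

    #include <stdio.h>
    #include "ccv_nnc.h"

    // Runs inline for CPU streams, or on a dispatched thread once the GPU stream
    // reaches this point in its queue.
    static void my_done(void* const callback_context)
    {
        printf("stream reached: %s\n", (const char*)callback_context);
    }

    static void callback_sketch(ccv_nnc_stream_context_t* const stream)
    {
        ccv_nnc_stream_context_add_callback(stream, my_done, (void*)"checkpoint");
    }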
145 | | |
146 | | void ccv_nnc_stream_context_wait(const ccv_nnc_stream_context_t* const stream_context) |
147 | 26.1k | { |
148 | 26.1k | if (!stream_context) |
149 | 0 | return; |
150 | 26.1k | co_scheduler_t* const scheduler = stream_context->scheduler; |
151 | 26.1k | if (scheduler && !co_is_on_scheduler(scheduler)) // First wait for the scheduler to finish if we are not currently on that scheduler. |
152 | 26.0k | { |
153 | 26.0k | pthread_mutex_lock(&scheduler->mutex); |
154 | 26.0k | while (scheduler->active) |
155 | 2 | pthread_cond_wait(&scheduler->notify, &scheduler->mutex); |
156 | 26.0k | pthread_mutex_unlock(&scheduler->mutex); |
157 | 26.0k | } |
158 | 26.1k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
159 | 26.1k | if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU) |
160 | 806 | ccv_nnc_synchronize_stream_context(stream_context); |
161 | 26.1k | #endif |
162 | 26.1k | } |
163 | | |
164 | | int ccv_nnc_stream_context_add_destructor_hook(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_context_destructor_f destructor, void* const context) |
165 | 24 | { |
166 | 24 | ccv_nnc_stream_destructor_hook_t hook = { |
167 | 24 | .destructor_hook = destructor, |
168 | 24 | .context = context |
169 | 24 | }; |
170 | 24 | if (stream->reuse_destructor_hook >= 0) |
171 | 0 | { |
172 | 0 | assert(stream->destructor_hooks); |
173 | 0 | const int reuse_destructor_hook = stream->reuse_destructor_hook; |
174 | 0 | assert(reuse_destructor_hook < stream->destructor_hooks->rnum); |
175 | 0 | *(ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, reuse_destructor_hook) = hook; |
176 | 0 | int i; |
177 | 0 | stream->reuse_destructor_hook = -1; |
178 | 0 | for (i = reuse_destructor_hook + 1; i < stream->destructor_hooks->rnum && stream->reuse_destructor_hook < 0; i++) |
179 | 0 | if (!((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook) |
180 | 0 | stream->reuse_destructor_hook = i; |
181 | 0 | return reuse_destructor_hook; |
182 | 24 | } else { |
183 | 24 | if (!stream->destructor_hooks) |
184 | 12 | stream->destructor_hooks = ccv_array_new(sizeof(ccv_nnc_stream_destructor_hook_t), 1, 0); |
185 | 24 | ccv_array_push(stream->destructor_hooks, &hook); |
186 | 24 | return stream->destructor_hooks->rnum - 1; |
187 | 24 | } |
188 | 24 | } |
189 | | |
190 | | void ccv_nnc_stream_context_remove_destructor_hook(ccv_nnc_stream_context_t* const stream, const int hook_id) |
191 | 18 | { |
192 | 18 | assert(hook_id >= 0); |
193 | 18 | assert(hook_id < stream->destructor_hooks->rnum); |
194 | 18 | ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, hook_id); |
195 | 18 | hook->destructor_hook = 0; |
196 | 18 | hook->context = 0; |
197 | 18 | int i; |
198 | 36 | for (i = stream->destructor_hooks->rnum - 1; i >= 0; i--) |
199 | 26 | if (((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook) |
200 | 8 | { |
201 | 8 | stream->destructor_hooks->rnum = i + 1; |
202 | 8 | break; |
203 | 8 | } |
204 | 18 | if (hook_id < stream->destructor_hooks->rnum && |
205 | 18 | (hook_id < stream->reuse_destructor_hook || stream->reuse_destructor_hook < 0)) |
206 | 11 | stream->reuse_destructor_hook = hook_id; |
207 | 7 | else if (stream->reuse_destructor_hook >= stream->destructor_hooks->rnum) |
208 | 1 | stream->reuse_destructor_hook = -1; |
209 | 18 | } |
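
A sketch of registering and unregistering a destructor hook; my_cleanup and hook_sketch are illustrative names, and the destructor signature is inferred from how hooks are invoked in ccv_nnc_stream_context_free below:

    #include "ccv_nnc.h"

    // Assumed hook signature: called with the stream being freed and the
    // registered context.
    static void my_cleanup(const ccv_nnc_stream_context_t* const stream, void* const context)
    {
        ccfree(context); // release whatever was tied to the stream's lifetime
        (void)stream;
    }

    static void hook_sketch(ccv_nnc_stream_context_t* const stream)
    {
        void* const scratch = ccmalloc(64);
        const int hook_id = ccv_nnc_stream_context_add_destructor_hook(stream, my_cleanup, scratch);
        // If scratch is released before the stream, unregister the hook so it
        // won't double-free at ccv_nnc_stream_context_free time.
        ccv_nnc_stream_context_remove_destructor_hook(stream, hook_id);
        ccfree(scratch);
    }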
210 | | |
211 | | void ccv_nnc_stream_context_free(ccv_nnc_stream_context_t* const stream_context) |
212 | 1.22k | { |
213 | 1.22k | if (stream_context->destructor_hooks) |
214 | 12 | { |
215 | 12 | int i; |
216 | 34 | for (i = 0; i < stream_context->destructor_hooks->rnum; i++) |
217 | 22 | { |
218 | 22 | ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream_context->destructor_hooks, i); |
219 | 22 | if (hook->destructor_hook) |
220 | 6 | hook->destructor_hook(stream_context, hook->context); |
221 | 22 | } |
222 | 12 | ccv_array_free(stream_context->destructor_hooks); |
223 | 12 | } |
224 | 1.22k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
225 | 1.22k | if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU) |
226 | 1.05k | ccv_nnc_deinit_stream_context(stream_context); |
227 | 179 | else { |
228 | 179 | #endif |
229 | 179 | ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context; |
230 | 179 | if (stream_cpu->workspace) |
231 | 3 | ccfree(stream_cpu->workspace); |
232 | 179 | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
233 | 179 | } |
234 | 1.22k | #endif |
235 | 1.22k | if (stream_context->scheduler) |
236 | 136 | { |
237 | 136 | co_scheduler_t* const scheduler = stream_context->scheduler; |
238 | 136 | co_scheduler_free(scheduler); |
239 | 136 | } |
240 | 1.22k | if (stream_context->event) |
241 | 290 | ccv_nnc_stream_signal_free(stream_context->event); |
242 | 1.22k | if (stream_context->sfmt) |
243 | 9 | ccfree(stream_context->sfmt); |
244 | 1.22k | ccfree(stream_context); |
245 | 1.22k | } |
246 | | |
247 | | void ccv_nnc_stream_context_set_seed(ccv_nnc_stream_context_t* const stream_context, uint32_t seed) |
248 | 1 | { |
249 | 1 | if (!stream_context) |
250 | 1 | { |
251 | 1 | ccv_nnc_stream_cpu_t* const stream_cpu = &ccv_nnc_per_thread_stream_cpu; |
252 | 1 | if (!stream_cpu->super.sfmt) |
253 | 0 | stream_cpu->super.sfmt = ccmalloc(sizeof(sfmt_t)); |
254 | 1 | sfmt_init_gen_rand(stream_cpu->super.sfmt, seed); |
255 | 1 | return; |
256 | 1 | } |
257 | 0 | if (!stream_context->sfmt) |
258 | 0 | stream_context->sfmt = ccmalloc(sizeof(sfmt_t)); |
259 | 0 | sfmt_init_gen_rand(stream_context->sfmt, seed); |
260 | 0 | } |
261 | | |
262 | | uint32_t ccv_nnc_stream_context_genrand_uint32(ccv_nnc_stream_context_t* const stream_context) |
263 | 346 | { |
264 | 346 | if (!stream_context) |
265 | 176 | { |
266 | 176 | ccv_nnc_stream_cpu_t* const stream_cpu = &ccv_nnc_per_thread_stream_cpu; |
267 | 176 | if (!stream_cpu->super.sfmt) |
268 | 1 | { |
269 | 1 | stream_cpu->super.sfmt = ccmalloc(sizeof(sfmt_t)); |
270 | 1 | sfmt_init_gen_rand(stream_cpu->super.sfmt, (uint32_t)(uintptr_t)stream_cpu); |
271 | 1 | } |
272 | 176 | return sfmt_genrand_uint32(stream_cpu->super.sfmt); |
273 | 176 | } |
274 | 170 | if (!stream_context->sfmt) |
275 | 10 | { |
276 | 10 | stream_context->sfmt = ccmalloc(sizeof(sfmt_t)); |
277 | | // Init with seed from thread-local context. |
278 | 10 | sfmt_init_gen_rand(stream_context->sfmt, ccv_nnc_stream_context_genrand_uint32(0)); |
279 | 10 | } |
280 | 170 | return sfmt_genrand_uint32(stream_context->sfmt); |
281 | 346 | } |
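
A sketch of the per-stream RNG above (function name and seed value illustrative); passing 0 as the stream falls back to the thread-local generator, which is lazily seeded on first use:

    #include <stdint.h>
    #include "ccv_nnc.h"

    static uint32_t rng_sketch(ccv_nnc_stream_context_t* const stream)
    {
        ccv_nnc_stream_context_set_seed(stream, 1234); // seed this stream's generator
        return ccv_nnc_stream_context_genrand_uint32(stream);
    }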
282 | | |
283 | | void ccv_nnc_stream_context_set_neighbor_discovery(ccv_nnc_stream_context_t* const stream_context, ccv_nnc_stream_context_neighbor_discovery_f discovery, void* const context) |
284 | 229k | { |
285 | 229k | stream_context->neighbor_discovery = discovery; |
286 | 229k | stream_context->neighbor_discovery_context = context; |
287 | 229k | } |
288 | | |
289 | | ccv_nnc_stream_context_t* ccv_nnc_stream_context_find_neighbor(ccv_nnc_stream_context_t* const stream_context, const int device_id) |
290 | 14.1k | { |
291 | 14.1k | if (stream_context->neighbor_discovery) |
292 | 13.9k | return stream_context->neighbor_discovery(device_id, stream_context->neighbor_discovery_context); |
293 | 204 | return 0; |
294 | 14.1k | } |
295 | | |
296 | | ccv_nnc_stream_signal_t* ccv_nnc_stream_signal_new(const int type) |
297 | 1.89k | { |
298 | 1.89k | ccv_nnc_stream_signal_t* const signal = (ccv_nnc_stream_signal_t*)ccmalloc(sizeof(ccv_nnc_stream_signal_t)); |
299 | 1.89k | signal->type = type; |
300 | 1.89k | signal->emit_context = 0; |
301 | 1.89k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
302 | 1.89k | if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU) |
303 | 1.58k | return ccv_nnc_init_stream_signal(signal); |
304 | 310 | #endif |
305 | 310 | return signal; |
306 | 1.89k | } |
307 | | |
308 | | CCV_WARN_UNUSED(int) ccv_nnc_stream_signal_type(const ccv_nnc_stream_signal_t* const signal) |
309 | 334 | { |
310 | 334 | return signal->type; |
311 | 334 | } |
312 | | |
313 | | void ccv_nnc_stream_context_emit_signal(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_signal_t* const signal) |
314 | 60.5k | { |
315 | 60.5k | signal->emit_context = stream; |
316 | 60.5k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
317 | 60.5k | if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU) |
318 | 30.2k | ccv_nnc_stream_compat_emit_signal(stream, signal); |
319 | 60.5k | #endif |
320 | 60.5k | } |
321 | | |
322 | | ccv_nnc_stream_context_t* ccv_nnc_stream_signal_get_emitter(const ccv_nnc_stream_signal_t* const signal) |
323 | 7 | { |
324 | 7 | return signal->emit_context; |
325 | 7 | } |
326 | | |
327 | | void ccv_nnc_stream_context_wait_signal(const ccv_nnc_stream_context_t* const stream, const ccv_nnc_stream_signal_t* const signal) |
328 | 118k | { |
329 | 118k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
330 | 118k | if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU) |
331 | 82.5k | ccv_nnc_stream_compat_wait_signal(stream, signal); |
332 | 118k | #endif |
333 | 118k | } |
334 | | |
335 | | void ccv_nnc_stream_signal_free(ccv_nnc_stream_signal_t* const signal) |
336 | 1.89k | { |
337 | 1.89k | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
338 | 1.89k | if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU) |
339 | 1.58k | ccv_nnc_deinit_stream_signal(signal); |
340 | 1.89k | #endif |
341 | 1.89k | ccfree(signal); |
342 | 1.89k | } |
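
A sketch of the emit/wait handshake implemented above, ordering one stream behind another; stream_a and stream_b are illustrative:

    #include "ccv_nnc.h"

    static void signal_sketch(ccv_nnc_stream_context_t* const stream_a, ccv_nnc_stream_context_t* const stream_b)
    {
        // stream_b will not run work queued after the wait until stream_a has
        // passed the point where the signal was emitted.
        ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_signal_new(ccv_nnc_stream_context_type(stream_a));
        ccv_nnc_stream_context_emit_signal(stream_a, signal);
        ccv_nnc_stream_context_wait_signal(stream_b, signal);
        // ... queue dependent work on stream_b ...
        ccv_nnc_stream_signal_free(signal);
    }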
343 | | |
344 | | int ccv_nnc_device_count(const int type) |
345 | 25 | { |
346 | 25 | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
347 | 25 | if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU) |
348 | 25 | return ccv_nnc_gpu_device_count(); |
349 | | #else |
350 | | if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU) |
351 | | return 0; |
352 | | #endif |
353 | 0 | return 1; // CPU core count is not queried yet. |
354 | 25 | } |
355 | | |
356 | | co_scheduler_t* ccv_nnc_stream_context_get_scheduler(ccv_nnc_stream_context_t* const stream_context) |
357 | 26.4k | { |
358 | 26.4k | co_scheduler_t* scheduler = stream_context->scheduler; |
359 | 26.4k | if (!scheduler) |
360 | 532 | stream_context->scheduler = scheduler = co_scheduler_new(); |
361 | 26.4k | return scheduler; |
362 | 26.4k | } |
363 | | |
364 | | int _co_stream_await(co_routine_t* const self, ccv_nnc_stream_context_t* const stream) |
365 | 14 | { |
366 | 14 | if (!stream) |
367 | 0 | return 1; |
368 | 14 | #if defined(HAVE_CUDA) || defined(HAVE_MPS) |
369 | 14 | if (CCV_STREAM_GET_CONTEXT(stream->type) == CCV_STREAM_CONTEXT_GPU) |
370 | 14 | return co_stream_compat_await(self, stream); |
371 | 0 | #endif |
372 | 0 | return 1; |
373 | 14 | } |
374 | | |
375 | | // MARK - Signal Container |
376 | | |
377 | | ccv_nnc_stream_signal_t* ccv_nnc_stream_context_emit_signal_new(ccv_nnc_stream_context_t* const stream) |
378 | 1.58k | { |
379 | | /** |
380 | | * We don't need a complex container for this. Per the CUDA documentation, a record captures the |
381 | | * stream's most recent state, and a wait uses the most recent record. Thus, even if we reuse the |
382 | | * same event again and again, as long as we emit and then immediately wait, we won't have any problems. |
383 | | */ |
384 | 1.58k | if (!stream->event) |
385 | 290 | stream->event = ccv_nnc_stream_signal_new(ccv_nnc_stream_context_type(stream)); |
386 | 1.58k | ccv_nnc_stream_context_emit_signal(stream, stream->event); |
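
The same handshake using the reusable per-stream event described in the comment above; stream_a and stream_b are illustrative, and the returned signal is owned by stream_a, so the caller must not free it:

    #include "ccv_nnc.h"

    static void emit_wait_sketch(ccv_nnc_stream_context_t* const stream_a, ccv_nnc_stream_context_t* const stream_b)
    {
        ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_context_emit_signal_new(stream_a);
        ccv_nnc_stream_context_wait_signal(stream_b, signal); // do not free; freed with stream_a
    }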
387 | 1.58k | return stream->event; |
388 | 1.58k | } |