Coverage Report

Created: 2024-08-18 16:21

/home/liu/actions-runner/_work/ccv/ccv/lib/nnc/ccv_nnc_stream.c
  Line|  Count|Source
     1|       |#include "ccv_nnc.h"
     2|       |#include "ccv_nnc_internal.h"
     3|       |#include "ccv_nnc_easy.h"
     4|       |#include "co.h"
     5|       |#ifdef HAVE_CUDA
     6|       |#include "gpu/ccv_nnc_compat.h"
     7|       |#elif defined(HAVE_MPS)
     8|       |#include "mps/ccv_nnc_mps.h"
     9|       |#endif
    10|       |#ifdef USE_DISPATCH
    11|       |#include <dispatch/dispatch.h>
    12|       |#endif
    13|       |#include "_ccv_nnc_stream.h"
    14|       |
    15|       |typedef struct {
    16|       |  ccv_nnc_stream_context_t super;
    17|       |  // The CPU support for the stream context is left for a future implementation.
    18|       |  size_t workspace_size;
    19|       |  void* workspace;
    20|       |} ccv_nnc_stream_cpu_t;
    21|       |
    22|       |typedef struct {
    23|       |  ccv_nnc_stream_context_destructor_f destructor_hook;
    24|       |  void* context;
    25|       |} ccv_nnc_stream_destructor_hook_t;
    26|       |
    27|       |ccv_nnc_stream_context_t* ccv_nnc_stream_context_new(const int type)
    28|  1.22k|{
    29|  1.22k|  ccv_nnc_stream_cpu_t* const stream_cpu = (ccv_nnc_stream_cpu_t*)cccalloc(1, sizeof(ccv_nnc_stream_cpu_t));
    30|  1.22k|  stream_cpu->super.type = type;
    31|  1.22k|  stream_cpu->super.reuse_destructor_hook = -1;
    32|  1.22k|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
    33|  1.22k|  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
    34|  1.05k|    return ccv_nnc_init_stream_context((ccv_nnc_stream_context_t*)stream_cpu);
    35|    179|#endif
    36|    179|  return (ccv_nnc_stream_context_t*)stream_cpu;
    37|  1.22k|}
    38|       |
    39|       |CCV_WARN_UNUSED(int) ccv_nnc_stream_context_type(const ccv_nnc_stream_context_t* const stream_context)
    40|  49.3k|{
    41|  49.3k|  return stream_context->type;
    42|  49.3k|}
    43|       |
    44|       |static __thread ccv_nnc_stream_cpu_t ccv_nnc_per_thread_stream_cpu = {
    45|       |  .super = {
    46|       |    .type = CCV_STREAM_CONTEXT_CPU,
    47|       |  },
    48|       |};
50
void* ccv_nnc_stream_context_get_workspace(ccv_nnc_stream_context_t* const stream_context, const size_t workspace_size, const int mem)
51
15.2k
{
52
15.2k
#if defined(HAVE_CUDA) || defined(HAVE_MPS)
53
15.2k
  return ccv_nnc_stream_compat_get_workspace(stream_context, workspace_size, mem);
54
#else
55
  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
56
  if (!stream_cpu)
57
    stream_cpu = &ccv_nnc_per_thread_stream_cpu;
58
  assert(mem == CCV_TENSOR_CPU_MEMORY);
59
  if (stream_cpu->workspace_size >= workspace_size)
60
    return stream_cpu->workspace;
61
  stream_cpu->workspace_size = workspace_size;
62
  if (stream_cpu->workspace)
63
    ccfree(stream_cpu->workspace);
64
  stream_cpu->workspace = 0;
65
  ccmemalign(&stream_cpu->workspace, 64, workspace_size);
66
  return stream_cpu->workspace;
67
#endif
68
15.2k
}
69
70
void ccv_nnc_stream_context_drain(ccv_nnc_stream_context_t* const stream_context)
71
110k
{
72
110k
#if defined(HAVE_CUDA) || defined(HAVE_MPS)
73
110k
  ccv_nnc_stream_compat_drain(stream_context);
74
#else
75
  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
76
  if (!stream_cpu)
77
    stream_cpu = &ccv_nnc_per_thread_stream_cpu;
78
  if (stream_cpu->workspace)
79
  {
80
    ccfree(stream_cpu->workspace);
81
    stream_cpu->workspace = 0;
82
    stream_cpu->workspace_size = 0;
83
  }
84
#endif
85
110k
}
    86|       |
    87|       |static void _ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, const ccv_nnc_async_callback_f async_callback, void* const callback_context)
    88|    204|{
    89|    204|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
    90|    204|  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
    91|    204|    ccv_nnc_stream_compat_add_callback(stream_context, callback, async_callback, callback_context);
    92|      0|  else
    93|      0|    callback(callback_context);
    94|       |#else
    95|       |  callback(callback_context);
    96|       |#endif
    97|    204|}
    98|       |
    99|       |static void _ccv_nnc_sync_dispatch(ccv_nnc_async_callback_t* const async)
   100|    201|{
   101|    201|  async->fn(async->callback_context);
   102|    201|  ccfree(async);
   103|    201|}
   104|       |
   105|       |#ifndef USE_DISPATCH
   106|       |static void* _ccv_nnc_pthread_dispatch(void* const userdata)
   107|    201|{
   108|    201|  _ccv_nnc_sync_dispatch((ccv_nnc_async_callback_t*)userdata);
   109|    201|  return 0;
   110|    201|}
   111|       |#endif
   112|       |
   113|       |static void _ccv_nnc_async_dispatch(ccv_nnc_async_callback_t* const async)
   114|    201|{
   115|       |  // This method dispatches to a different thread because the CUDA callback thread cannot operate CUDA objects.
   116|       |#ifdef USE_DISPATCH
   117|       |  dispatch_async_f(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), async, (dispatch_function_t)_ccv_nnc_sync_dispatch);
   118|       |#else
   119|    201|  pthread_t thread;
   120|    201|  pthread_create(&thread, 0, _ccv_nnc_pthread_dispatch, async);
   121|    201|#endif
   122|    201|}
   123|       |
   124|      0|static co_decl_task(_ccv_nnc_stream_context_add_callback_async, (ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context), private())
   125|      0|{
   126|      0|  co_stream_await(CO_P(stream_context));
   127|      0|  _ccv_nnc_stream_context_add_callback(CO_P(stream_context), CO_P(callback), _ccv_nnc_async_dispatch, CO_P(callback_context));
   128|      0|} co_end()
   129|       |
   130|       |void ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context)
   131|    204|{
   132|    204|  if (!stream_context)
   133|      0|  {
   134|      0|    callback(callback_context);
   135|      0|    return;
   136|      0|  }
   137|    204|  co_scheduler_t* const scheduler = stream_context->scheduler;
   138|    204|  if (scheduler && co_scheduler_is_active(scheduler))
   139|      0|  {
   140|      0|    co_routine_t* const task = co_new(_ccv_nnc_stream_context_add_callback_async, (stream_context, callback, callback_context));
   141|      0|    co_schedule(scheduler, task);
   142|      0|  } else
   143|    204|    _ccv_nnc_stream_context_add_callback(stream_context, callback, _ccv_nnc_async_dispatch, callback_context);
   144|    204|}
   145|       |
   146|       |void ccv_nnc_stream_context_wait(const ccv_nnc_stream_context_t* const stream_context)
   147|  26.1k|{
   148|  26.1k|  if (!stream_context)
   149|      0|    return;
   150|  26.1k|  co_scheduler_t* const scheduler = stream_context->scheduler;
   151|  26.1k|  if (scheduler && !co_is_on_scheduler(scheduler)) // First wait the scheduler to finish if I am not currently on that scheduler.
   152|  26.0k|  {
   153|  26.0k|    pthread_mutex_lock(&scheduler->mutex);
   154|  26.0k|    while (scheduler->active)
   155|      2|      pthread_cond_wait(&scheduler->notify, &scheduler->mutex);
   156|  26.0k|    pthread_mutex_unlock(&scheduler->mutex);
   157|  26.0k|  }
   158|  26.1k|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   159|  26.1k|  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
   160|    806|    ccv_nnc_synchronize_stream_context(stream_context);
   161|  26.1k|#endif
   162|  26.1k|}
   163|       |
   164|       |int ccv_nnc_stream_context_add_destructor_hook(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_context_destructor_f destructor, void* const context)
   165|     24|{
   166|     24|  ccv_nnc_stream_destructor_hook_t hook = {
   167|     24|    .destructor_hook = destructor,
   168|     24|    .context = context
   169|     24|  };
   170|     24|  if (stream->reuse_destructor_hook >= 0)
   171|      0|  {
   172|      0|    assert(stream->destructor_hooks);
   173|      0|    const int reuse_destructor_hook = stream->reuse_destructor_hook;
   174|      0|    assert(reuse_destructor_hook < stream->destructor_hooks->rnum);
   175|      0|    *(ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, reuse_destructor_hook) = hook;
   176|      0|    int i;
   177|      0|    stream->reuse_destructor_hook = -1;
   178|      0|    for (i = reuse_destructor_hook + 1; i < stream->destructor_hooks->rnum && stream->reuse_destructor_hook < 0; i++)
   179|      0|      if (!((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook)
   180|      0|        stream->reuse_destructor_hook = i;
   181|      0|    return reuse_destructor_hook;
   182|     24|  } else {
   183|     24|    if (!stream->destructor_hooks)
   184|     12|      stream->destructor_hooks = ccv_array_new(sizeof(ccv_nnc_stream_destructor_hook_t), 1, 0);
   185|     24|    ccv_array_push(stream->destructor_hooks, &hook);
   186|     24|    return stream->destructor_hooks->rnum - 1;
   187|     24|  }
   188|     24|}
   189|       |
   190|       |void ccv_nnc_stream_context_remove_destructor_hook(ccv_nnc_stream_context_t* const stream, const int hook_id)
   191|     18|{
   192|     18|  assert(hook_id >= 0);
   193|     18|  assert(hook_id < stream->destructor_hooks->rnum);
   194|     18|  ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, hook_id);
   195|     18|  hook->destructor_hook = 0;
   196|     18|  hook->context = 0;
   197|     18|  int i;
   198|     36|  for (i = stream->destructor_hooks->rnum - 1; i >= 0; i--)
   199|     26|    if (((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook)
   200|      8|    {
   201|      8|      stream->destructor_hooks->rnum = i + 1;
   202|      8|      break;
   203|      8|    }
   204|     18|  if (hook_id < stream->destructor_hooks->rnum &&
   205|     18|    (hook_id < stream->reuse_destructor_hook || stream->reuse_destructor_hook < 0))
   206|     11|    stream->reuse_destructor_hook = hook_id;
   207|      7|  else if (stream->reuse_destructor_hook >= stream->destructor_hooks->rnum)
   208|      1|    stream->reuse_destructor_hook = -1;
   209|     18|}
   210|       |
   211|       |void ccv_nnc_stream_context_free(ccv_nnc_stream_context_t* const stream_context)
   212|  1.22k|{
   213|  1.22k|  if (stream_context->destructor_hooks)
   214|     12|  {
   215|     12|    int i;
   216|     34|    for (i = 0; i < stream_context->destructor_hooks->rnum; i++)
   217|     22|    {
   218|     22|      ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream_context->destructor_hooks, i);
   219|     22|      if (hook->destructor_hook)
   220|      6|        hook->destructor_hook(stream_context, hook->context);
   221|     22|    }
   222|     12|    ccv_array_free(stream_context->destructor_hooks);
   223|     12|  }
   224|  1.22k|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   225|  1.22k|  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
   226|  1.05k|    ccv_nnc_deinit_stream_context(stream_context);
   227|    179|  else {
   228|    179|#endif
   229|    179|  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
   230|    179|  if (stream_cpu->workspace)
   231|      3|    ccfree(stream_cpu->workspace);
   232|    179|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   233|    179|  }
   234|  1.22k|#endif
   235|  1.22k|  if (stream_context->scheduler)
   236|    136|  {
   237|    136|    co_scheduler_t* const scheduler = stream_context->scheduler;
   238|    136|    co_scheduler_free(scheduler);
   239|    136|  }
   240|  1.22k|  if (stream_context->event)
   241|    290|    ccv_nnc_stream_signal_free(stream_context->event);
   242|  1.22k|  if (stream_context->sfmt)
   243|      9|    ccfree(stream_context->sfmt);
   244|  1.22k|  ccfree(stream_context);
   245|  1.22k|}
   246|       |
   247|       |void ccv_nnc_stream_context_set_seed(ccv_nnc_stream_context_t* const stream_context, uint32_t seed)
   248|      1|{
   249|      1|  if (!stream_context)
   250|      1|  {
   251|      1|    ccv_nnc_stream_cpu_t* const stream_cpu = &ccv_nnc_per_thread_stream_cpu;
   252|      1|    if (!stream_cpu->super.sfmt)
   253|      0|      stream_cpu->super.sfmt = ccmalloc(sizeof(sfmt_t));
   254|      1|    sfmt_init_gen_rand(stream_cpu->super.sfmt, seed);
   255|      1|    return;
   256|      1|  }
   257|      0|  if (!stream_context->sfmt)
   258|      0|    stream_context->sfmt = ccmalloc(sizeof(sfmt_t));
   259|      0|  sfmt_init_gen_rand(stream_context->sfmt, seed);
   260|      0|}
   261|       |
   262|       |uint32_t ccv_nnc_stream_context_genrand_uint32(ccv_nnc_stream_context_t* const stream_context)
   263|    346|{
   264|    346|  if (!stream_context)
   265|    176|  {
   266|    176|    ccv_nnc_stream_cpu_t* const stream_cpu = &ccv_nnc_per_thread_stream_cpu;
   267|    176|    if (!stream_cpu->super.sfmt)
   268|      1|    {
   269|      1|      stream_cpu->super.sfmt = ccmalloc(sizeof(sfmt_t));
   270|      1|      sfmt_init_gen_rand(stream_cpu->super.sfmt, (uint32_t)(uintptr_t)stream_cpu);
   271|      1|    }
   272|    176|    return sfmt_genrand_uint32(stream_cpu->super.sfmt);
   273|    176|  }
   274|    170|  if (!stream_context->sfmt)
   275|     10|  {
   276|     10|    stream_context->sfmt = ccmalloc(sizeof(sfmt_t));
   277|       |    // Init with seed from thread-local context.
   278|     10|    sfmt_init_gen_rand(stream_context->sfmt, ccv_nnc_stream_context_genrand_uint32(0));
   279|     10|  }
   280|    170|  return sfmt_genrand_uint32(stream_context->sfmt);
   281|    346|}
   282|       |
   283|       |void ccv_nnc_stream_context_set_neighbor_discovery(ccv_nnc_stream_context_t* const stream_context, ccv_nnc_stream_context_neighbor_discovery_f discovery, void* const context)
   284|   229k|{
   285|   229k|  stream_context->neighbor_discovery = discovery;
   286|   229k|  stream_context->neighbor_discovery_context = context;
   287|   229k|}
   288|       |
   289|       |ccv_nnc_stream_context_t* ccv_nnc_stream_context_find_neighbor(ccv_nnc_stream_context_t* const stream_context, const int device_id)
   290|  14.1k|{
   291|  14.1k|  if (stream_context->neighbor_discovery)
   292|  13.9k|    return stream_context->neighbor_discovery(device_id, stream_context->neighbor_discovery_context);
   293|    204|  return 0;
   294|  14.1k|}
   295|       |
   296|       |ccv_nnc_stream_signal_t* ccv_nnc_stream_signal_new(const int type)
   297|  1.89k|{
   298|  1.89k|  ccv_nnc_stream_signal_t* const signal = (ccv_nnc_stream_signal_t*)ccmalloc(sizeof(ccv_nnc_stream_signal_t));
   299|  1.89k|  signal->type = type;
   300|  1.89k|  signal->emit_context = 0;
   301|  1.89k|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   302|  1.89k|  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
   303|  1.58k|    return ccv_nnc_init_stream_signal(signal);
   304|    310|#endif
   305|    310|  return signal;
   306|  1.89k|}
   307|       |
   308|       |CCV_WARN_UNUSED(int) ccv_nnc_stream_signal_type(const ccv_nnc_stream_signal_t* const signal)
   309|    334|{
   310|    334|  return signal->type;
   311|    334|}
   312|       |
   313|       |void ccv_nnc_stream_context_emit_signal(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_signal_t* const signal)
   314|  60.5k|{
   315|  60.5k|  signal->emit_context = stream;
   316|  60.5k|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   317|  60.5k|  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
   318|  30.2k|    ccv_nnc_stream_compat_emit_signal(stream, signal);
   319|  60.5k|#endif
   320|  60.5k|}
   321|       |
   322|       |ccv_nnc_stream_context_t* ccv_nnc_stream_signal_get_emitter(const ccv_nnc_stream_signal_t* const signal)
   323|      7|{
   324|      7|  return signal->emit_context;
   325|      7|}
   326|       |
   327|       |void ccv_nnc_stream_context_wait_signal(const ccv_nnc_stream_context_t* const stream, const ccv_nnc_stream_signal_t* const signal)
   328|   118k|{
   329|   118k|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   330|   118k|  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
   331|  82.5k|    ccv_nnc_stream_compat_wait_signal(stream, signal);
   332|   118k|#endif
   333|   118k|}
   334|       |
   335|       |void ccv_nnc_stream_signal_free(ccv_nnc_stream_signal_t* const signal)
   336|  1.89k|{
   337|  1.89k|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   338|  1.89k|  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
   339|  1.58k|    ccv_nnc_deinit_stream_signal(signal);
   340|  1.89k|#endif
   341|  1.89k|  ccfree(signal);
   342|  1.89k|}
   343|       |
   344|       |int ccv_nnc_device_count(const int type)
   345|     25|{
   346|     25|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   347|     25|  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
   348|     25|    return ccv_nnc_gpu_device_count();
   349|       |#else
   350|       |  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
   351|       |    return 0;
   352|       |#endif
   353|      0|  return 1; // I don't get core count for CPU yet.
   354|     25|}
   355|       |
   356|       |co_scheduler_t* ccv_nnc_stream_context_get_scheduler(ccv_nnc_stream_context_t* const stream_context)
   357|  26.4k|{
   358|  26.4k|  co_scheduler_t* scheduler = stream_context->scheduler;
   359|  26.4k|  if (!scheduler)
   360|    532|    stream_context->scheduler = scheduler = co_scheduler_new();
   361|  26.4k|  return scheduler;
   362|  26.4k|}
   363|       |
   364|       |int _co_stream_await(co_routine_t* const self, ccv_nnc_stream_context_t* const stream)
   365|     14|{
   366|     14|  if (!stream)
   367|      0|    return 1;
   368|     14|#if defined(HAVE_CUDA) || defined(HAVE_MPS)
   369|     14|  if (CCV_STREAM_GET_CONTEXT(stream->type) == CCV_STREAM_CONTEXT_GPU)
   370|     14|    return co_stream_compat_await(self, stream);
   371|      0|#endif
   372|      0|  return 1;
   373|     14|}
   374|       |
   375|       |// MARK - Signal Container
   376|       |
   377|       |ccv_nnc_stream_signal_t* ccv_nnc_stream_context_emit_signal_new(ccv_nnc_stream_context_t* const stream)
   378|  1.58k|{
   379|       |  /**
   380|       |   * We don't need complex containers for this. Based on the CUDA documentation, Record records the
   381|       |   * most recent state, and a wait uses the most recent record. Thus, even if we reuse the same event
   382|       |   * again and again, as long as we emit and immediately wait, we won't have any problems.
   383|       |   */
   384|  1.58k|  if (!stream->event)
   385|    290|    stream->event = ccv_nnc_stream_signal_new(ccv_nnc_stream_context_type(stream));
   386|  1.58k|  ccv_nnc_stream_context_emit_signal(stream, stream->event);
   387|  1.58k|  return stream->event;
   388|  1.58k|}
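
The listing shows no counts for the #else branches of ccv_nnc_stream_context_get_workspace, ccv_nnc_stream_context_drain and ccv_nnc_device_count because this run was built with HAVE_CUDA or HAVE_MPS, which compiles the CPU fallbacks out. The sketch below is not part of the report; it is a minimal, hypothetical driver showing how the covered stream API could be exercised from a CPU-only build. It reuses the CCV_STREAM_CONTEXT_CPU and CCV_TENSOR_CPU_MEMORY constants that appear in the listing, and it assumes ccv_nnc_callback_f and ccv_nnc_stream_context_destructor_f have the signatures implied by the call sites above (a single void* argument, and a stream pointer plus void* argument, respectively).

#include <stdio.h>
#include "ccv_nnc.h"

/* Runs when the stream reaches the point where the callback was added;
 * on the CPU path it is invoked inline. Signature assumed from the call sites above. */
static void on_done(void* const context)
{
  *(int*)context = 1;
}

/* Destructor hook; fired from ccv_nnc_stream_context_free unless removed first.
 * Signature assumed from how destructor_hook is invoked above. */
static void on_free(ccv_nnc_stream_context_t* const stream, void* const context)
{
  (void)stream;
  *(int*)context = 1;
}

int main(void)
{
  int done = 0, hook_fired = 0;
  // A CPU stream context; a GPU type would route through ccv_nnc_init_stream_context instead.
  ccv_nnc_stream_context_t* const stream = ccv_nnc_stream_context_new(CCV_STREAM_CONTEXT_CPU);
  // The workspace is cached on the context (or on the per-thread context when stream is 0)
  // and reused as long as it is large enough.
  void* const workspace = ccv_nnc_stream_context_get_workspace(stream, 1024, CCV_TENSOR_CPU_MEMORY);
  (void)workspace;
  // Register a destructor hook; ccv_nnc_stream_context_remove_destructor_hook(stream, hook_id)
  // would cancel it and mark the slot for reuse.
  const int hook_id = ccv_nnc_stream_context_add_destructor_hook(stream, on_free, &hook_fired);
  (void)hook_id;
  // On the CPU path the callback runs immediately; on the GPU path it runs after the stream drains.
  ccv_nnc_stream_context_add_callback(stream, on_done, &done);
  // Signals order work between streams; emitting and then waiting on the same stream is a no-op on CPU.
  ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_signal_new(CCV_STREAM_CONTEXT_CPU);
  ccv_nnc_stream_context_emit_signal(stream, signal);
  ccv_nnc_stream_context_wait_signal(stream, signal);
  // Block until queued work and any coroutine scheduler attached to the stream have finished.
  ccv_nnc_stream_context_wait(stream);
  printf("callback ran: %d\n", done);
  ccv_nnc_stream_signal_free(signal);
  ccv_nnc_stream_context_free(stream); // invokes on_free(stream, &hook_fired)
  return 0;
}

In a HAVE_CUDA or HAVE_MPS build the same calls route through the ccv_nnc_stream_compat_* and ccv_nnc_init/deinit_stream_* paths, which is where most of the counts in this report come from.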