Coverage Report

Created: 2021-04-11 20:23

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_nnc_stream.c
 Line|   Count|Source
    1|        |#include "ccv_nnc.h"
    2|        |#include "ccv_nnc_internal.h"
    3|        |#include "ccv_nnc_easy.h"
    4|        |#include "co.h"
    5|        |#ifdef HAVE_CUDA
    6|        |#include "gpu/ccv_nnc_compat.h"
    7|        |#endif
    8|        |#ifdef USE_DISPATCH
    9|        |#include <dispatch/dispatch.h>
   10|        |#endif
   11|        |#include "_ccv_nnc_stream.h"
   12|        |
   13|        |typedef struct {
   14|        |  ccv_nnc_stream_context_t super;
   15|        |  // Left for implementation yet, the CPU support for stream context.
   16|        |  size_t workspace_size;
   17|        |  void* workspace;
   18|        |} ccv_nnc_stream_cpu_t;
   19|        |
   20|        |typedef struct {
   21|        |  ccv_nnc_stream_context_destructor_f destructor_hook;
   22|        |  void* context;
   23|        |} ccv_nnc_stream_destructor_hook_t;
   24|        |
   25|        |ccv_nnc_stream_context_t* ccv_nnc_stream_context_new(const int type)
   26|   1.18k|{
   27|   1.18k|  ccv_nnc_stream_cpu_t* const stream_cpu = (ccv_nnc_stream_cpu_t*)cccalloc(1, sizeof(ccv_nnc_stream_cpu_t));
   28|   1.18k|  stream_cpu->super.type = type;
   29|   1.18k|  stream_cpu->super.reuse_destructor_hook = -1;
   30|   1.18k|  stream_cpu->super.destructor_hooks = 0;
   31|   1.18k|  stream_cpu->workspace_size = 0;
   32|   1.18k|  stream_cpu->workspace = 0;
   33|   1.18k|#ifdef HAVE_CUDA
   34|   1.18k|  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
   35|   1.05k|    return ccv_nnc_init_stream_context((ccv_nnc_stream_context_t*)stream_cpu);
   36|     125|#endif
   37|     125|  return (ccv_nnc_stream_context_t*)stream_cpu;
   38|     125|}
   39|        |
   40|        |CCV_WARN_UNUSED(int) ccv_nnc_stream_context_type(const ccv_nnc_stream_context_t* const stream_context)
   41|   49.2k|{
   42|   49.2k|  return stream_context->type;
   43|   49.2k|}
   44|        |
   45|        |#ifndef HAVE_CUDA
   46|        |static __thread ccv_nnc_stream_cpu_t ccv_nnc_per_thread_stream_cpu = {
   47|        |  .super = {
   48|        |    .type = CCV_STREAM_CONTEXT_CPU,
   49|        |  },
   50|        |};
   51|        |#endif
   52|        |
   53|        |void* ccv_nnc_stream_context_get_workspace(ccv_nnc_stream_context_t* const stream_context, const size_t workspace_size, const int mem)
   54|   14.9k|{
   55|   14.9k|#ifdef HAVE_CUDA
   56|   14.9k|  return ccv_nnc_stream_compat_get_workspace(stream_context, workspace_size, mem);
   57|        |#else
   58|        |  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
   59|        |  if (!stream_cpu)
   60|        |    stream_cpu = &ccv_nnc_per_thread_stream_cpu;
   61|        |  assert(mem == CCV_TENSOR_CPU_MEMORY);
   62|        |  if (stream_cpu->workspace_size >= workspace_size)
   63|        |    return stream_cpu->workspace;
   64|        |  stream_cpu->workspace_size = workspace_size;
   65|        |  if (stream_cpu->workspace)
   66|        |    ccfree(stream_cpu->workspace);
   67|        |  stream_cpu->workspace = 0;
   68|        |  ccmemalign(&stream_cpu->workspace, 16, workspace_size);
   69|        |  return stream_cpu->workspace;
   70|        |#endif
   71|   14.9k|}
   72|        |
   73|        |void ccv_nnc_stream_context_drain(ccv_nnc_stream_context_t* const stream_context)
   74|   99.8k|{
   75|   99.8k|#ifdef HAVE_CUDA
   76|   99.8k|  ccv_nnc_stream_compat_drain(stream_context);
   77|        |#else
   78|        |  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
   79|        |  if (!stream_cpu)
   80|        |    stream_cpu = &ccv_nnc_per_thread_stream_cpu;
   81|        |  if (stream_cpu->workspace)
   82|        |  {
   83|        |    ccfree(stream_cpu->workspace);
   84|        |    stream_cpu->workspace = 0;
   85|        |    stream_cpu->workspace_size = 0;
   86|        |  }
   87|        |#endif
   88|   99.8k|}
   89|        |
   90|        |static void _ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, const ccv_nnc_async_callback_f async_callback, void* const callback_context)
   91|   1.13k|{
   92|   1.13k|#ifdef HAVE_CUDA
   93|   1.13k|  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
   94|   1.13k|    ccv_nnc_stream_compat_add_callback(stream_context, callback, async_callback, callback_context);
   95|       2|  else
   96|       2|    callback(callback_context);
   97|        |#else
   98|        |  callback(callback_context);
   99|        |#endif
  100|   1.13k|}
  101|        |
  102|        |static void _ccv_nnc_sync_dispatch(ccv_nnc_async_callback_t* const async)
  103|     854|{
  104|     854|  async->fn(async->callback_context);
  105|     854|  ccfree(async);
  106|     854|}
  107|        |
  108|        |#ifndef USE_DISPATCH
  109|        |static void* _ccv_nnc_pthread_dispatch(void* const userdata)
  110|        |{
  111|        |  _ccv_nnc_sync_dispatch((ccv_nnc_async_callback_t*)userdata);
  112|        |  return 0;
  113|        |}
  114|        |#endif
  115|        |
  116|        |static void _ccv_nnc_async_dispatch(ccv_nnc_async_callback_t* const async)
  117|     200|{
  118|     200|  // This method dispatches to a different thread because the CUDA callback thread cannot operate CUDA objects.
  119|     200|#ifdef USE_DISPATCH
  120|     200|  dispatch_async_f(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), async, (dispatch_function_t)_ccv_nnc_sync_dispatch);
  121|        |#else
  122|        |  pthread_t thread;
  123|        |  pthread_create(&thread, 0, _ccv_nnc_pthread_dispatch, async);
  124|        |#endif
  125|     200|}
  126|        |
  127|       0|static co_decl_task(_ccv_nnc_stream_context_add_callback_async, (ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context), private())
  128|       0|{
  129|       0|  co_stream_await(CO_P(stream_context));
  130|       0|  _ccv_nnc_stream_context_add_callback(CO_P(stream_context), CO_P(callback), _ccv_nnc_async_dispatch, CO_P(callback_context));
  131|       0|} co_end()
  132|        |
  133|        |void ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context)
  134|     204|{
  135|     204|  if (!stream_context)
  136|       0|  {
  137|       0|    callback(callback_context);
  138|       0|    return;
  139|       0|  }
  140|     204|  co_scheduler_t* const scheduler = stream_context->scheduler;
  141|     204|  if (scheduler && co_scheduler_is_active(scheduler))
  142|       0|  {
  143|       0|    co_routine_t* const task = co_new(_ccv_nnc_stream_context_add_callback_async, (stream_context, callback, callback_context));
  144|       0|    co_schedule(scheduler, task);
  145|       0|  } else
  146|     204|    _ccv_nnc_stream_context_add_callback(stream_context, callback, _ccv_nnc_async_dispatch, callback_context);
  147|     204|}
  148|        |
  149|        |void ccv_nnc_stream_context_wait(const ccv_nnc_stream_context_t* const stream_context)
  150|   26.0k|{
  151|   26.0k|  if (!stream_context)
  152|       0|    return;
  153|   26.0k|  co_scheduler_t* const scheduler = stream_context->scheduler;
  154|   26.0k|  if (scheduler && !co_is_on_scheduler(scheduler)) // First wait the scheduler to finish if I am not currently on that scheduler.
  155|   25.9k|  {
  156|   25.9k|    pthread_mutex_lock(&scheduler->mutex);
  157|   25.9k|    while (scheduler->active)
  158|       2|      pthread_cond_wait(&scheduler->notify, &scheduler->mutex);
  159|   25.9k|    pthread_mutex_unlock(&scheduler->mutex);
  160|   25.9k|  }
  161|   26.0k|#ifdef HAVE_CUDA
  162|   26.0k|  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
  163|     790|    ccv_nnc_synchronize_stream_context(stream_context);
  164|   26.0k|#endif
  165|   26.0k|}
  166|        |
  167|        |int ccv_nnc_stream_context_add_destructor_hook(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_context_destructor_f destructor, void* const context)
  168|      24|{
  169|      24|  ccv_nnc_stream_destructor_hook_t hook = {
  170|      24|    .destructor_hook = destructor,
  171|      24|    .context = context
  172|      24|  };
  173|      24|  if (stream->reuse_destructor_hook >= 0)
  174|       0|  {
  175|       0|    assert(stream->destructor_hooks);
  176|       0|    const int reuse_destructor_hook = stream->reuse_destructor_hook;
  177|       0|    assert(reuse_destructor_hook < stream->destructor_hooks->rnum);
  178|       0|    *(ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, reuse_destructor_hook) = hook;
  179|       0|    int i;
  180|       0|    stream->reuse_destructor_hook = -1;
  181|       0|    for (i = reuse_destructor_hook + 1; i < stream->destructor_hooks->rnum && stream->reuse_destructor_hook < 0; i++)
  182|       0|      if (!((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook)
  183|       0|        stream->reuse_destructor_hook = i;
  184|       0|    return reuse_destructor_hook;
  185|      24|  } else {
  186|      24|    if (!stream->destructor_hooks)
  187|      12|      stream->destructor_hooks = ccv_array_new(sizeof(ccv_nnc_stream_destructor_hook_t), 1, 0);
  188|      24|    ccv_array_push(stream->destructor_hooks, &hook);
  189|      24|    return stream->destructor_hooks->rnum - 1;
  190|      24|  }
  191|      24|}
  192|        |
  193|        |void ccv_nnc_stream_context_remove_destructor_hook(ccv_nnc_stream_context_t* const stream, const int hook_id)
  194|      18|{
  195|      18|  assert(hook_id >= 0);
  196|      18|  assert(hook_id < stream->destructor_hooks->rnum);
  197|      18|  ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, hook_id);
  198|      18|  hook->destructor_hook = 0;
  199|      18|  hook->context = 0;
  200|      18|  int i;
  201|      36|  for (i = stream->destructor_hooks->rnum - 1; i >= 0; i--)
  202|      26|    if (((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook)
  203|       8|    {
  204|       8|      stream->destructor_hooks->rnum = i + 1;
  205|       8|      break;
  206|       8|    }
  207|      18|  if (hook_id < stream->destructor_hooks->rnum &&
  208|      18|    (hook_id < stream->reuse_destructor_hook || stream->reuse_destructor_hook < 0))
  209|      11|    stream->reuse_destructor_hook = hook_id;
  210|       7|  else if (stream->reuse_destructor_hook >= stream->destructor_hooks->rnum)
  211|       1|    stream->reuse_destructor_hook = -1;
  212|      18|}
  213|        |
  214|        |void ccv_nnc_stream_context_free(ccv_nnc_stream_context_t* const stream_context)
  215|   1.18k|{
  216|   1.18k|  if (stream_context->destructor_hooks)
  217|      12|  {
  218|      12|    int i;
  219|      34|    for (i = 0; i < stream_context->destructor_hooks->rnum; i++)
  220|      22|    {
  221|      22|      ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream_context->destructor_hooks, i);
  222|      22|      if (hook->destructor_hook)
  223|       6|        hook->destructor_hook(stream_context, hook->context);
  224|      22|    }
  225|      12|    ccv_array_free(stream_context->destructor_hooks);
  226|      12|  }
  227|   1.18k|#ifdef HAVE_CUDA
  228|   1.18k|  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
  229|   1.05k|    ccv_nnc_deinit_stream_context(stream_context);
  230|     125|  else {
  231|     125|#endif
  232|     125|  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
  233|     125|  if (stream_cpu->workspace)
  234|     125|    ccfree(stream_cpu->workspace);
  235|     125|#ifdef HAVE_CUDA
  236|     125|  }
  237|   1.18k|#endif
  238|   1.18k|  if (stream_context->scheduler)
  239|      99|  {
  240|      99|    co_scheduler_t* const scheduler = stream_context->scheduler;
  241|      99|    co_scheduler_free(scheduler);
  242|      99|  }
  243|   1.18k|  if (stream_context->container)
  244|     226|  {
  245|     226|    ccv_nnc_signal_container_t* const container = stream_context->container;
  246|     226|    pthread_mutex_destroy(&container->mutex);
  247|     226|    int i;
  248|     456|    for (i = 0; i < container->empty->rnum; i++)
  249|     230|    {
  250|     230|      ccv_nnc_signal_handler_t* const handler = *(ccv_nnc_signal_handler_t**)ccv_array_get(container->empty, i);
  251|     230|      ccv_nnc_stream_signal_free(handler->signal);
  252|     230|      ccfree(handler);
  253|     230|    }
  254|     226|    ccv_array_free(container->empty);
  255|     226|    ccfree(container);
  256|     226|  }
  257|   1.18k|  ccfree(stream_context);
  258|   1.18k|}
  259|        |
  260|        |void ccv_nnc_stream_context_set_neighbor_discovery(ccv_nnc_stream_context_t* const stream_context, ccv_nnc_stream_context_neighbor_discovery_f discovery, void* const context)
  261|    222k|{
  262|    222k|  stream_context->neighbor_discovery = discovery;
  263|    222k|  stream_context->neighbor_discovery_context = context;
  264|    222k|}
  265|        |
  266|        |ccv_nnc_stream_context_t* ccv_nnc_stream_context_find_neighbor(ccv_nnc_stream_context_t* const stream_context, const int device_id)
  267|   14.1k|{
  268|   14.1k|  if (stream_context->neighbor_discovery)
  269|   13.9k|    return stream_context->neighbor_discovery(device_id, stream_context->neighbor_discovery_context);
  270|     204|  return 0;
  271|     204|}
  272|        |
  273|        |ccv_nnc_stream_signal_t* ccv_nnc_stream_signal_new(const int type)
  274|   1.81k|{
  275|   1.81k|  ccv_nnc_stream_signal_t* const signal = (ccv_nnc_stream_signal_t*)ccmalloc(sizeof(ccv_nnc_stream_signal_t));
  276|   1.81k|  signal->type = type;
  277|   1.81k|  signal->emit_context = 0;
  278|   1.81k|#ifdef HAVE_CUDA
  279|   1.81k|  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
  280|   1.60k|    return ccv_nnc_init_stream_signal(signal);
  281|     209|#endif
  282|     209|  return signal;
  283|     209|}
  284|        |
  285|        |CCV_WARN_UNUSED(int) ccv_nnc_stream_signal_type(const ccv_nnc_stream_signal_t* const signal)
  286|     308|{
  287|     308|  return signal->type;
  288|     308|}
  289|        |
  290|        |void ccv_nnc_stream_context_emit_signal(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_signal_t* const signal)
  291|   58.3k|{
  292|   58.3k|  signal->emit_context = stream;
  293|   58.3k|#ifdef HAVE_CUDA
  294|   58.3k|  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
  295|   30.2k|    ccv_nnc_stream_compat_emit_signal(stream, signal);
  296|   58.3k|#endif
  297|   58.3k|}
  298|        |
  299|        |ccv_nnc_stream_context_t* ccv_nnc_stream_signal_get_emitter(const ccv_nnc_stream_signal_t* const signal)
  300|       7|{
  301|       7|  return signal->emit_context;
  302|       7|}
  303|        |
  304|        |void ccv_nnc_stream_context_wait_signal(const ccv_nnc_stream_context_t* const stream, const ccv_nnc_stream_signal_t* const signal)
  305|    115k|{
  306|    115k|#ifdef HAVE_CUDA
  307|    115k|  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
  308|   82.2k|    ccv_nnc_stream_compat_wait_signal(stream, signal);
  309|    115k|#endif
  310|    115k|}
  311|        |
  312|        |void ccv_nnc_stream_signal_free(ccv_nnc_stream_signal_t* const signal)
  313|   1.81k|{
  314|   1.81k|#ifdef HAVE_CUDA
  315|   1.81k|  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
  316|   1.60k|    ccv_nnc_deinit_stream_signal(signal);
  317|   1.81k|#endif
  318|   1.81k|  ccfree(signal);
  319|   1.81k|}
  320|        |
  321|        |int ccv_nnc_device_count(const int type)
  322|      25|{
  323|      25|#ifdef HAVE_CUDA
  324|      25|  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
  325|      25|    return ccv_nnc_gpu_device_count();
  326|        |#else
  327|        |  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
  328|        |    return 0;
  329|        |#endif
  330|       0|  return 1; // I don't get core count for CPU yet.
  331|       0|}
  332|        |
  333|        |co_scheduler_t* ccv_nnc_stream_context_get_scheduler(ccv_nnc_stream_context_t* const stream_context)
  334|   26.3k|{
  335|   26.3k|  co_scheduler_t* scheduler = stream_context->scheduler;
  336|   26.3k|  if (!scheduler)
  337|     495|    stream_context->scheduler = scheduler = co_scheduler_new();
  338|   26.3k|  return scheduler;
  339|   26.3k|}
  340|        |
  341|        |int _co_stream_await(co_routine_t* const self, ccv_nnc_stream_context_t* const stream)
  342|      14|{
  343|      14|  if (!stream)
  344|       0|    return 1;
  345|      14|#ifdef HAVE_CUDA
  346|      14|  if (CCV_STREAM_GET_CONTEXT(stream->type) == CCV_STREAM_CONTEXT_GPU)
  347|      14|    return co_stream_compat_await(self, stream);
  348|       0|#endif
  349|       0|  return 1;
  350|       0|}
  351|        |
  352|        |// MARK - Signal Container
  353|        |
  354|        |static ccv_nnc_signal_handler_t* const _ccv_nnc_signal_container_get_handler(ccv_nnc_stream_context_t* const stream)
  355|     929|{
  356|     929|  if (!stream->container)
  357|     226|  {
  358|     226|    ccv_nnc_signal_container_t* const container = stream->container = (ccv_nnc_signal_container_t*)ccmalloc(sizeof(ccv_nnc_signal_container_t));
  359|     226|    container->empty = ccv_array_new(sizeof(ccv_nnc_signal_handler_t*), 0, 0);
  360|     226|    pthread_mutex_init(&container->mutex, 0);
  361|     226|  }
  362|     929|  ccv_nnc_signal_container_t* const container = stream->container;
  363|     929|  ccv_nnc_signal_handler_t* handler;
  364|     929|  pthread_mutex_lock(&container->mutex);
  365|     929|  if (container->empty->rnum > 0)
  366|     699|  {
  367|     699|    handler = *(ccv_nnc_signal_handler_t**)ccv_array_get(container->empty, container->empty->rnum - 1);
  368|     699|    --container->empty->rnum;
  369|     699|  } else {
  370|     230|    handler = (ccv_nnc_signal_handler_t*)ccmalloc(sizeof(ccv_nnc_signal_handler_t));
  371|     230|    handler->container = container;
  372|     230|    handler->signal = ccv_nnc_stream_signal_new(ccv_nnc_stream_context_type(stream));
  373|     230|  }
  374|     929|  pthread_mutex_unlock(&container->mutex);
  375|     929|  return handler;
  376|     929|}
  377|        |
  378|        |static void _ccv_nnc_stream_signal_callback(void* const callback_context)
  379|     929|{
  380|     929|  ccv_nnc_signal_handler_t* const handler = (ccv_nnc_signal_handler_t*)callback_context;
  381|     929|  ccv_nnc_signal_container_t* const container = handler->container;
  382|     929|  pthread_mutex_lock(&container->mutex);
  383|     929|  ccv_array_push(container->empty, &handler);
  384|     929|  handler->in_use = 0;
  385|     929|  pthread_mutex_unlock(&container->mutex);
  386|     929|}
  387|        |
  388|        |ccv_nnc_stream_signal_t* ccv_nnc_stream_context_emit_signal_new(ccv_nnc_stream_context_t* const stream)
  389|     929|{
  390|     929|  ccv_nnc_signal_handler_t* const handler = _ccv_nnc_signal_container_get_handler(stream);
  391|     929|  ccv_nnc_stream_context_emit_signal(stream, handler->signal);
  392|     929|  handler->in_use = 1;
  393|     929|  // Because the callback only handles CPU things, we can avoid another async jump because no CUDA related stuff touched.
  394|     929|  _ccv_nnc_stream_context_add_callback(stream, _ccv_nnc_stream_signal_callback, _ccv_nnc_sync_dispatch, handler);
  395|     929|  return handler->in_use ? handler->signal : 0;
  396|     929|}
  397|        |
  398|        |ccv_nnc_stream_signal_t* ccv_nnc_stream_context_checkpoint(ccv_nnc_stream_context_t* const stream)
  399|     396|{
  400|     396|  return stream->checkpoint;
  401|     396|}
  402|        |
  403|        |void ccv_nnc_stream_context_set_checkpoint(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_signal_t* const checkpoint)
  404|   1.30k|{
  405|   1.30k|  stream->checkpoint = checkpoint;
  406|   1.30k|}
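
For orientation, the public entry points exercised above (ccv_nnc_stream_context_new, ccv_nnc_stream_context_add_callback, ccv_nnc_stream_context_wait, ccv_nnc_stream_context_free) compose as in the minimal sketch below. It is illustrative only: it assumes a CUDA-less build where CCV_STREAM_CONTEXT_CPU alone is a valid type argument, that including "ccv_nnc.h" brings in these declarations, and that ccv_nnc_callback_f takes a single void* context, none of which the coverage report itself states.

  #include "ccv_nnc.h"
  #include <stdio.h>

  // Hypothetical callback; the report only shows it being invoked as callback(callback_context).
  static void on_done(void* const context)
  {
    printf("stream work finished: %s\n", (const char*)context);
  }

  int main(void)
  {
    ccv_nnc_stream_context_t* const stream = ccv_nnc_stream_context_new(CCV_STREAM_CONTEXT_CPU);
    // With no active scheduler and no GPU stream, the callback runs synchronously (lines 95-98 above).
    ccv_nnc_stream_context_add_callback(stream, on_done, "hello");
    // Blocks until the stream's scheduler is idle and, on CUDA builds, the device stream is synchronized.
    ccv_nnc_stream_context_wait(stream);
    ccv_nnc_stream_context_free(stream);
    return 0;
  }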