Coverage Report

Created: 2021-09-30 21:42

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_nnc_stream.c
#include "ccv_nnc.h"
#include "ccv_nnc_internal.h"
#include "ccv_nnc_easy.h"
#include "co.h"
#ifdef HAVE_CUDA
#include "gpu/ccv_nnc_compat.h"
#endif
#ifdef USE_DISPATCH
#include <dispatch/dispatch.h>
#endif
#include "_ccv_nnc_stream.h"

typedef struct {
  ccv_nnc_stream_context_t super;
  // CPU support for the stream context; the full implementation is left for later.
  size_t workspace_size;
  void* workspace;
} ccv_nnc_stream_cpu_t;

typedef struct {
  ccv_nnc_stream_context_destructor_f destructor_hook;
  void* context;
} ccv_nnc_stream_destructor_hook_t;

ccv_nnc_stream_context_t* ccv_nnc_stream_context_new(const int type)
{
  ccv_nnc_stream_cpu_t* const stream_cpu = (ccv_nnc_stream_cpu_t*)cccalloc(1, sizeof(ccv_nnc_stream_cpu_t));
  stream_cpu->super.type = type;
  stream_cpu->super.reuse_destructor_hook = -1;
  stream_cpu->super.destructor_hooks = 0;
  stream_cpu->workspace_size = 0;
  stream_cpu->workspace = 0;
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
    return ccv_nnc_init_stream_context((ccv_nnc_stream_context_t*)stream_cpu);
#endif
  return (ccv_nnc_stream_context_t*)stream_cpu;
}
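
/*
 * Illustrative usage sketch, not part of the original listing: the basic lifecycle of a stream
 * context, using only functions defined in this file. The example_* function name is hypothetical.
 */
static void example_stream_lifecycle(void)
{
  // Create a CPU stream context; with HAVE_CUDA, a CCV_STREAM_CONTEXT_GPU type would route
  // through ccv_nnc_init_stream_context instead.
  ccv_nnc_stream_context_t* const stream = ccv_nnc_stream_context_new(CCV_STREAM_CONTEXT_CPU);
  const int type = ccv_nnc_stream_context_type(stream); // query the packed type back
  (void)type;
  // ... enqueue work associated with this stream ...
  ccv_nnc_stream_context_wait(stream); // block until the stream's scheduler and device work finish
  ccv_nnc_stream_context_free(stream);
}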

CCV_WARN_UNUSED(int) ccv_nnc_stream_context_type(const ccv_nnc_stream_context_t* const stream_context)
{
  return stream_context->type;
}

#ifndef HAVE_CUDA
static __thread ccv_nnc_stream_cpu_t ccv_nnc_per_thread_stream_cpu = {
  .super = {
    .type = CCV_STREAM_CONTEXT_CPU,
  },
};
#endif

void* ccv_nnc_stream_context_get_workspace(ccv_nnc_stream_context_t* const stream_context, const size_t workspace_size, const int mem)
{
#ifdef HAVE_CUDA
  return ccv_nnc_stream_compat_get_workspace(stream_context, workspace_size, mem);
#else
  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
  if (!stream_cpu)
    stream_cpu = &ccv_nnc_per_thread_stream_cpu;
  assert(mem == CCV_TENSOR_CPU_MEMORY);
  if (stream_cpu->workspace_size >= workspace_size)
    return stream_cpu->workspace;
  stream_cpu->workspace_size = workspace_size;
  if (stream_cpu->workspace)
    ccfree(stream_cpu->workspace);
  stream_cpu->workspace = 0;
  ccmemalign(&stream_cpu->workspace, 16, workspace_size);
  return stream_cpu->workspace;
#endif
}

void ccv_nnc_stream_context_drain(ccv_nnc_stream_context_t* const stream_context)
{
#ifdef HAVE_CUDA
  ccv_nnc_stream_compat_drain(stream_context);
#else
  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
  if (!stream_cpu)
    stream_cpu = &ccv_nnc_per_thread_stream_cpu;
  if (stream_cpu->workspace)
  {
    ccfree(stream_cpu->workspace);
    stream_cpu->workspace = 0;
    stream_cpu->workspace_size = 0;
  }
#endif
}
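
/*
 * Illustrative usage sketch, not part of the original listing: the workspace is a scratch buffer
 * cached on the stream (or on the per-thread fallback when the stream is NULL), grown on demand by
 * ccv_nnc_stream_context_get_workspace and released by ccv_nnc_stream_context_drain. The example_*
 * function name is hypothetical.
 */
static void example_workspace_usage(ccv_nnc_stream_context_t* const stream)
{
  // Request 1 MiB of CPU scratch memory; repeated requests that fit within the cached size
  // return the same pointer without reallocating.
  void* const scratch = ccv_nnc_stream_context_get_workspace(stream, 1024 * 1024, CCV_TENSOR_CPU_MEMORY);
  (void)scratch;
  // ... use the scratch buffer for temporary data ...
  ccv_nnc_stream_context_drain(stream); // free the cached workspace once it is no longer needed
}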

static void _ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, const ccv_nnc_async_callback_f async_callback, void* const callback_context)
{
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
    ccv_nnc_stream_compat_add_callback(stream_context, callback, async_callback, callback_context);
  else
    callback(callback_context);
#else
  callback(callback_context);
#endif
}

static void _ccv_nnc_sync_dispatch(ccv_nnc_async_callback_t* const async)
{
  async->fn(async->callback_context);
  ccfree(async);
}

#ifndef USE_DISPATCH
static void* _ccv_nnc_pthread_dispatch(void* const userdata)
{
  _ccv_nnc_sync_dispatch((ccv_nnc_async_callback_t*)userdata);
  return 0;
}
#endif

static void _ccv_nnc_async_dispatch(ccv_nnc_async_callback_t* const async)
{
  // This method dispatches to a different thread because the CUDA callback thread cannot operate on CUDA objects.
#ifdef USE_DISPATCH
  dispatch_async_f(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), async, (dispatch_function_t)_ccv_nnc_sync_dispatch);
#else
  pthread_t thread;
  pthread_create(&thread, 0, _ccv_nnc_pthread_dispatch, async);
#endif
}

static co_decl_task(_ccv_nnc_stream_context_add_callback_async, (ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context), private())
{
  co_stream_await(CO_P(stream_context));
  _ccv_nnc_stream_context_add_callback(CO_P(stream_context), CO_P(callback), _ccv_nnc_async_dispatch, CO_P(callback_context));
} co_end()

void ccv_nnc_stream_context_add_callback(ccv_nnc_stream_context_t* const stream_context, const ccv_nnc_callback_f callback, void* const callback_context)
{
  if (!stream_context)
  {
    callback(callback_context);
    return;
  }
  co_scheduler_t* const scheduler = stream_context->scheduler;
  if (scheduler && co_scheduler_is_active(scheduler))
  {
    co_routine_t* const task = co_new(_ccv_nnc_stream_context_add_callback_async, (stream_context, callback, callback_context));
    co_schedule(scheduler, task);
  } else
    _ccv_nnc_stream_context_add_callback(stream_context, callback, _ccv_nnc_async_dispatch, callback_context);
}
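
/*
 * Illustrative usage sketch, not part of the original listing: registering a host callback on a
 * stream. The callback signature is assumed to take only the context pointer, matching how
 * callback(callback_context) is invoked above; the example_* names are hypothetical.
 */
static void example_on_done(void* const context)
{
  // For GPU streams this is re-dispatched to a worker thread (libdispatch or a pthread),
  // because the CUDA callback thread itself cannot operate on CUDA objects.
  int* const flag = (int*)context;
  *flag = 1;
}

static void example_register_callback(ccv_nnc_stream_context_t* const stream, int* const flag)
{
  // With a NULL stream the callback executes immediately on the calling thread.
  ccv_nnc_stream_context_add_callback(stream, example_on_done, flag);
}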

void ccv_nnc_stream_context_wait(const ccv_nnc_stream_context_t* const stream_context)
{
  if (!stream_context)
    return;
  co_scheduler_t* const scheduler = stream_context->scheduler;
  if (scheduler && !co_is_on_scheduler(scheduler)) // First wait for the scheduler to finish, if we are not currently on that scheduler.
  {
    pthread_mutex_lock(&scheduler->mutex);
    while (scheduler->active)
      pthread_cond_wait(&scheduler->notify, &scheduler->mutex);
    pthread_mutex_unlock(&scheduler->mutex);
  }
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
    ccv_nnc_synchronize_stream_context(stream_context);
#endif
}

int ccv_nnc_stream_context_add_destructor_hook(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_context_destructor_f destructor, void* const context)
{
  ccv_nnc_stream_destructor_hook_t hook = {
    .destructor_hook = destructor,
    .context = context
  };
  if (stream->reuse_destructor_hook >= 0)
  {
    assert(stream->destructor_hooks);
    const int reuse_destructor_hook = stream->reuse_destructor_hook;
    assert(reuse_destructor_hook < stream->destructor_hooks->rnum);
    *(ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, reuse_destructor_hook) = hook;
    int i;
    stream->reuse_destructor_hook = -1;
    for (i = reuse_destructor_hook + 1; i < stream->destructor_hooks->rnum && stream->reuse_destructor_hook < 0; i++)
      if (!((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook)
        stream->reuse_destructor_hook = i;
    return reuse_destructor_hook;
  } else {
    if (!stream->destructor_hooks)
      stream->destructor_hooks = ccv_array_new(sizeof(ccv_nnc_stream_destructor_hook_t), 1, 0);
    ccv_array_push(stream->destructor_hooks, &hook);
    return stream->destructor_hooks->rnum - 1;
  }
}

void ccv_nnc_stream_context_remove_destructor_hook(ccv_nnc_stream_context_t* const stream, const int hook_id)
{
  assert(hook_id >= 0);
  assert(hook_id < stream->destructor_hooks->rnum);
  ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, hook_id);
  hook->destructor_hook = 0;
  hook->context = 0;
  int i;
  for (i = stream->destructor_hooks->rnum - 1; i >= 0; i--)
    if (((ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream->destructor_hooks, i))->destructor_hook)
    {
      stream->destructor_hooks->rnum = i + 1;
      break;
    }
  if (hook_id < stream->destructor_hooks->rnum &&
    (hook_id < stream->reuse_destructor_hook || stream->reuse_destructor_hook < 0))
    stream->reuse_destructor_hook = hook_id;
  else if (stream->reuse_destructor_hook >= stream->destructor_hooks->rnum)
    stream->reuse_destructor_hook = -1;
}
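
/*
 * Illustrative usage sketch, not part of the original listing: attaching a destructor hook that
 * fires when the stream context is freed. The destructor signature is an assumption inferred from
 * the hook->destructor_hook(stream_context, hook->context) call in ccv_nnc_stream_context_free
 * below; the example_* names are hypothetical.
 */
static void example_release_buffer(ccv_nnc_stream_context_t* const stream, void* const context)
{
  ccfree(context); // release a buffer tied to the stream's lifetime
}

static int example_attach_hook(ccv_nnc_stream_context_t* const stream, void* const buffer)
{
  const int hook_id = ccv_nnc_stream_context_add_destructor_hook(stream, example_release_buffer, buffer);
  // If the buffer is released early, the hook can be removed by id; its slot is then reused by a
  // later ccv_nnc_stream_context_add_destructor_hook call:
  // ccv_nnc_stream_context_remove_destructor_hook(stream, hook_id);
  return hook_id;
}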

void ccv_nnc_stream_context_free(ccv_nnc_stream_context_t* const stream_context)
{
  if (stream_context->destructor_hooks)
  {
    int i;
    for (i = 0; i < stream_context->destructor_hooks->rnum; i++)
    {
      ccv_nnc_stream_destructor_hook_t* const hook = (ccv_nnc_stream_destructor_hook_t*)ccv_array_get(stream_context->destructor_hooks, i);
      if (hook->destructor_hook)
        hook->destructor_hook(stream_context, hook->context);
    }
    ccv_array_free(stream_context->destructor_hooks);
  }
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
    ccv_nnc_deinit_stream_context(stream_context);
  else {
#endif
  ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
  if (stream_cpu->workspace)
    ccfree(stream_cpu->workspace);
#ifdef HAVE_CUDA
  }
#endif
  if (stream_context->scheduler)
  {
    co_scheduler_t* const scheduler = stream_context->scheduler;
    co_scheduler_free(scheduler);
  }
  if (stream_context->event)
    ccv_nnc_stream_signal_free(stream_context->event);
  ccfree(stream_context);
}

void ccv_nnc_stream_context_set_neighbor_discovery(ccv_nnc_stream_context_t* const stream_context, ccv_nnc_stream_context_neighbor_discovery_f discovery, void* const context)
{
  stream_context->neighbor_discovery = discovery;
  stream_context->neighbor_discovery_context = context;
}

ccv_nnc_stream_context_t* ccv_nnc_stream_context_find_neighbor(ccv_nnc_stream_context_t* const stream_context, const int device_id)
{
  if (stream_context->neighbor_discovery)
    return stream_context->neighbor_discovery(device_id, stream_context->neighbor_discovery_context);
  return 0;
}
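
/*
 * Illustrative usage sketch, not part of the original listing: a neighbor-discovery callback that
 * maps a device id to a companion stream. The discovery signature (device_id, context) is an
 * assumption inferred from the neighbor_discovery(device_id, ...) call above; the example_* names
 * are hypothetical.
 */
typedef struct {
  ccv_nnc_stream_context_t* per_device[8]; // streams indexed by device id, owned elsewhere
} example_neighbor_map_t;

static ccv_nnc_stream_context_t* example_discover(const int device_id, void* const context)
{
  example_neighbor_map_t* const map = (example_neighbor_map_t*)context;
  return (device_id >= 0 && device_id < 8) ? map->per_device[device_id] : 0;
}

static void example_install_discovery(ccv_nnc_stream_context_t* const stream, example_neighbor_map_t* const map)
{
  ccv_nnc_stream_context_set_neighbor_discovery(stream, example_discover, map);
  // Later callers can route work to a peer device's stream:
  // ccv_nnc_stream_context_t* const peer = ccv_nnc_stream_context_find_neighbor(stream, 1);
}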

ccv_nnc_stream_signal_t* ccv_nnc_stream_signal_new(const int type)
{
  ccv_nnc_stream_signal_t* const signal = (ccv_nnc_stream_signal_t*)ccmalloc(sizeof(ccv_nnc_stream_signal_t));
  signal->type = type;
  signal->emit_context = 0;
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
    return ccv_nnc_init_stream_signal(signal);
#endif
  return signal;
}

CCV_WARN_UNUSED(int) ccv_nnc_stream_signal_type(const ccv_nnc_stream_signal_t* const signal)
{
  return signal->type;
}

void ccv_nnc_stream_context_emit_signal(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_signal_t* const signal)
{
  signal->emit_context = stream;
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
    ccv_nnc_stream_compat_emit_signal(stream, signal);
#endif
}

ccv_nnc_stream_context_t* ccv_nnc_stream_signal_get_emitter(const ccv_nnc_stream_signal_t* const signal)
{
  return signal->emit_context;
}

void ccv_nnc_stream_context_wait_signal(const ccv_nnc_stream_context_t* const stream, const ccv_nnc_stream_signal_t* const signal)
{
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
    ccv_nnc_stream_compat_wait_signal(stream, signal);
#endif
}
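
/*
 * Illustrative usage sketch, not part of the original listing: ordering work across two streams
 * with an explicitly managed signal. All functions are from this file; the example_* name is
 * hypothetical.
 */
static void example_cross_stream_order(ccv_nnc_stream_context_t* const producer, ccv_nnc_stream_context_t* const consumer)
{
  ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_signal_new(ccv_nnc_stream_context_type(producer));
  // ... enqueue producer work ...
  ccv_nnc_stream_context_emit_signal(producer, signal);  // record the point the consumer must wait for
  ccv_nnc_stream_context_wait_signal(consumer, signal);  // consumer work enqueued afterwards runs after that point
  // ... enqueue consumer work that depends on the producer ...
  ccv_nnc_stream_signal_free(signal);
}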

void ccv_nnc_stream_signal_free(ccv_nnc_stream_signal_t* const signal)
{
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
    ccv_nnc_deinit_stream_signal(signal);
#endif
  ccfree(signal);
}

int ccv_nnc_device_count(const int type)
{
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
    return ccv_nnc_gpu_device_count();
#else
  if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
    return 0;
#endif
  return 1; // We don't query the CPU core count yet.
}

co_scheduler_t* ccv_nnc_stream_context_get_scheduler(ccv_nnc_stream_context_t* const stream_context)
{
  co_scheduler_t* scheduler = stream_context->scheduler;
  if (!scheduler)
    stream_context->scheduler = scheduler = co_scheduler_new();
  return scheduler;
}

int _co_stream_await(co_routine_t* const self, ccv_nnc_stream_context_t* const stream)
{
  if (!stream)
    return 1;
#ifdef HAVE_CUDA
  if (CCV_STREAM_GET_CONTEXT(stream->type) == CCV_STREAM_CONTEXT_GPU)
    return co_stream_compat_await(self, stream);
#endif
  return 1;
}

// MARK - Signal Container

ccv_nnc_stream_signal_t* ccv_nnc_stream_context_emit_signal_new(ccv_nnc_stream_context_t* const stream)
{
  /**
   * We don't need complex containers for this. Based on the CUDA documentation, Record always records
   * the most recent work, and a subsequent wait captures that most recent record. Thus, even if we
   * reuse the same event over and over, as long as we emit and then immediately wait, we won't have
   * any problems.
   */
  if (!stream->event)
    stream->event = ccv_nnc_stream_signal_new(ccv_nnc_stream_context_type(stream));
  ccv_nnc_stream_context_emit_signal(stream, stream->event);
  return stream->event;
}
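
/*
 * Illustrative usage sketch, not part of the original listing: the signal container hands out a
 * per-stream event owned by the stream itself (it is released in ccv_nnc_stream_context_free), so
 * the caller must not free it. The example_* name is hypothetical.
 */
static void example_signal_container(ccv_nnc_stream_context_t* const producer, ccv_nnc_stream_context_t* const consumer)
{
  // Emit on the producer and get back the producer's reusable event.
  ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_context_emit_signal_new(producer);
  // Immediately make the consumer wait on it; per the comment above, reusing the same event is safe
  // as long as each emit is followed right away by the wait that consumes it.
  ccv_nnc_stream_context_wait_signal(consumer, signal);
}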