Coverage Report

Created: 2019-07-03 22:50

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_nnc_stream.c
Line   Count  Source
   1          #include "ccv_nnc.h"
   2          #include "ccv_nnc_internal.h"
   3          #include "ccv_nnc_easy.h"
   4          #ifdef HAVE_CUDA
   5          #include "gpu/ccv_nnc_compat.h"
   6          #endif
   7          #include "_ccv_nnc_stream.h"
   8          #include "3rdparty/valgrind/valgrind.h"
   9
  10          typedef struct {
  11            ccv_nnc_stream_context_t super;
  12            // CPU support for the stream context is left for a later implementation.
  13            size_t workspace_size;
  14            void* workspace;
  15          } ccv_nnc_stream_cpu_t;
  16
  17          ccv_nnc_stream_context_t* ccv_nnc_stream_context_new(const int type)
  18     363  {
  19     363    ccv_nnc_stream_cpu_t* const stream_cpu = (ccv_nnc_stream_cpu_t*)cccalloc(1, sizeof(ccv_nnc_stream_cpu_t));
  20     363    stream_cpu->super.type = type;
  21     363    stream_cpu->super.signal_container = kh_init(signal_container);
  22     363    stream_cpu->workspace_size = 0;
  23     363    stream_cpu->workspace = 0;
  24     363  #ifdef HAVE_CUDA
  25     363    if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
  26     300      return ccv_nnc_init_stream_context((ccv_nnc_stream_context_t*)stream_cpu);
  27      63  #endif
  28      63    return (ccv_nnc_stream_context_t*)stream_cpu;
  29      63  }
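
Most contexts in this run take the GPU branch above (300 of 363 creations); the CPU path only fills in the plain struct. A minimal lifecycle sketch using only functions from this file, assuming a CPU-only caller:

    #include "ccv_nnc.h"
    #include <assert.h>

    static void stream_lifecycle_sketch(void)
    {
      ccv_nnc_stream_context_t* const stream = ccv_nnc_stream_context_new(CCV_STREAM_CONTEXT_CPU);
      // The type passed in round-trips through the accessor.
      assert(CCV_STREAM_GET_CONTEXT(ccv_nnc_stream_context_type(stream)) == CCV_STREAM_CONTEXT_CPU);
      /* ... enqueue work against the stream ... */
      ccv_nnc_stream_context_wait(stream); // block until pending work finishes
      ccv_nnc_stream_context_free(stream); // frees workspace, scheduler and cached signals too
    }
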
  30
  31          CCV_WARN_UNUSED(int) ccv_nnc_stream_context_type(const ccv_nnc_stream_context_t* const stream_context)
  32   1.21M  {
  33   1.21M    return stream_context->type;
  34   1.21M  }
  35
  36          #ifndef HAVE_CUDA
  37          static __thread ccv_nnc_stream_cpu_t ccv_nnc_per_thread_stream_cpu = {
  38            .super = {
  39              .type = CCV_STREAM_CONTEXT_CPU,
  40            },
  41          };
  42          #endif
  43
  44          void* ccv_nnc_stream_context_get_workspace(ccv_nnc_stream_context_t* const stream_context, const size_t workspace_size, const int mem)
  45    316k  {
  46    316k  #ifdef HAVE_CUDA
  47    316k    return ccv_nnc_stream_compat_get_workspace(stream_context, workspace_size, mem);
  48          #else
  49            ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
  50            if (!stream_cpu)
  51              stream_cpu = &ccv_nnc_per_thread_stream_cpu;
  52            assert(mem == CCV_TENSOR_CPU_MEMORY);
  53            if (stream_cpu->workspace_size >= workspace_size)
  54              return stream_cpu->workspace;
  55            stream_cpu->workspace_size = workspace_size;
  56            if (stream_cpu->workspace)
  57              ccfree(stream_cpu->workspace);
  58            stream_cpu->workspace = 0;
  59            ccmemalign(&stream_cpu->workspace, 16, workspace_size);
  60            return stream_cpu->workspace;
  61          #endif
  62          }
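
On the CPU path the workspace is a lazily grown, 16-byte-aligned scratch buffer owned by the stream (or by the per-thread fallback when the stream is null), reused until a larger request forces reallocation. A hedged usage sketch; the kernel and its sizing are hypothetical:

    // Hypothetical kernel that needs scratch memory proportional to its input.
    static void run_with_scratch(ccv_nnc_stream_context_t* const stream, const size_t n)
    {
      // Reuses the stream's buffer whenever n fits under the high-water mark;
      // passing stream == 0 falls back to the per-thread CPU workspace.
      float* const scratch = (float*)ccv_nnc_stream_context_get_workspace(stream, sizeof(float) * n, CCV_TENSOR_CPU_MEMORY);
      /* ... use scratch; do not free it, the context owns it ... */
      // ccv_nnc_stream_context_drain(stream) releases the buffer eagerly if needed.
      (void)scratch;
    }
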
  63
  64          void ccv_nnc_stream_context_drain(ccv_nnc_stream_context_t* const stream_context)
  65   94.4k  {
  66   94.4k  #ifdef HAVE_CUDA
  67   94.4k    ccv_nnc_stream_compat_drain(stream_context);
  68          #else
  69            ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
  70            if (!stream_cpu)
  71              stream_cpu = &ccv_nnc_per_thread_stream_cpu;
  72            if (stream_cpu->workspace)
  73            {
  74              ccfree(stream_cpu->workspace);
  75              stream_cpu->workspace = 0;
  76              stream_cpu->workspace_size = 0;
  77            }
  78          #endif
  79          }
  80
  81          void ccv_nnc_stream_context_wait(const ccv_nnc_stream_context_t* const stream_context)
  82   3.46k  {
  83   3.46k    if (!stream_context)
  84       0      return;
  85   3.46k    ccv_nnc_stream_scheduler_t* const scheduler = stream_context->scheduler;
  86   3.46k    if (scheduler) // First wait for the scheduler to finish.
  87   3.43k    {
  88   3.43k      pthread_mutex_lock(&scheduler->mutex);
  89   3.44k      while (scheduler->active)
  90       2        pthread_cond_wait(&scheduler->notify, &scheduler->mutex);
  91   3.43k      pthread_mutex_unlock(&scheduler->mutex);
  92   3.43k    }
  93   3.46k  #ifdef HAVE_CUDA
  94   3.46k    if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
  95   3.46k      ccv_nnc_synchronize_stream_context(stream_context);
  96   3.46k  #endif
  97   3.46k  }
  98
  99          void ccv_nnc_stream_context_free(ccv_nnc_stream_context_t* const stream_context)
 100     363  {
 101     363  #ifdef HAVE_CUDA
 102     363    if (CCV_STREAM_GET_CONTEXT(stream_context->type) == CCV_STREAM_CONTEXT_GPU)
 103     300      ccv_nnc_deinit_stream_context(stream_context);
 104          #else
 105            ccv_nnc_stream_cpu_t* stream_cpu = (ccv_nnc_stream_cpu_t*)stream_context;
 106            if (stream_cpu->workspace)
 107              ccfree(stream_cpu->workspace);
 108          #endif
 109     363    if (stream_context->scheduler)
 110      11    {
 111      11      ccv_nnc_stream_scheduler_t* const scheduler = stream_context->scheduler;
 112      11      pthread_mutex_destroy(&scheduler->mutex);
 113      11      pthread_cond_destroy(&scheduler->notify);
 114      11      pthread_cond_destroy(&scheduler->wait);
 115      11      if (scheduler->empty_tasks)
 116      11      {
 117      11        int i;
 118      26        for (i = 0; i < scheduler->empty_tasks->rnum; i++)
 119      15        {
 120      15          ccv_nnc_stream_task_t* const task = *(ccv_nnc_stream_task_t**)ccv_array_get(scheduler->empty_tasks, i);
 121      15          ccfree(task->stack);
 122      15          ccfree(task);
 123      15        }
 124      11        ccv_array_free(scheduler->empty_tasks);
 125      11      }
 126      11      ccfree(scheduler);
 127      11    }
 128     363    khash_t(signal_container)* const signal_container = stream_context->signal_container;
 129     363    khiter_t k;
 130     395    for (k = kh_begin(signal_container); k != kh_end(signal_container); ++k)
 131      32    {
 132      32      if (!kh_exist(signal_container, k))
 133      32        continue;
 134       8      ccv_nnc_stream_signal_t* const signal = kh_val(signal_container, k);
 135       8      ccv_nnc_stream_signal_free(signal);
 136       8    }
 137     363    kh_destroy(signal_container, signal_container);
 138     363    ccfree(stream_context);
 139     363  }
 140
 141          void ccv_nnc_stream_context_set_neighbor_discovery(ccv_nnc_stream_context_t* const stream_context, ccv_nnc_stream_context_neighbor_discovery_f discovery, void* const context)
 142   1.38M  {
 143   1.38M    stream_context->neighbor_discovery = discovery;
 144   1.38M    stream_context->neighbor_discovery_context = context;
 145   1.38M  }
 146
 147          ccv_nnc_stream_context_t* ccv_nnc_stream_context_find_neighbor(ccv_nnc_stream_context_t* const stream_context, const int device_id)
 148    356k  {
 149    356k    if (stream_context->neighbor_discovery)
 150    356k      return stream_context->neighbor_discovery(device_id, stream_context->neighbor_discovery_context);
 151       0    return 0;
 152       0  }
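
find_neighbor defers entirely to the registered callback, which lets the graph runtime map a device id to the stream it should borrow. A sketch of a discovery callback, assuming the ccv_nnc_stream_context_neighbor_discovery_f typedef matches the call site above; the per-device array is hypothetical:

    #define DEVICE_COUNT 4 // hypothetical

    typedef struct {
      ccv_nnc_stream_context_t* streams[DEVICE_COUNT]; // hypothetical per-device streams
    } per_device_streams_t;

    static ccv_nnc_stream_context_t* _per_device_discovery(const int device_id, void* const context)
    {
      per_device_streams_t* const set = (per_device_streams_t*)context;
      return (device_id >= 0 && device_id < DEVICE_COUNT) ? set->streams[device_id] : 0;
    }

    // After registration, ccv_nnc_stream_context_find_neighbor(stream, i) resolves to set.streams[i]:
    // ccv_nnc_stream_context_set_neighbor_discovery(stream, _per_device_discovery, &set);
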
 153
 154          ccv_nnc_stream_signal_t* ccv_nnc_stream_signal_new(const int type)
 155     547  {
 156     547    ccv_nnc_stream_signal_t* const signal = (ccv_nnc_stream_signal_t*)ccmalloc(sizeof(ccv_nnc_stream_signal_t));
 157     547    signal->type = type;
 158     547    signal->emit_context = 0;
 159     547  #ifdef HAVE_CUDA
 160     547    if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
 161     454      return ccv_nnc_init_stream_signal(signal);
 162      93  #endif
 163      93    return signal;
 164      93  }
 165
 166          void ccv_nnc_stream_context_emit_signal(ccv_nnc_stream_context_t* const stream, ccv_nnc_stream_signal_t* const signal)
 167    617k  {
 168    617k    signal->emit_context = stream;
 169    617k  #ifdef HAVE_CUDA
 170    617k    if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
 171    617k      ccv_nnc_stream_compat_emit_signal(stream, signal);
 172    617k  #endif
 173    617k  }
 174
 175          ccv_nnc_stream_context_t* ccv_nnc_stream_signal_get_emitter(const ccv_nnc_stream_signal_t* const signal)
 176       7  {
 177       7    return signal->emit_context;
 178       7  }
 179
 180          void ccv_nnc_stream_context_wait_signal(const ccv_nnc_stream_context_t* const stream, const ccv_nnc_stream_signal_t* const signal)
 181   1.79M  {
 182   1.79M  #ifdef HAVE_CUDA
 183   1.79M    if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
 184   1.79M      ccv_nnc_stream_compat_wait_signal(stream, signal);
 185   1.79M  #endif
 186   1.79M  }
 187
 188          ccv_nnc_stream_signal_t* ccv_nnc_stream_context_get_signal(ccv_nnc_stream_context_t* const stream, const int64_t identifier)
 189   3.43k  {
 190   3.43k    khash_t(signal_container)* const signal_container = stream->signal_container;
 191   3.43k    int ret = 0;
 192   3.43k    khiter_t k = kh_put(signal_container, signal_container, identifier, &ret);
 193   3.43k    assert(ret >= 0);
 194   3.43k    // If ret == 0, the key already exists and we can read the value directly; otherwise, create a new signal and assign it back.
 195   3.43k    ccv_nnc_stream_signal_t* const signal = (ret == 0) ? kh_val(signal_container, k) : ccv_nnc_stream_signal_new(stream->type);
 196   3.43k    if (ret != 0)
 197   3.43k      kh_val(signal_container, k) = signal;
 198   3.43k    return signal;
 199   3.43k  }
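
Signals order work between streams: the producer emits, the consumer waits, and get_signal memoizes one signal per caller-chosen identifier in the khash container so hot paths avoid allocation (8 allocations against 3.43k lookups above). A hedged pairing sketch; the streams and the identifier are illustrative:

    static void ordered_pair_sketch(const int type)
    {
      ccv_nnc_stream_context_t* const producer = ccv_nnc_stream_context_new(type);
      ccv_nnc_stream_context_t* const consumer = ccv_nnc_stream_context_new(type);
      // Allocated on first use, cached under identifier 42, and freed later
      // by ccv_nnc_stream_context_free(producer).
      ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_context_get_signal(producer, 42);
      /* ... enqueue work on producer ... */
      ccv_nnc_stream_context_emit_signal(producer, signal);
      // Work enqueued on consumer past this point is ordered after the emit.
      ccv_nnc_stream_context_wait_signal(consumer, signal);
      /* ... enqueue dependent work on consumer ... */
      ccv_nnc_stream_context_wait(consumer);
      ccv_nnc_stream_context_free(consumer);
      ccv_nnc_stream_context_free(producer);
    }
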
 200
 201          void ccv_nnc_stream_signal_free(ccv_nnc_stream_signal_t* const signal)
 202     547  {
 203     547  #ifdef HAVE_CUDA
 204     547    if (CCV_STREAM_GET_CONTEXT(signal->type) == CCV_STREAM_CONTEXT_GPU)
 205     454      ccv_nnc_deinit_stream_signal(signal);
 206     547  #endif
 207     547    ccfree(signal);
 208     547  }
 209
 210          int ccv_nnc_device_count(const int type)
 211      10  {
 212      10  #ifdef HAVE_CUDA
 213      10    if (CCV_STREAM_GET_CONTEXT(type) == CCV_STREAM_CONTEXT_GPU)
 214      10      return ccv_nnc_gpu_device_count();
 215       0  #endif
 216       0    return 1; // Getting the CPU core count is not implemented yet.
 217       0  }
 218
 219          ccv_nnc_stream_scheduler_t* ccv_nnc_stream_context_get_scheduler(ccv_nnc_stream_context_t* const stream_context)
 220   3.43k  {
 221   3.43k    ccv_nnc_stream_scheduler_t* scheduler = stream_context->scheduler;
 222   3.43k    if (!scheduler)
 223      11    {
 224      11      stream_context->scheduler = scheduler = (ccv_nnc_stream_scheduler_t*)cccalloc(1, sizeof(ccv_nnc_stream_scheduler_t));
 225      11      pthread_mutex_init(&scheduler->mutex, 0);
 226      11      pthread_cond_init(&scheduler->notify, 0);
 227      11      pthread_cond_init(&scheduler->wait, 0);
 228      11    }
 229   3.43k    return scheduler;
 230   3.43k  }
 231
 232          void ccv_nnc_stream_scheduler_prepend_task(ccv_nnc_stream_scheduler_t* const scheduler, ccv_nnc_stream_task_t* const task)
 233      17  {
 234      17    if (scheduler->head)
 235       0    {
 236       0      scheduler->head->prev = task;
 237       0      task->next = scheduler->head;
 238      17    } else {
 239      17      scheduler->tail = task;
 240      17      task->next = 0;
 241      17    }
 242      17    scheduler->head = task;
 243      17    task->prev = 0;
 244      17  }
 245
 246          void ccv_nnc_stream_scheduler_append_task(ccv_nnc_stream_scheduler_t* const scheduler, ccv_nnc_stream_task_t* const task)
 247   3.43k  {
 248   3.43k    if (scheduler->tail)
 249       0    {
 250       0      scheduler->tail->next = task;
 251       0      task->prev = scheduler->tail;
 252   3.43k    } else {
 253   3.43k      scheduler->head = task;
 254   3.43k      task->prev = 0;
 255   3.43k    }
 256   3.43k    scheduler->tail = task;
 257   3.43k    task->next = 0;
 258   3.43k  }
 259
 260          static void _ccv_nnc_stream_scheduler_delete_task(ccv_nnc_stream_scheduler_t* const scheduler, ccv_nnc_stream_task_t* const task)
 261   3.45k  {
 262   3.45k    if (task->prev)
 263       0      task->prev->next = task->next;
 264   3.45k    else
 265   3.45k      scheduler->head = task->next;
 266   3.45k    if (task->next)
 267       0      task->next->prev = task->prev;
 268   3.45k    else
 269   3.45k      scheduler->tail = task->prev;
 270   3.45k  }
 271
 272          static void _ccv_nnc_stream_task_done(ccv_nnc_stream_task_t* const task)
 273   3.44k  {
 274   3.44k    if (task->notify)
 275       4    {
 276       4      ccv_nnc_stream_task_t* const notify = task->notify;
 277       4      task->notify = 0;
 278       4      ccv_nnc_stream_scheduler_prepend_task(task->super, notify);
 279       4      int i;
 280       4      const int other_size = notify->other_size;
 281       4      notify->other_size = 0;
 282       4      ccv_nnc_stream_task_t* const* const others = notify->others;
 283       4      notify->others = 0;
 284       8      for (i = 0; i < other_size; i++)
 285       4        if (others[i] != task)
 286       0        {
 287       0          assert(others[i]->notify == notify);
 288       0          others[i]->notify = 0;
 289       0        }
 290       4    }
 291   3.44k    ccv_nnc_stream_scheduler_t* const scheduler = task->super;
 292   3.44k    if (!scheduler->empty_tasks)
 293      11      scheduler->empty_tasks = ccv_array_new(sizeof(ccv_nnc_stream_task_t*), 1, 0);
 294   3.44k    ccv_array_push(scheduler->empty_tasks, &task);
 295   3.44k  }
 296
 297          // Second, this blocking variant is invoked on a newly created thread to keep scheduling tasks.
 298          static void* _ccv_nnc_stream_schedule_main(void* userdata)
 299       2  {
 300       2    ccv_nnc_stream_scheduler_t* const scheduler = (ccv_nnc_stream_scheduler_t*)userdata;
 301       2    pthread_mutex_lock(&scheduler->mutex);
 302       2    for (;;)
 303      18    {
 304      18      if (scheduler->head == 0 && scheduler->stream_wait_task_count == 0)
 305       2      {
 306       2        scheduler->active = 0;
 307       2        pthread_cond_broadcast(&scheduler->notify);
 308       2        pthread_mutex_unlock(&scheduler->mutex);
 309       2        break;
 310       2      }
 311      16      if (scheduler->head == 0)
 312       8      {
 313       8        pthread_cond_wait(&scheduler->wait, &scheduler->mutex);
 314       8        pthread_mutex_unlock(&scheduler->mutex);
 315       8      }
 316      16      ccv_nnc_stream_task_t* const task = scheduler->head;
 317      16      _ccv_nnc_stream_scheduler_delete_task(scheduler, task);
 318      16      pthread_mutex_unlock(&scheduler->mutex);
 319      16      swapcontext(&scheduler->caller, &task->context);
 320      16      task->context = scheduler->callee;
 321      16      pthread_mutex_lock(&scheduler->mutex);
 322      16      if (task->done)
 323       6        _ccv_nnc_stream_task_done(task);
 324      16    }
 325       2    return 0;
 326       2  }
 327
 328          // First, this non-blocking variant is invoked to schedule tasks on the calling thread.
 329          static void _ccv_nnc_stream_schedule_try(ccv_nnc_stream_scheduler_t* const scheduler)
 330   3.43k  {
 331   3.43k    pthread_mutex_lock(&scheduler->mutex);
 332   3.43k    if (scheduler->active)
 333       0    {
 334       0      pthread_mutex_unlock(&scheduler->mutex);
 335       0      return;
 336       0    }
 337   3.43k    scheduler->active = 1;
 338   3.43k    for (;;)
 339   6.87k    {
 340   6.87k      if (scheduler->head == 0 && scheduler->stream_wait_task_count == 0)
 341   3.43k      {
 342   3.43k        scheduler->active = 0;
 343   3.43k        pthread_mutex_unlock(&scheduler->mutex);
 344   3.43k        break;
 345   3.43k      }
 346   3.44k      if (scheduler->head == 0)
 347       2      {
 348       2        // Launch a thread to continue the execution.
 349       2        pthread_create(&scheduler->thread, 0, _ccv_nnc_stream_schedule_main, scheduler);
 350       2        pthread_mutex_unlock(&scheduler->mutex);
 351       2        break;
 352       2      }
 353   3.43k      ccv_nnc_stream_task_t* const task = scheduler->head;
 354   3.43k      _ccv_nnc_stream_scheduler_delete_task(scheduler, task);
 355   3.43k      pthread_mutex_unlock(&scheduler->mutex);
 356   3.43k      swapcontext(&scheduler->caller, &task->context);
 357   3.43k      task->context = scheduler->callee;
 358   3.43k      pthread_mutex_lock(&scheduler->mutex);
 359   3.43k      if (task->done)
 360   3.43k        _ccv_nnc_stream_task_done(task);
 361   3.43k    }
 362   3.43k  }
 363
 364          void ccv_nnc_stream_schedule_task(ccv_nnc_stream_scheduler_t* const scheduler, ccv_nnc_stream_task_t* const task)
 365   3.43k  {
 366   3.43k    int activate_scheduler = 0;
 367   3.43k    pthread_mutex_lock(&scheduler->mutex);
 368   3.43k    // Append to the end; swapped-in tasks all prepend, so tasks scheduled this way are guaranteed to execute after them.
 369   3.43k    ccv_nnc_stream_scheduler_append_task(scheduler, task);
 370   3.43k    if (!scheduler->active)
 371   3.43k      activate_scheduler = 1;
 372   3.43k    pthread_mutex_unlock(&scheduler->mutex);
 373   3.43k    if (activate_scheduler)
 374   3.43k      _ccv_nnc_stream_schedule_try(scheduler);
 375   3.43k  }
 376
 377          typedef union {
 378            void* ptr;
 379            uint32_t part[2];
 380          } ccv_nnc_ptr_splitter_u;
 381
 382          static void _ccv_nnc_stream_task_entry_point(uint32_t part0, uint32_t part1)
 383   3.44k  {
 384   3.44k    const ccv_nnc_ptr_splitter_u p = {
 385   3.44k      .part = {
 386   3.44k        part0, part1
 387   3.44k      }
 388   3.44k    };
 389   3.44k    ccv_nnc_stream_task_t* const task = (ccv_nnc_stream_task_t*)p.ptr;
 390   3.44k    task->func(task, task->userdata);
 391   3.44k    ccv_nnc_stream_scheduler_t* const scheduler = task->super;
 392   3.44k    task->done = 1;
 393   3.44k    swapcontext(&scheduler->callee, &scheduler->caller);
 394   3.44k  }
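
makecontext only forwards integer arguments, so on LP64 a 64-bit pointer cannot travel as a single argument: ccv_nnc_ptr_splitter_u splits it into two uint32_t halves that the entry point reassembles. A self-contained demo of the same trick (not ccv code):

    #include <stdio.h>
    #include <stdint.h>
    #include <ucontext.h>

    typedef union {
      void* ptr;
      uint32_t part[2];
    } ptr_splitter_u;

    static ucontext_t main_ctx, co_ctx;

    static void entry(uint32_t part0, uint32_t part1)
    {
      const ptr_splitter_u p = { .part = { part0, part1 } }; // reassemble the pointer
      printf("payload: %s\n", (const char*)p.ptr);
      swapcontext(&co_ctx, &main_ctx); // yield back, like the task entry point above
    }

    int main(void)
    {
      static char stack[64 * 1024];
      const ptr_splitter_u p = { .ptr = "hello" };
      getcontext(&co_ctx);
      co_ctx.uc_stack.ss_sp = stack;
      co_ctx.uc_stack.ss_size = sizeof(stack);
      co_ctx.uc_link = 0;
      makecontext(&co_ctx, (void (*)(void))entry, 2, p.part[0], p.part[1]); // pass the two halves
      swapcontext(&main_ctx, &co_ctx); // run the coroutine until it yields
      return 0;
    }
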
 395
 396          ccv_nnc_stream_task_t* ccv_nnc_stream_task_new(ccv_nnc_stream_scheduler_t* const scheduler, const ccv_nnc_stream_task_f func, void* const userdata, const size_t userdata_size)
 397   3.44k  {
 398   3.44k    ccv_nnc_stream_task_t* task;
 399   3.44k    pthread_mutex_lock(&scheduler->mutex);
 400   3.44k    if (scheduler->empty_tasks && scheduler->empty_tasks->rnum)
 401   3.42k    {
 402   3.42k      task = *(ccv_nnc_stream_task_t**)ccv_array_get(scheduler->empty_tasks, scheduler->empty_tasks->rnum - 1);
 403   3.42k      --scheduler->empty_tasks->rnum;
 404   3.42k      pthread_mutex_unlock(&scheduler->mutex);
 405   3.42k      if (userdata_size)
 406   3.42k        task->stack = (char*)ccrealloc(task->stack, CCV_NNC_TASK_STACK_SIZE + userdata_size);
 407   3.42k    } else {
 408      15      pthread_mutex_unlock(&scheduler->mutex);
 409      15      task = (ccv_nnc_stream_task_t*)cccalloc(1, sizeof(ccv_nnc_stream_task_t));
 410      15      task->stack = (char*)cccalloc(CCV_NNC_TASK_STACK_SIZE + userdata_size, 1);
 411      15      task->super = scheduler;
 412      15    }
 413   3.44k    task->done = 0;
 414   3.44k    task->func = func;
 415   3.44k    if (userdata_size)
 416   3.43k    {
 417   3.43k      // If the size is available, we copy the userdata over.
 418   3.43k      task->userdata = task->stack + CCV_NNC_TASK_STACK_SIZE;
 419   3.43k      memcpy(task->userdata, userdata, userdata_size);
 420   3.43k    } else
 421       6      task->userdata = userdata;
 422   3.44k    getcontext(&task->context);
 423   3.44k    task->context.uc_stack.ss_sp = task->stack;
 424   3.44k    task->context.uc_stack.ss_size = CCV_NNC_TASK_STACK_SIZE;
 425   3.44k    VALGRIND_STACK_REGISTER(task->stack, task->stack + CCV_NNC_TASK_STACK_SIZE);
 426   3.44k    task->context.uc_link = 0;
 427   3.44k    const ccv_nnc_ptr_splitter_u p = {
 428   3.44k      .ptr = task,
 429   3.44k    };
 430   3.44k    makecontext(&task->context, (void (*)(void))_ccv_nnc_stream_task_entry_point, 2, p.part[0], p.part[1]);
 431   3.44k    return task;
 432   3.44k  }
 433
 434          void ccv_nnc_stream_task_resume(ccv_nnc_stream_task_t* const task)
 435       6  {
 436       6    ccv_nnc_stream_scheduler_t* const scheduler = task->super;
 437       6    ucontext_t old_context = scheduler->caller;
 438       6    swapcontext(&scheduler->caller, &task->context);
 439       6    task->context = scheduler->callee;
 440       6    scheduler->caller = old_context;
 441       6    if (task->done)
 442       3    {
 443       3      pthread_mutex_lock(&scheduler->mutex);
 444       3      _ccv_nnc_stream_task_done(task);
 445       3      pthread_mutex_unlock(&scheduler->mutex);
 446       3    }
 447       6  }
 448
 449          void ccv_nnc_stream_task_synchronize(ccv_nnc_stream_task_t* const self, ccv_nnc_stream_context_t* const stream)
 450      21  {
 451      21    if (!stream)
 452       0      return;
 453      21  #ifdef HAVE_CUDA
 454      21    if (CCV_STREAM_GET_CONTEXT(stream->type) == CCV_STREAM_CONTEXT_GPU)
 455      21      ccv_nnc_stream_compat_task_synchronize(self, stream);
 456      21  #endif
 457      21  }
 458
 459          void ccv_nnc_stream_task_wait_any(ccv_nnc_stream_task_t* const self, ccv_nnc_stream_task_t* const* const others, const int other_size)
 460       4  {
 461       4    self->other_size = other_size;
 462       4    self->others = others;
 463       4    int i;
 464       8    for (i = 0; i < other_size; i++)
 465       4    {
 466       4      assert(others[i]->notify == 0);
 467       4      others[i]->notify = self;
 468       4    }
 469       4    ccv_nnc_stream_scheduler_t* const scheduler = self->super;
 470       4    swapcontext(&scheduler->callee, &scheduler->caller);
 471       4  }
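
Putting the pieces together: a task is a ucontext coroutine whose userdata is copied past the fixed-size stack, scheduling appends it to the run queue, and an idle scheduler drains the queue inline via _ccv_nnc_stream_schedule_try. An end-to-end sketch, assuming ccv_nnc_stream_task_f matches the task->func(task, task->userdata) call in the entry point; the payload is hypothetical:

    typedef struct {
      int x;
    } my_payload_t; // hypothetical userdata

    static void my_task_func(ccv_nnc_stream_task_t* const self, void* const userdata)
    {
      my_payload_t* const payload = (my_payload_t*)userdata; // the copy made by task_new
      /* ... do work; ccv_nnc_stream_task_synchronize(self, stream) can suspend
         this coroutine until a GPU stream catches up ... */
      (void)self;
      (void)payload;
    }

    static void schedule_sketch(ccv_nnc_stream_context_t* const stream)
    {
      ccv_nnc_stream_scheduler_t* const scheduler = ccv_nnc_stream_context_get_scheduler(stream);
      my_payload_t payload = { .x = 1 };
      // userdata_size > 0, so the payload is copied next to the task's stack
      // and this local can safely go out of scope.
      ccv_nnc_stream_task_t* const task = ccv_nnc_stream_task_new(scheduler, my_task_func, &payload, sizeof(payload));
      ccv_nnc_stream_schedule_task(scheduler, task);
      ccv_nnc_stream_context_wait(stream); // returns once scheduler->active drops
    }
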