Coverage Report

Created: 2021-09-30 21:42

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_cnnp_dataframe.c
Line
Count
Source (jump to first uncovered line)
1
#include "ccv_nnc.h"
2
#include "ccv_nnc_easy.h"
3
#include "ccv_nnc_internal.h"
4
#include "ccv_internal.h"
5
#include "_ccv_cnnp_dataframe.h"
6
#include "3rdparty/khash/khash.h"
7
#ifdef HAVE_GSL
8
#include <gsl/gsl_rng.h>
9
#include <gsl/gsl_randist.h>
10
#else
11
#include "3rdparty/sfmt/SFMT.h"
12
#endif
13
14
// Per-stream-context entry of ccv_cnnp_dataframe_s::data_ctx: the reuse cache of released column data.
typedef struct {
  ccv_array_t* columns; // One ccv_array_t* per column (NULL until first used); each holds void* data items released on this stream context.
  int hook_id; // Id returned by ccv_nnc_stream_context_add_destructor_hook, or -1 when the key is the NULL stream context.
} ccv_cnnp_data_ctx_t;
18
19
// Hash map "ctx": stream context pointer cast to int64 (NULL maps to 0) -> ccv_cnnp_data_ctx_t reuse cache.
KHASH_MAP_INIT_INT64(ctx, ccv_cnnp_data_ctx_t)
20
21
// A dataframe: a fixed number of rows, a set of base columns given at construction, plus derived columns
// appended later through ccv_cnnp_dataframe_add / ccv_cnnp_dataframe_map.
struct ccv_cnnp_dataframe_s {
  int row_count; // Number of rows, as passed to ccv_cnnp_dataframe_new.
  int column_size; // Number of base columns stored in column_data (derived columns are not counted here).
  int* shuffled_idx; // Row permutation created lazily by ccv_cnnp_dataframe_shuffle; NULL until the first shuffle.
#ifdef HAVE_GSL
  gsl_rng* rng; // Random generator allocated and seeded on the first shuffle.
#else
  sfmt_t sfmt; // SFMT generator state seeded on the first shuffle.
#endif
  khash_t(ctx)* data_ctx; // Stream-context keyed cache of released column data, so iterating can reuse data instead of allocating it again.
  ccv_array_t* derived_column_data; // ccv_cnnp_derived_column_data_t entries appended by add/map; created lazily on the first one.
  ccv_cnnp_column_data_t column_data[1]; // Trailing storage: the allocation reserves room for column_size entries.
};
34
35
// A column appended after construction: either enumerated directly (ccv_cnnp_dataframe_add, uses data_enum)
// or computed from other columns (ccv_cnnp_dataframe_map, uses map + column_idxs).
typedef struct {
  int stream_type; // Stream type requested for producing this column; 0 means no dedicated child stream is created.
  int column_idx_size; // Number of input columns for map; 0 for columns created by ccv_cnnp_dataframe_add.
  int* column_idxs; // Heap copy of the input column indices for map; NULL for ccv_cnnp_dataframe_add.
  char* name; // Copy of the column name made by ccv_cnnp_column_copy_name.
  ccv_cnnp_column_data_enum_f data_enum; // Producer for ccv_cnnp_dataframe_add columns.
  ccv_cnnp_column_data_deinit_f data_deinit; // Releases one produced data item; may be NULL.
  void* context; // User context forwarded to data_enum / map / data_deinit.
  ccv_cnnp_column_data_context_deinit_f context_deinit; // NOTE(review): only stored here; presumably invoked when the dataframe is freed — confirm.
  ccv_cnnp_column_data_map_f map; // When non-NULL, the column is computed from the column_idxs inputs by this function.
} ccv_cnnp_derived_column_data_t;
46
47
ccv_cnnp_dataframe_t* ccv_cnnp_dataframe_new(const ccv_cnnp_column_data_t* const column_data, const int column_size, const int row_count)
48
52
{
49
52
  assert(column_size >= 0);
50
52
  ccv_cnnp_dataframe_t* const dataframe = (ccv_cnnp_dataframe_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_t) + sizeof(ccv_cnnp_column_data_t) * (column_size - 1));
51
52
  dataframe->row_count = row_count;
52
52
  dataframe->column_size = column_size;
53
52
  dataframe->data_ctx = kh_init(ctx);
54
52
  if (column_size > 0)
55
51
  {
56
51
    memcpy(dataframe->column_data, column_data, sizeof(ccv_cnnp_column_data_t) * column_size);
57
51
    int i;
58
6.53k
    for (i = 0; i < column_size; 
i++6.48k
)
59
6.48k
      dataframe->column_data[i].name = ccv_cnnp_column_copy_name(column_data[i].name);
60
51
  }
61
52
  return dataframe;
62
52
}
63
64
// Shuffle the dataframe's row order in place.
//
// The first call allocates shuffled_idx as the identity permutation and seeds the random generator with the
// dataframe's address. Every call, including the first, then applies one random shuffle to the current order.
// Requires a non-zero row count.
void ccv_cnnp_dataframe_shuffle(ccv_cnnp_dataframe_t* const dataframe)
{
  const int rows = dataframe->row_count;
  assert(rows);
  if (dataframe->shuffled_idx == 0)
  {
    int* const order = (int*)ccmalloc(sizeof(int) * rows);
    int row;
    for (row = 0; row < rows; row++)
      order[row] = row;
    dataframe->shuffled_idx = order;
#ifdef HAVE_GSL
    assert(!dataframe->rng);
    gsl_rng_env_setup();
    dataframe->rng = gsl_rng_alloc(gsl_rng_default);
    gsl_rng_set(dataframe->rng, (unsigned long int)(uintptr_t)dataframe);
#else
    sfmt_init_gen_rand(&dataframe->sfmt, (uint32_t)(uintptr_t)dataframe);
#endif
  }
#ifdef HAVE_GSL
  gsl_ran_shuffle(dataframe->rng, dataframe->shuffled_idx, rows, sizeof(int));
#else
  sfmt_genrand_shuffle(&dataframe->sfmt, dataframe->shuffled_idx, rows, sizeof(int));
#endif
}
88
89
// Number of rows the dataframe was created with.
int ccv_cnnp_dataframe_row_count(ccv_cnnp_dataframe_t* const dataframe)
{
  const int rows = dataframe->row_count;
  return rows;
}
93
94
// Append a derived column whose values are produced directly by data_enum.
//
// The name is duplicated with ccv_cnnp_column_copy_name; context and the deinit callbacks are stored as given.
// Returns the index of the new column: base columns come first, then derived columns in insertion order.
int ccv_cnnp_dataframe_add(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_enum_f data_enum, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit, const char* name)
{
  if (dataframe->derived_column_data == 0)
    dataframe->derived_column_data = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
  // Unset members (map, column_idxs, column_idx_size) stay zero: this column has no inputs.
  ccv_cnnp_derived_column_data_t entry = { 0 };
  entry.stream_type = stream_type;
  entry.name = ccv_cnnp_column_copy_name(name);
  entry.data_enum = data_enum;
  entry.data_deinit = data_deinit;
  entry.context = context;
  entry.context_deinit = context_deinit;
  ccv_array_push(dataframe->derived_column_data, &entry);
  const int derived_count = dataframe->derived_column_data->rnum;
  return dataframe->column_size + derived_count - 1;
}
109
110
// Append a derived column computed by `map` from the columns listed in column_idxs.
//
// column_idxs must name columns that already exist (base or earlier derived). This keeps the dependency graph
// acyclic, which the recursive evaluation in _ccv_cnnp_dataframe_column_data relies on. The index list is copied,
// and the name is duplicated with ccv_cnnp_column_copy_name.
// Returns the index of the new column.
int ccv_cnnp_dataframe_map(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_map_f map, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, const int* const column_idxs, const int column_idx_size, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit, const char* name)
{
  assert(column_idx_size > 0);
  if (!dataframe->derived_column_data)
    dataframe->derived_column_data = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
  const int column_size = dataframe->column_size + dataframe->derived_column_data->rnum;
  int i;
  // Reject negative indices too: they would later index the iterator's cached_data out of bounds.
  for (i = 0; i < column_idx_size; i++)
    { assert(column_idxs[i] >= 0 && column_idxs[i] < column_size); }
  ccv_cnnp_derived_column_data_t column_data = {
    .stream_type = stream_type,
    .name = ccv_cnnp_column_copy_name(name),
    .column_idx_size = column_idx_size,
    .column_idxs = (int*)ccmalloc(sizeof(int) * column_idx_size),
    .map = map,
    .data_deinit = data_deinit,
    .context = context,
    .context_deinit = context_deinit,
  };
  memcpy(column_data.column_idxs, column_idxs, sizeof(int) * column_idx_size);
  ccv_array_push(dataframe->derived_column_data, &column_data);
  return dataframe->column_size + dataframe->derived_column_data->rnum - 1;
}
133
134
void* ccv_cnnp_dataframe_column_context(const ccv_cnnp_dataframe_t* const dataframe, const int column_idx)
135
2
{
136
2
  assert(column_idx >= 0);
137
2
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 
00
);
138
2
  assert(column_idx < column_size);
139
2
  if (column_idx < dataframe->column_size)
140
0
    return dataframe->column_data[column_idx].context;
141
2
  assert(dataframe->derived_column_data);
142
2
  ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
143
2
  return derived_column_data->context;
144
2
}
145
146
// Return the (dataframe-owned) name of column column_idx, base or derived.
// The returned pointer is the copy made when the column was registered.
const char* ccv_cnnp_dataframe_column_name(ccv_cnnp_dataframe_t* const dataframe, const int column_idx)
{
  assert(column_idx >= 0);
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  assert(column_idx < column_size);
  if (column_idx < dataframe->column_size)
    return dataframe->column_data[column_idx].name;
  // Same guard as ccv_cnnp_dataframe_column_context: derived lookups need the derived array.
  assert(dataframe->derived_column_data);
  ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
  return derived_column_data->name;
}
156
157
// One cached cell: the data produced for a (row, column) plus the stream context it was produced on.
typedef struct {
  int flag; // Non-zero when data holds a produced value for this cell.
  int64_t ctx; // Stream context pointer (as an integer) the data was produced on; selects the reuse cache on release.
  void* data;
} ccv_cnnp_dataframe_data_item_t;
162
163
// Per-column child stream context, created when a column requests a stream type different from the caller's.
typedef struct {
  ccv_nnc_stream_context_t* stream_context;
} ccv_cnnp_dataframe_column_ctx_t;
166
167
// Hash map "iter_ctx": parent stream context pointer (as an integer) -> heap array of iter->column_size child contexts.
KHASH_MAP_INIT_INT64(iter_ctx, ccv_cnnp_dataframe_column_ctx_t*)
168
169
// Iterator over a dataframe's rows for a fixed list of columns, with an optional prefetch ring buffer.
struct ccv_cnnp_dataframe_iter_s {
  int flag; // Set by ccv_cnnp_dataframe_iter_next, cleared by set_cursor; ccv_cnnp_dataframe_iter_peek asserts it.
  int idx; // Index of the next row ccv_cnnp_dataframe_iter_next will return.
  int prefetch_head; // Line of the oldest prefetched row in prefetches.
  int prefetch_tail; // Line of the newest prefetched row; -1 when nothing is prefetched.
  int column_size; // Total column count (base + derived) when the iterator was created; sizes cached_data and child contexts.
  int column_idx_size; // Number of entries in column_idxs.
  int fetched_size; // Number of row slots per column in fetched_data.
  ccv_cnnp_dataframe_t* dataframe;
  void**** derived_data; // Per derived column (lazily allocated): array of pointers to the fetched_data slots of its map inputs.
  void** fetched_data; // Scratch buffer: slot 0 holds row indices (INDEX_DATA), slot c + 1 holds column c (FETCHED_DATA).
  khash_t(iter_ctx)* column_ctx; // Parent stream context -> array of per-column child stream contexts (no signals are stored here).
  ccv_array_t* prefetches; // Prefetched ccv_cnnp_dataframe_data_item_t, column-major: item (line, column) is at line + column * lines.
  int* column_idxs; // Requested columns; points into the same allocation, right after cached_data[column_size].
  ccv_cnnp_dataframe_data_item_t cached_data[1]; // Data for the current row, one entry per column; allocated with column_size entries.
};
185
186
82.0k
// Slot 0 of fetched_data reinterpreted as an int array of row indices (used when the dataframe is not shuffled).
#define INDEX_DATA(iter) ((int*)((iter)->fetched_data))
// Start of column idx's fetched_size-long slot in fetched_data (slot 0 is reserved for INDEX_DATA).
#define FETCHED_DATA(iter, idx) ((iter)->fetched_data + ((idx) + 1) * (iter)->fetched_size)
188
189
// Create an iterator that yields the columns listed in column_idxs, row by row.
//
// column_idxs:     indices of existing columns (base or derived); copied into the iterator.
// column_idx_size: number of requested columns (> 0).
// The iterator records the dataframe's current total column count. Derived columns added afterwards are not
// tracked by this iterator's cached_data.
ccv_cnnp_dataframe_iter_t* ccv_cnnp_dataframe_iter_new(ccv_cnnp_dataframe_t* const dataframe, const int* const column_idxs, const int column_idx_size)
{
  assert(column_idx_size > 0);
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  int i;
  // Negative indices would address cached_data before its start.
  for (i = 0; i < column_idx_size; i++)
    { assert(column_idxs[i] >= 0 && column_idxs[i] < column_size); }
  // One allocation holds the struct, cached_data[column_size] and the trailing column_idxs[column_idx_size].
  // NOTE(review): the sizeof(void*) * (column_idx_size - 1) term looks like leftover slack; it is kept so the allocation never shrinks.
  ccv_cnnp_dataframe_iter_t* const iter = (ccv_cnnp_dataframe_iter_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_iter_t) + sizeof(ccv_cnnp_dataframe_data_item_t) * column_size + sizeof(void*) * (column_idx_size - 1) + sizeof(int) * column_idx_size);
  iter->dataframe = dataframe;
  iter->prefetch_tail = -1;
  // More derived columns may be added after the iterator is created, so keep the number of existing columns
  // for cached_data and column_ctx bookkeeping.
  iter->column_size = column_size;
  iter->column_idx_size = column_idx_size;
  iter->column_idxs = (int*)(iter->cached_data + column_size);
  memcpy(iter->column_idxs, column_idxs, sizeof(int) * column_idx_size);
  // Preallocate fetched data: one row slot per column plus the index slot.
  iter->fetched_size = 1;
  iter->fetched_data = (void**)ccmalloc(sizeof(void*) * (column_size + 1));
  return iter;
}
210
211
// Release one stream context's reuse cache.
// Each column's queued data is passed to that column's data_deinit (when set), then the per-column arrays and
// the outer `columns` array are freed.
static void _ccv_cnnp_dataframe_data_ctx_columns_free(ccv_cnnp_dataframe_t* const dataframe, ccv_array_t* const columns)
{
  int col;
  for (col = 0; col < columns->rnum; col++)
  {
    ccv_array_t* const queue = *(ccv_array_t**)ccv_array_get(columns, col);
    if (queue)
    {
      ccv_cnnp_column_data_deinit_f deinit;
      void* deinit_context;
      if (col >= dataframe->column_size)
      {
        assert(dataframe->derived_column_data);
        const ccv_cnnp_derived_column_data_t* const derived = (const ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, col - dataframe->column_size);
        deinit = derived->data_deinit;
        deinit_context = derived->context;
      } else {
        deinit = dataframe->column_data[col].data_deinit;
        deinit_context = dataframe->column_data[col].context;
      }
      int k;
      for (k = 0; deinit && k < queue->rnum; k++)
        deinit(*(void**)ccv_array_get(queue, k), deinit_context);
      ccv_array_free(queue);
    }
  }
  ccv_array_free(columns);
}
238
239
static void _ccv_cnnp_dataframe_stream_destructor_hook(const ccv_nnc_stream_context_t* const stream, void* const context)
240
4
{
241
4
  ccv_cnnp_dataframe_t* const dataframe = (ccv_cnnp_dataframe_t*)context;
242
4
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
243
4
  int64_t ctx = (int64_t)(intptr_t)stream;
244
4
  khiter_t k = kh_get(ctx, data_ctx, ctx);
245
4
  assert(k != kh_end(data_ctx));
246
4
  ccv_array_t* const columns = kh_val(data_ctx, k).columns;
247
4
  _ccv_cnnp_dataframe_data_ctx_columns_free(dataframe, columns);
248
4
  kh_del(ctx, data_ctx, k);
249
4
}
250
251
// Make sure data_ctx has a reuse-cache entry for stream_context (NULL is a valid key).
// A new entry gets an empty per-column array. For a non-NULL stream, a destructor hook is also registered so the
// cache is freed together with the stream.
static void _ccv_cnnp_dataframe_prepare_data_ctx(ccv_cnnp_dataframe_t* const dataframe, ccv_nnc_stream_context_t* const stream_context)
{
  khash_t(ctx)* const table = dataframe->data_ctx;
  const int64_t key = (int64_t)(intptr_t)stream_context;
  int status = 0;
  const khiter_t slot = kh_put(ctx, table, key, &status);
  assert(status >= 0);
  if (status == 0)
    return; // An entry for this stream context already exists.
  int total_columns = dataframe->column_size;
  if (dataframe->derived_column_data)
    total_columns += dataframe->derived_column_data->rnum;
  kh_val(table, slot).columns = ccv_array_new(sizeof(ccv_array_t*), total_columns, 0);
  if (stream_context)
    kh_val(table, slot).hook_id = ccv_nnc_stream_context_add_destructor_hook(stream_context, _ccv_cnnp_dataframe_stream_destructor_hook, dataframe);
  else
    kh_val(table, slot).hook_id = -1;
}
265
266
// Hand a produced data item back for reuse.
//
// If data_ctx still has an entry for `ctx` (the stream context the data was produced on), the item is pushed onto
// that entry's queue for column_idx. Otherwise, for example after the stream's destructor hook removed the entry,
// the item is released immediately through the column's data_deinit, if any. NULL data is ignored.
static void _ccv_cnnp_dataframe_enqueue_data(ccv_cnnp_dataframe_t* const dataframe, void* const data, const int column_idx, const uint64_t ctx)
{
  if (data == 0)
    return;
  khash_t(ctx)* const table = dataframe->data_ctx;
  const khiter_t slot = kh_get(ctx, table, ctx);
  if (slot != kh_end(table))
  {
    const int total_columns = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
    assert(column_idx < total_columns);
    ccv_array_t* const columns = kh_val(table, slot).columns;
    // Columns may have been added since the entry was created; grow the per-column table to match.
    if (columns->rnum < total_columns)
      ccv_array_resize(columns, total_columns);
    ccv_array_t** const queue_ref = (ccv_array_t**)ccv_array_get(columns, column_idx);
    if (*queue_ref == 0)
      *queue_ref = ccv_array_new(sizeof(void*), 1, 0);
    ccv_array_push(*queue_ref, &data);
    return;
  }
  ccv_cnnp_column_data_deinit_f deinit;
  void* deinit_context;
  if (column_idx >= dataframe->column_size)
  {
    assert(dataframe->derived_column_data);
    const ccv_cnnp_derived_column_data_t* const derived = (const ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
    deinit = derived->data_deinit;
    deinit_context = derived->context;
  } else {
    deinit = dataframe->column_data[column_idx].data_deinit;
    deinit_context = dataframe->column_data[column_idx].context;
  }
  if (deinit)
    deinit(data, deinit_context);
}
305
306
// Pop the most recently released data item for column_idx on stream_context.
// Returns NULL when there is no cache entry for the stream, no queue for the column, or the queue is empty.
static void* _ccv_cnnp_dataframe_dequeue_data(ccv_cnnp_dataframe_t* const dataframe, const int column_idx, ccv_nnc_stream_context_t* const stream_context)
{
  khash_t(ctx)* const table = dataframe->data_ctx;
  const khiter_t slot = kh_get(ctx, table, (uint64_t)(uintptr_t)stream_context);
  void* reused = 0;
  if (slot != kh_end(table))
  {
    ccv_array_t* const columns = kh_val(table, slot).columns;
    ccv_array_t* const queue = column_idx < columns->rnum ? *(ccv_array_t**)ccv_array_get(columns, column_idx) : 0;
    if (queue && queue->rnum != 0)
    {
      // LIFO: shrink first, then read the element that was the last one.
      --queue->rnum;
      reused = *(void**)ccv_array_get(queue, queue->rnum);
    }
  }
  return reused;
}
323
324
// Pick the stream context on which column column_idx should be produced.
//
// The caller's stream_context is returned unchanged when it is NULL, already has the requested stream_type, or
// stream_type is 0. Otherwise a child stream of stream_type is created lazily and remembered per
// (parent stream, column) in iter->column_ctx.
static ccv_cnnp_dataframe_column_ctx_t _ccv_cnnp_child_column_ctx_for_stream_type(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, const int column_idx, ccv_nnc_stream_context_t* const stream_context, const int stream_type)
{
  if (!stream_context || ccv_nnc_stream_context_type(stream_context) == stream_type || stream_type == 0)
    return (ccv_cnnp_dataframe_column_ctx_t){ .stream_context = stream_context };
  if (iter->column_ctx == 0)
    iter->column_ctx = kh_init(iter_ctx);
  khash_t(iter_ctx)* const per_stream = iter->column_ctx;
  int status = 0;
  const khiter_t slot = kh_put(iter_ctx, per_stream, (uint64_t)(uintptr_t)stream_context, &status);
  assert(status >= 0);
  // A freshly inserted parent stream gets one zeroed child slot per column known to this iterator.
  if (status != 0)
    kh_val(per_stream, slot) = (ccv_cnnp_dataframe_column_ctx_t*)cccalloc(iter->column_size, sizeof(ccv_cnnp_dataframe_column_ctx_t));
  ccv_cnnp_dataframe_column_ctx_t* const children = kh_val(per_stream, slot);
  assert(column_idx < iter->column_size);
  if (children[column_idx].stream_context == 0)
    children[column_idx].stream_context = ccv_nnc_stream_context_new(stream_type);
  return children[column_idx];
}
348
349
// Produce row_size values of column column_idx into fetched_data, reusing values already cached for this fetch.
//
// cached_data holds one ccv_cnnp_dataframe_data_item_t per (row, column); row i of column c lives at
// cached_data[i + c * cached_step]. If the column is already flagged there, the cached values are copied out and
// nothing is produced. Otherwise fetched_data is pre-filled from the per-stream reuse cache (NULL entries when
// nothing is available), which the producer presumably reuses. The column is then produced: for derived map
// columns the inputs are produced first, recursively. Finally the results are recorded in cached_data, tagged
// with stream_context.
//
// If the column requests a different stream type, it is produced on a child stream context, and stream_context
// waits on a signal emitted from that child.
static void _ccv_cnnp_dataframe_column_data(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, void** const fetched_data, const int* const row_idxs, const int row_size, const int column_idx, const int cached_step, ccv_nnc_stream_context_t* const stream_context)
{
  int i;
  // Rows of one column are cached together, so row 0's flag decides for all of them (the asserts check this).
  if (cached_data[column_idx * cached_step].flag)
  {
    for (i = 1; i < row_size; i++)
      { assert(cached_data[i + column_idx * cached_step].flag); }
    for (i = 0; i < row_size; i++)
      fetched_data[i] = cached_data[i + column_idx * cached_step].data;
    return;
  } else {
    for (i = 1; i < row_size; i++)
      { assert(!cached_data[i + column_idx * cached_step].flag); }
    // Seed the outputs with previously released data for this column / stream (or NULL).
    for (i = 0; i < row_size; i++)
      fetched_data[i] = _ccv_cnnp_dataframe_dequeue_data(dataframe, column_idx, stream_context);
  }
  if (column_idx >= dataframe->column_size)
  {
    assert(dataframe->derived_column_data);
    const int derived_column_idx = column_idx - dataframe->column_size;
    const ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, derived_column_idx);
    ccv_cnnp_dataframe_column_ctx_t child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, derived_column_data->stream_type);
    const int column_idx_size = derived_column_data->column_idx_size;
    if (derived_column_data->map)
    {
      int i;
      // Lazily allocate the per-derived-column table of input pointers.
      if (!iter->derived_data)
        iter->derived_data = (void****)cccalloc(dataframe->derived_column_data->rnum, sizeof(void***));
      if (!iter->derived_data[derived_column_idx])
        iter->derived_data[derived_column_idx] = (void***)cccalloc(derived_column_data->column_idx_size, sizeof(void**));
      void*** const derived_data = iter->derived_data[derived_column_idx];
      // Produce each input column into its own FETCHED_DATA slot; recursion terminates because map inputs
      // always have smaller indices (asserted in ccv_cnnp_dataframe_map).
      for (i = 0; i < column_idx_size; i++)
      {
        derived_data[i] = FETCHED_DATA(iter, derived_column_data->column_idxs[i]);
        _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, derived_data[i], row_idxs, row_size, derived_column_data->column_idxs[i], cached_step, stream_context);
      }
      // Mark it as const.
      derived_column_data->map((void *const *const *)derived_data, derived_column_data->column_idx_size, row_size, fetched_data, derived_column_data->context, child_ctx.stream_context);
    } else
      derived_column_data->data_enum(column_idx, row_idxs, row_size, fetched_data, derived_column_data->context, child_ctx.stream_context);
    // Work queued on a child stream must complete before the caller's stream uses the results.
    if (child_ctx.stream_context != stream_context)
    {
      ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_context_emit_signal_new(child_ctx.stream_context);
      ccv_nnc_stream_context_wait_signal(stream_context, signal);
    }
  } else {
    const ccv_cnnp_column_data_t* const column_data = dataframe->column_data + column_idx;
    ccv_cnnp_dataframe_column_ctx_t child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, column_data->stream_type);
    column_data->data_enum(column_idx, row_idxs, row_size, fetched_data, column_data->context, child_ctx.stream_context);
    if (child_ctx.stream_context != stream_context)
    {
      ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_context_emit_signal_new(child_ctx.stream_context);
      ccv_nnc_stream_context_wait_signal(stream_context, signal);
    }
  }
  // Record the results so later lookups of this column in the same fetch are served from cache.
  for (i = 0; i < row_size; i++)
  {
    cached_data[i + column_idx * cached_step].flag = 1;
    cached_data[i + column_idx * cached_step].ctx = (uint64_t)(uintptr_t)stream_context;
    cached_data[i + column_idx * cached_step].data = fetched_data[i];
  }
}
411
412
// Advance the iterator by one row and write that row's values for the first column_idx_size requested columns
// into data_ref.
//
// Data held for the previous row is handed back to the per-stream reuse cache first. If a prefetched line is
// pending, it is consumed; items produced on a different stream context are recycled rather than used. Anything
// still missing is produced on stream_context.
//
// Returns 0 on success, -1 the first time the end of the dataframe is reached, and -2 on any later call.
//
// iter->cached_data has exactly iter->column_size slots, which is the column count when the iterator was created.
// Derived columns may be added to the dataframe afterwards, so every access to iter->cached_data is bounded by
// iter->column_size rather than by the dataframe's current column count. The current count is still used to decode
// the prefetch buffer layout (lines = rnum / column_size), as the prefetch functions do.
int ccv_cnnp_dataframe_iter_next(ccv_cnnp_dataframe_iter_t* const iter, void** const data_ref, const int column_idx_size, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(column_idx_size <= iter->column_idx_size);
  // Current total column count; only used to decode the layout of the prefetch buffer.
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  // Number of slots actually allocated in iter->cached_data.
  const int iter_column_size = iter->column_size;
  int i;
  // Push existing data back to reusable state (note, these may not be reused immediately because they may be on a different stream context).
  for (i = 0; i < iter_column_size; i++)
    if (iter->cached_data[i].flag)
    {
      _ccv_cnnp_dataframe_enqueue_data(dataframe, iter->cached_data[i].data, i, iter->cached_data[i].ctx);
      iter->cached_data[i].flag = 0;
      iter->cached_data[i].data = 0;
      iter->cached_data[i].ctx = 0;
    }
  const int idx = iter->idx;
  iter->flag = 1; // Mark it as we called next already.
  if (idx > dataframe->row_count) // If we exceed row count, return -2.
    return -2;
  if (idx == dataframe->row_count) // Otherwise, no more row, return -1.
  {
    ++iter->idx;
    return -1;
  }
  if (iter->prefetch_tail != -1) // If there is something in prefetch log.
  {
    ccv_array_t* const prefetches = iter->prefetches;
    assert(prefetches);
    const int lines = prefetches->rnum / column_size;
    if (iter->prefetch_head == iter->prefetch_tail) // Only one item.
      iter->prefetch_tail = -1;
    ccv_cnnp_dataframe_data_item_t* const cached_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(iter->prefetches, iter->prefetch_head);
    for (i = 0; i < column_size; i++)
    {
      if (!cached_data[i * lines].flag)
        continue;
      // Columns added after this iterator was created have no slot in iter->cached_data; recycle them instead.
      if (i < iter_column_size && cached_data[i * lines].ctx == (uint64_t)(uintptr_t)stream_context) // If match existing stream context.
        iter->cached_data[i] = cached_data[i * lines];
      else // Recycle
        _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[i * lines].data, i, cached_data[i * lines].ctx);
    }
    ++iter->prefetch_head;
    assert(prefetches->rnum % column_size == 0);
    if (iter->prefetch_head >= lines)
      iter->prefetch_head = 0;
  }
  // Now we are about to create cached_data (above code only uses cached data).
  // We are ready to prepare the data_ctx cache.
  _ccv_cnnp_dataframe_prepare_data_ctx(dataframe, stream_context);
  for (i = 0; i < column_idx_size; i++)
  {
    void* fetched_data[1]; // This guards better than just give away data_ref + i.
    _ccv_cnnp_dataframe_column_data(dataframe, iter, iter->cached_data, fetched_data, dataframe->shuffled_idx ? dataframe->shuffled_idx + idx : &idx, 1, iter->column_idxs[i], 1, stream_context);
    data_ref[i] = fetched_data[0];
  }
  ++iter->idx;
  return 0;
}
470
471
// Fetch data_ref_size columns of the row returned by the last ccv_cnnp_dataframe_iter_next, starting at requested
// column `offset`. This reuses data already cached for that row where possible. Requires that next has been called.
void ccv_cnnp_dataframe_iter_peek(ccv_cnnp_dataframe_iter_t* const iter, void** const data_ref, const int offset, const int data_ref_size, ccv_nnc_stream_context_t* const stream_context)
{
  assert(iter->flag);
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(offset + data_ref_size <= iter->column_idx_size);
  // next already advanced idx, so the current row is one behind it.
  const int row_idx = iter->idx - 1;
  assert(row_idx >= 0);
  assert(row_idx < dataframe->row_count);
  const int* const wanted = iter->column_idxs + offset;
  int k;
  for (k = 0; k < data_ref_size; k++)
  {
    const int* const row = dataframe->shuffled_idx ? dataframe->shuffled_idx + row_idx : &row_idx;
    _ccv_cnnp_dataframe_column_data(dataframe, iter, iter->cached_data, &data_ref[k], row, 1, wanted[k], 1, stream_context);
  }
}
483
484
static void _ccv_cnnp_null_prefetches(ccv_cnnp_dataframe_iter_t* const iter)
485
65
{
486
65
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
487
65
  assert(dataframe);
488
65
  int i, j;
489
65
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum49
:
016
);
490
65
  if (iter->prefetch_head <= iter->prefetch_tail)
491
2
  {
492
2
    assert(iter->prefetches);
493
2
    const int lines = iter->prefetches->rnum / column_size;
494
6
    for (i = iter->prefetch_head; i <= iter->prefetch_tail; 
i++4
)
495
4
    {
496
4
      ccv_cnnp_dataframe_data_item_t* const cached_data = ccv_array_get(iter->prefetches, i);
497
10
      for (j = 0; j < column_size; 
j++6
)
498
6
        if (cached_data[j * lines].flag)
499
6
          _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[j * lines].data, j, cached_data[j * lines].ctx);
500
4
    }
501
63
  } else if (iter->prefetch_tail >= 0) { // -1 means no item.
502
1
    assert(iter->prefetches);
503
1
    const int lines = iter->prefetches->rnum / column_size;
504
2
    for (i = iter->prefetch_head; i < lines; 
i++1
)
505
1
    {
506
1
      ccv_cnnp_dataframe_data_item_t* const cached_data = ccv_array_get(iter->prefetches, i);
507
3
      for (j = 0; j < column_size; 
j++2
)
508
2
        if (cached_data[j * lines].flag)
509
2
          _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[j * lines].data, j, cached_data[j * lines].ctx);
510
1
    }
511
5
    for (i = 0; i <= iter->prefetch_tail; 
i++4
)
512
4
    {
513
4
      ccv_cnnp_dataframe_data_item_t* const cached_data = ccv_array_get(iter->prefetches, i);
514
12
      for (j = 0; j < column_size; 
j++8
)
515
8
        if (cached_data[j * lines].flag)
516
8
          _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[j * lines].data, j, cached_data[j * lines].ctx);
517
4
    }
518
1
  }
519
65
  iter->prefetch_head = 0;
520
65
  iter->prefetch_tail = -1;
521
65
}
522
523
// Fill max_to_prefetch consecutive prefetch lines, starting at cached_data, with rows idx .. idx + max_to_prefetch - 1.
//
// The target cells of every column are cleared first. fetched_data is then grown if it cannot hold max_to_prefetch
// rows per column. Finally each requested column is produced, using the shuffled permutation when present and
// sequential row indices otherwise.
static void _ccv_cnnp_prefetch_cached_data(ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, const int idx, const int max_to_prefetch, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  assert(iter->prefetches);
  // Rows per column in the column-major prefetch buffer.
  const int lines = iter->prefetches->rnum / column_size;
  const ccv_cnnp_dataframe_data_item_t empty = { 0 };
  int col, row;
  for (col = 0; col < column_size; col++)
    for (row = 0; row < max_to_prefetch; row++)
      cached_data[row + col * lines] = empty;
  if (max_to_prefetch > iter->fetched_size)
  {
    iter->fetched_data = ccrealloc(iter->fetched_data, sizeof(void*) * max_to_prefetch * (column_size + 1));
    iter->fetched_size = max_to_prefetch;
  }
  const int* row_idxs;
  if (dataframe->shuffled_idx)
    row_idxs = dataframe->shuffled_idx + idx;
  else {
    int* const sequential = INDEX_DATA(iter);
    for (row = 0; row < max_to_prefetch; row++)
      sequential[row] = idx + row;
    row_idxs = sequential;
  }
  int k;
  for (k = 0; k < iter->column_idx_size; k++)
  {
    const int column_idx = iter->column_idxs[k];
    _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, FETCHED_DATA(iter, column_idx), row_idxs, max_to_prefetch, column_idx, lines, stream_context);
  }
}
554
555
// Prefetch up to prefetch_count upcoming rows into the iterator's ring buffer (iter->prefetches), producing them
// on stream_context so a later ccv_cnnp_dataframe_iter_next can consume them.
//
// prefetches stores ccv_cnnp_dataframe_data_item_t column-major with `lines` rows per column: item (line, column)
// is at index line + column * lines. Lines prefetch_head..prefetch_tail (inclusive, possibly wrapping) are
// queued; prefetch_tail == -1 means the buffer is empty.
//
// Returns 0 when rows were prefetched, -1 when every remaining row is already consumed or prefetched.
int ccv_cnnp_dataframe_iter_prefetch(ccv_cnnp_dataframe_iter_t* const iter, const int prefetch_count, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  int i, j;
  assert(iter->idx <= dataframe->row_count);
  int lines, next, max_to_prefetch;
  if (iter->prefetch_tail == -1)
  {
    // Empty ring: restart it at line 0 and make sure it has at least max_to_prefetch lines.
    if (iter->idx == dataframe->row_count)
      return -1; // Cannot be done.
    max_to_prefetch = ccv_min(dataframe->row_count - iter->idx, prefetch_count);
    if (!iter->prefetches)
    {
      iter->prefetches = ccv_array_new(sizeof(ccv_cnnp_dataframe_data_item_t), max_to_prefetch * column_size, 0);
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
    }
    iter->prefetch_tail = iter->prefetch_head = 0; // Advance!
    next = iter->idx;
    lines = iter->prefetches->rnum / column_size;
    // Reset to enough space.
    if (lines < max_to_prefetch)
    {
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
      lines = max_to_prefetch;
    }
  } else {
    assert(iter->prefetches);
    ccv_array_t* const prefetches = iter->prefetches;
    assert(prefetches->rnum % column_size == 0);
    lines = prefetches->rnum / column_size;
    // Number of lines already queued, accounting for wrap-around.
    const int prefetched = iter->prefetch_tail >= iter->prefetch_head ? iter->prefetch_tail - iter->prefetch_head + 1 : lines - iter->prefetch_head + iter->prefetch_tail + 1;
    if (iter->idx + prefetched == dataframe->row_count) // Nothing to prefetch.
      return -1;
    max_to_prefetch = ccv_min(dataframe->row_count - (iter->idx + prefetched), prefetch_count);
    // Not enough space, need to resize.
    if (prefetched + max_to_prefetch > lines)
    {
      // Every column's block changes stride from `lines` to `new_lines`; queued lines are moved in place.
      const int new_lines = prefetched + max_to_prefetch;
      ccv_array_resize(prefetches, new_lines * column_size);
      // These are overlap moves, have to make sure start from the end and move it up to the beginning.
      if (iter->prefetch_head > iter->prefetch_tail)
      {
        // Wrapped: the head segment moves to the end of each enlarged block, the tail segment stays at its start.
        const int offset = new_lines - lines;
        for (i = column_size - 1; i >= 0; i--)
        {
          for (j = lines - 1; j >= iter->prefetch_head; j--)
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + offset + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
          for (j = iter->prefetch_tail; j >= 0; j--)
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
        }
        iter->prefetch_head += offset;
      } else {
        for (i = column_size - 1; i >= 0; i--)
          for (j = iter->prefetch_tail; j >= iter->prefetch_head; j--)
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
      }
      lines = new_lines;
    }
    ++iter->prefetch_tail; // Move to the next ready tail.
    if (iter->prefetch_tail >= lines)
      iter->prefetch_tail = 0;
    next = iter->idx + prefetched;
  }
  ccv_array_t* const prefetches = iter->prefetches;
  ccv_cnnp_dataframe_data_item_t* const cached_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, iter->prefetch_tail);
  // Now we are about to create cached_data (above code only uses cached data).
  // We are ready to prepare the data_ctx cache.
  _ccv_cnnp_dataframe_prepare_data_ctx(dataframe, stream_context);
  // If the tail is before the head, we must have enough space for the max_to_prefetch
  if (iter->prefetch_tail < iter->prefetch_head)
  {
    assert(iter->prefetch_tail + max_to_prefetch - 1 < iter->prefetch_head);
    _ccv_cnnp_prefetch_cached_data(iter, cached_data, next, max_to_prefetch, stream_context);
    iter->prefetch_tail += max_to_prefetch - 1;
  } else {
    // First, fetch to the end.
    const int fetch_to_end = ccv_min(max_to_prefetch, lines - iter->prefetch_tail);
    _ccv_cnnp_prefetch_cached_data(iter, cached_data, next, fetch_to_end, stream_context);
    if (fetch_to_end == max_to_prefetch)
      iter->prefetch_tail += fetch_to_end - 1;
    else {
      // Need to fetch more: wrap around to line 0 for the remainder.
      ccv_cnnp_dataframe_data_item_t* const more_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, 0);
      assert(max_to_prefetch > fetch_to_end);
      _ccv_cnnp_prefetch_cached_data(iter, more_data, next + fetch_to_end, max_to_prefetch - fetch_to_end, stream_context);
      iter->prefetch_tail = max_to_prefetch - fetch_to_end - 1;
    }
  }
  return 0;
}
647
648
/* Move the iterator's cursor to row `idx`.
 *
 * Returns 0 on success and -1 when `idx` is outside [0, row_count).
 * If the cursor is already at `idx` nothing is touched. Otherwise the cursor
 * is updated, iter->flag is cleared and outstanding prefetches are pushed back
 * to their reusable state through _ccv_cnnp_null_prefetches. */
int ccv_cnnp_dataframe_iter_set_cursor(ccv_cnnp_dataframe_iter_t* const iter, const int idx)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  // Negative indices are out of range too; previously only the upper bound
  // was checked, so a negative idx was silently stored as the cursor.
  if (idx < 0 || idx >= dataframe->row_count)
    return -1;
  if (idx == iter->idx)
    return 0;
  iter->idx = idx;
  iter->flag = 0;
  _ccv_cnnp_null_prefetches(iter);
  return 0;
}
661
662
/* Tear down an iterator and release everything it owns.
 *
 * The steps run in this order:
 *   1. Data items still held in cached_data are handed back to the dataframe
 *      via _ccv_cnnp_dataframe_enqueue_data.
 *   2. Prefetched items are handed back (_ccv_cnnp_null_prefetches), then the
 *      prefetch array is freed.
 *   3. The per-derived-column allocations in derived_data and the
 *      fetched_data buffer are freed.
 *   4. Every stream context recorded in the column_ctx table is freed, then
 *      the table itself.
 *   5. The iterator itself is freed.
 * NOTE(review): the derived_data loop is bounded by the dataframe's current
 * derived column count; confirm that derived_data is always allocated with at
 * least that many slots. */
void ccv_cnnp_dataframe_iter_free(ccv_cnnp_dataframe_iter_t* const iter)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  const int column_size = iter->column_size;
  // Give cached data back so it can be reused. It may not be reused right
  // away, because it can belong to a different stream context.
  for (int col = 0; col < column_size; col++)
  {
    if (!iter->cached_data[col].flag)
      continue;
    _ccv_cnnp_dataframe_enqueue_data(dataframe, iter->cached_data[col].data, col, iter->cached_data[col].ctx);
  }
  // Prefetched items go back to the reusable pool before their buffer is dropped.
  _ccv_cnnp_null_prefetches(iter);
  if (iter->prefetches)
    ccv_array_free(iter->prefetches);
  if (iter->derived_data)
  {
    assert(dataframe->derived_column_data);
    const int derived_count = dataframe->derived_column_data->rnum;
    for (int d = 0; d < derived_count; d++)
    {
      if (iter->derived_data[d])
        ccfree(iter->derived_data[d]);
    }
    ccfree(iter->derived_data);
  }
  ccfree(iter->fetched_data);
  khash_t(iter_ctx)* const column_ctx = iter->column_ctx;
  if (column_ctx)
  {
    // Each live table entry holds column_size column contexts; free the
    // stream contexts that were actually created.
    for (khiter_t slot = kh_begin(column_ctx); slot != kh_end(column_ctx); ++slot)
    {
      if (!kh_exist(column_ctx, slot))
        continue;
      ccv_cnnp_dataframe_column_ctx_t* const ctx = kh_val(column_ctx, slot);
      for (int col = 0; col < column_size; col++)
      {
        if (ctx[col].stream_context)
          ccv_nnc_stream_context_free(ctx[col].stream_context);
      }
    }
    kh_destroy(iter_ctx, column_ctx);
  }
  ccfree(iter);
}
703
704
/* Destroy a dataframe and every resource it owns.
 *
 * The teardown runs in this order:
 *   1. For every live entry of the data_ctx cache, the destructor hook is
 *      removed from its stream context (only when the key is a non-NULL
 *      stream context). Its cached column data is then released with
 *      _ccv_cnnp_dataframe_data_ctx_columns_free, and the table is destroyed.
 *   2. For every derived column, context_deinit runs (if set), then
 *      column_idxs and name are freed; the derived column array follows.
 *   3. For every base column, context_deinit runs (if set), then name is freed.
 *   4. The shuffle index, the GSL rng (HAVE_GSL builds only) and the
 *      dataframe itself are freed. */
void ccv_cnnp_dataframe_free(ccv_cnnp_dataframe_t* const dataframe)
{
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
  const int derived_size = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  // Total column count (base + derived); only consulted by the assert below.
  const int total_column_size = dataframe->column_size + derived_size;
  (void)total_column_size;
  for (khiter_t slot = kh_begin(data_ctx); slot != kh_end(data_ctx); ++slot)
  {
    if (!kh_exist(data_ctx, slot))
      continue;
    // The table key is the stream context pointer stored as an integer.
    ccv_nnc_stream_context_t* const stream_context = (ccv_nnc_stream_context_t*)(intptr_t)kh_key(data_ctx, slot);
    ccv_array_t* const cached_columns = kh_val(data_ctx, slot).columns;
    if (stream_context)
      ccv_nnc_stream_context_remove_destructor_hook(stream_context, kh_val(data_ctx, slot).hook_id);
    assert(cached_columns->rnum <= total_column_size);
    _ccv_cnnp_dataframe_data_ctx_columns_free(dataframe, cached_columns);
  }
  kh_destroy(ctx, data_ctx);
  if (dataframe->derived_column_data)
  {
    for (int d = 0; d < dataframe->derived_column_data->rnum; d++)
    {
      ccv_cnnp_derived_column_data_t* const derived = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, d);
      if (derived->context_deinit)
        derived->context_deinit(derived->context);
      ccfree(derived->column_idxs);
      if (derived->name)
        ccfree(derived->name);
    }
    ccv_array_free(dataframe->derived_column_data);
  }
  ccv_cnnp_column_data_t* const base_columns = dataframe->column_data;
  for (int c = 0; c < dataframe->column_size; c++)
  {
    if (base_columns[c].context_deinit)
      base_columns[c].context_deinit(base_columns[c].context);
    if (base_columns[c].name)
      ccfree(base_columns[c].name);
  }
  if (dataframe->shuffled_idx)
    ccfree(dataframe->shuffled_idx);
#ifdef HAVE_GSL
  if (dataframe->rng)
    gsl_rng_free(dataframe->rng);
#endif
  ccfree(dataframe);
}