Coverage Report

Created: 2021-04-12 01:11

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_cnnp_dataframe.c
Line
Count
Source (jump to first uncovered line)
1
#include "ccv_nnc.h"
2
#include "ccv_nnc_easy.h"
3
#include "ccv_nnc_internal.h"
4
#include "ccv_internal.h"
5
#include "_ccv_cnnp_dataframe.h"
6
#include "3rdparty/khash/khash.h"
7
#ifdef HAVE_GSL
8
#include <gsl/gsl_rng.h>
9
#include <gsl/gsl_randist.h>
10
#else
11
#include "3rdparty/sfmt/SFMT.h"
12
#endif
13
14
typedef struct {
15
  ccv_array_t* columns;
16
  int hook_id;
17
} ccv_cnnp_data_ctx_t;
18
19
KHASH_MAP_INIT_INT64(ctx, ccv_cnnp_data_ctx_t)
20
21
struct ccv_cnnp_dataframe_s {
22
  int row_count;
23
  int column_size;
24
  int* shuffled_idx;
25
#ifdef HAVE_GSL
26
  gsl_rng* rng;
27
#else
28
  sfmt_t sfmt;
29
#endif
30
  khash_t(ctx)* data_ctx; // The stream context based cache for data entity of columns. This helps us to avoid allocations when iterate through data.
31
  ccv_array_t* derived_column_data;
32
  ccv_cnnp_column_data_t column_data[1];
33
};
34
35
typedef struct {
36
  int stream_type;
37
  int column_idx_size;
38
  int* column_idxs;
39
  char* name;
40
  ccv_cnnp_column_data_enum_f data_enum;
41
  ccv_cnnp_column_data_deinit_f data_deinit;
42
  void* context;
43
  ccv_cnnp_column_data_context_deinit_f context_deinit;
44
  ccv_cnnp_column_data_map_f map;
45
} ccv_cnnp_derived_column_data_t;
46
47
ccv_cnnp_dataframe_t* ccv_cnnp_dataframe_new(const ccv_cnnp_column_data_t* const column_data, const int column_size, const int row_count)
48
52
{
49
52
  assert(column_size >= 0);
50
52
  ccv_cnnp_dataframe_t* const dataframe = (ccv_cnnp_dataframe_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_t) + sizeof(ccv_cnnp_column_data_t) * (column_size - 1));
51
52
  dataframe->row_count = row_count;
52
52
  dataframe->column_size = column_size;
53
52
  dataframe->data_ctx = kh_init(ctx);
54
52
  if (column_size > 0)
55
51
  {
56
51
    memcpy(dataframe->column_data, column_data, sizeof(ccv_cnnp_column_data_t) * column_size);
57
51
    int i;
58
6.53k
    for (i = 0; i < column_size; 
i++6.48k
)
59
6.48k
      dataframe->column_data[i].name = ccv_cnnp_column_copy_name(column_data[i].name);
60
51
  }
61
52
  return dataframe;
62
52
}
63
64
// Shuffle the dataframe's row order in place.
//
// The first call allocates an identity permutation of the row indices and seeds
// the random generator from the dataframe's address. Every call then shuffles
// that permutation further.
void ccv_cnnp_dataframe_shuffle(ccv_cnnp_dataframe_t* const dataframe)
{
  assert(dataframe->row_count);
  const int row_count = dataframe->row_count;
  if (dataframe->shuffled_idx == 0)
  {
    int* const indices = (int*)ccmalloc(sizeof(int) * row_count);
    int row;
    for (row = 0; row < row_count; row++)
      indices[row] = row;
    dataframe->shuffled_idx = indices;
#ifdef HAVE_GSL
    assert(!dataframe->rng);
    gsl_rng_env_setup();
    dataframe->rng = gsl_rng_alloc(gsl_rng_default);
    // Seeded with the dataframe's address.
    gsl_rng_set(dataframe->rng, (unsigned long int)(uintptr_t)dataframe);
#else
    // Seeded with the dataframe's address.
    sfmt_init_gen_rand(&dataframe->sfmt, (uint32_t)(uintptr_t)dataframe);
#endif
  }
#ifdef HAVE_GSL
  gsl_ran_shuffle(dataframe->rng, dataframe->shuffled_idx, row_count, sizeof(int));
#else
  sfmt_genrand_shuffle(&dataframe->sfmt, dataframe->shuffled_idx, row_count, sizeof(int));
#endif
}
88
89
// Returns the number of rows in the dataframe.
int ccv_cnnp_dataframe_row_count(ccv_cnnp_dataframe_t* const dataframe)
{
  const int rows = dataframe->row_count;
  return rows;
}
93
94
// Append a derived column whose rows are produced by `data_enum`.
//
// The name is duplicated with ccv_cnnp_column_copy_name. `context` is stored
// as-is and passed to the callbacks.
// Returns the index of the new column: source columns come first, and derived
// columns follow in creation order.
int ccv_cnnp_dataframe_add(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_enum_f data_enum, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit, const char* name)
{
  ccv_array_t* derived = dataframe->derived_column_data;
  if (derived == 0)
  {
    derived = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
    dataframe->derived_column_data = derived;
  }
  ccv_cnnp_derived_column_data_t entry = {
    .data_enum = data_enum,
    .stream_type = stream_type,
    .data_deinit = data_deinit,
    .context = context,
    .context_deinit = context_deinit,
    .name = ccv_cnnp_column_copy_name(name),
  };
  ccv_array_push(derived, &entry);
  // The new column sits right after every existing one.
  return dataframe->column_size + derived->rnum - 1;
}
109
110
// Append a derived column computed by `map` from the columns in `column_idxs`.
//
// Every input column must already exist: index in [0, current column count).
// This keeps dependencies pointing to lower indices, so recursive fetching
// terminates. The index array is copied, and the name is duplicated with
// ccv_cnnp_column_copy_name.
// Returns the index of the new column.
int ccv_cnnp_dataframe_map(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_map_f map, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, const int* const column_idxs, const int column_idx_size, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit, const char* name)
{
  assert(column_idx_size > 0);
  if (!dataframe->derived_column_data)
    dataframe->derived_column_data = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
  const int column_size = dataframe->column_size + dataframe->derived_column_data->rnum;
  int i;
  // Negative indices would later address memory before the per-column caches
  // (cached_data[i + column_idx * step]), so reject them as well.
  for (i = 0; i < column_idx_size; i++)
  {
    assert(column_idxs[i] >= 0);
    assert(column_idxs[i] < column_size);
  }
  ccv_cnnp_derived_column_data_t column_data = {
    .stream_type = stream_type,
    .name = ccv_cnnp_column_copy_name(name),
    .column_idx_size = column_idx_size,
    .column_idxs = (int*)ccmalloc(sizeof(int) * column_idx_size),
    .map = map,
    .data_deinit = data_deinit,
    .context = context,
    .context_deinit = context_deinit,
  };
  memcpy(column_data.column_idxs, column_idxs, sizeof(int) * column_idx_size);
  ccv_array_push(dataframe->derived_column_data, &column_data);
  return dataframe->column_size + dataframe->derived_column_data->rnum - 1;
}
133
134
void* ccv_cnnp_dataframe_column_context(const ccv_cnnp_dataframe_t* const dataframe, const int column_idx)
135
2
{
136
2
  assert(column_idx >= 0);
137
2
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 
00
);
138
2
  assert(column_idx < column_size);
139
2
  if (column_idx < dataframe->column_size)
140
0
    return dataframe->column_data[column_idx].context;
141
2
  assert(dataframe->derived_column_data);
142
2
  ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
143
2
  return derived_column_data->context;
144
2
}
145
146
// Returns the name stored for column `column_idx` (source or derived).
const char* ccv_cnnp_dataframe_column_name(ccv_cnnp_dataframe_t* const dataframe, const int column_idx)
{
  assert(column_idx >= 0);
  const int total_columns = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  assert(column_idx < total_columns);
  if (column_idx >= dataframe->column_size)
  {
    const ccv_cnnp_derived_column_data_t* const derived = (const ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
    return derived->name;
  }
  return dataframe->column_data[column_idx].name;
}
156
157
typedef struct {
158
  int flag; // Mark this as cached or not.
159
  int64_t ctx; // The stream context.
160
  void* data;
161
} ccv_cnnp_dataframe_data_item_t;
162
163
typedef struct {
164
  ccv_nnc_stream_context_t* stream_context;
165
  ccv_nnc_stream_signal_t* signal;
166
} ccv_cnnp_dataframe_column_ctx_t;
167
168
KHASH_MAP_INIT_INT64(iter_ctx, ccv_cnnp_dataframe_column_ctx_t*)
169
170
struct ccv_cnnp_dataframe_iter_s {
171
  int flag; // Whether we called next or not.
172
  int idx;
173
  int prefetch_head;
174
  int prefetch_tail;
175
  int column_size;
176
  int column_idx_size;
177
  int fetched_size; // The size of fetched data.
178
  ccv_cnnp_dataframe_t* dataframe;
179
  void**** derived_data; // This is ridiculous, but it is true.
180
  void** fetched_data; // The cache to store fetched data.
181
  khash_t(iter_ctx)* column_ctx; // Column context specific to a stream context. The key will be a parent stream context and value will be child stream context + signal.
182
  ccv_array_t* prefetches; // The prefetch contents.
183
  int* column_idxs;
184
  ccv_cnnp_dataframe_data_item_t cached_data[1]; // The data cached when deriving data.
185
};
186
187
82.0k
#define INDEX_DATA(iter) ((int*)((iter)->fetched_data))
188
5.05k
#define FETCHED_DATA(iter, idx) ((iter)->fetched_data + ((idx) + 1) * (iter)->fetched_size)
189
190
// Create an iterator that yields the columns listed in `column_idxs` for each row.
//
// Every index must refer to a column that exists now. Columns derived later are
// not visible to this iterator: its caches are sized by the current column count.
// Returns the newly allocated iterator.
ccv_cnnp_dataframe_iter_t* ccv_cnnp_dataframe_iter_new(ccv_cnnp_dataframe_t* const dataframe, const int* const column_idxs, const int column_idx_size)
{
  assert(column_idx_size > 0);
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  int i;
  for (i = 0; i < column_idx_size; i++)
  {
    assert(column_idxs[i] >= 0);
    assert(column_idxs[i] < column_size);
  }
  // A single allocation holds the struct (whose cached_data[1] holds one item),
  // column_size more cached items (one item of slack, so `column_size - 1` is
  // never computed), and then column_idx_size ints for column_idxs. The former
  // `sizeof(void*) * (column_idx_size - 1)` term matched no member and is gone.
  ccv_cnnp_dataframe_iter_t* const iter = (ccv_cnnp_dataframe_iter_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_iter_t) + sizeof(ccv_cnnp_dataframe_data_item_t) * column_size + sizeof(int) * column_idx_size);
  iter->dataframe = dataframe;
  iter->prefetch_tail = -1; // Nothing prefetched yet.
  // After creating the iterator, more derived columns may still be added.
  // Keep the number of existing columns for cached_data and column_ctx tracking.
  iter->column_size = column_size;
  iter->column_idx_size = column_idx_size;
  iter->column_idxs = (int*)(iter->cached_data + column_size);
  memcpy(iter->column_idxs, column_idxs, sizeof(int) * column_idx_size);
  // Preallocate fetched data: one index slot plus one slot per column.
  iter->fetched_size = 1;
  iter->fetched_data = (void**)ccmalloc(sizeof(void*) * (column_size + 1));
  return iter;
}
211
212
// Release a stream context's recycling table.
//
// Every cached entity goes through its column's data_deinit (when one is set),
// then each per-column array and the table itself are freed.
static void _ccv_cnnp_dataframe_data_ctx_columns_free(ccv_cnnp_dataframe_t* const dataframe, ccv_array_t* const columns)
{
  int column_idx;
  for (column_idx = 0; column_idx < columns->rnum; column_idx++)
  {
    ccv_array_t* const column = *(ccv_array_t**)ccv_array_get(columns, column_idx);
    if (column)
    {
      ccv_cnnp_column_data_deinit_f data_deinit;
      void* context;
      if (column_idx >= dataframe->column_size)
      {
        assert(dataframe->derived_column_data);
        const ccv_cnnp_derived_column_data_t* const derived = (const ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
        data_deinit = derived->data_deinit;
        context = derived->context;
      } else {
        data_deinit = dataframe->column_data[column_idx].data_deinit;
        context = dataframe->column_data[column_idx].context;
      }
      if (data_deinit)
      {
        int k;
        for (k = 0; k < column->rnum; k++)
          data_deinit(*(void**)ccv_array_get(column, k), context);
      }
      ccv_array_free(column);
    }
  }
  ccv_array_free(columns);
}
239
240
static void _ccv_cnnp_dataframe_stream_destructor_hook(const ccv_nnc_stream_context_t* const stream, void* const context)
241
4
{
242
4
  ccv_cnnp_dataframe_t* const dataframe = (ccv_cnnp_dataframe_t*)context;
243
4
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
244
4
  int64_t ctx = (int64_t)(intptr_t)stream;
245
4
  khiter_t k = kh_get(ctx, data_ctx, ctx);
246
4
  assert(k != kh_end(data_ctx));
247
4
  ccv_array_t* const columns = kh_val(data_ctx, k).columns;
248
4
  _ccv_cnnp_dataframe_data_ctx_columns_free(dataframe, columns);
249
4
  kh_del(ctx, data_ctx, k);
250
4
}
251
252
// Make sure the dataframe has a recycling-cache entry for `stream_context`.
//
// A new entry gets a per-column table sized to the current column count. For a
// non-NULL stream, a destructor hook is also registered so the entry is freed
// together with the stream.
static void _ccv_cnnp_dataframe_prepare_data_ctx(ccv_cnnp_dataframe_t* const dataframe, ccv_nnc_stream_context_t* const stream_context)
{
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
  int ret = 0;
  const khiter_t k = kh_put(ctx, data_ctx, (int64_t)(intptr_t)stream_context, &ret);
  assert(ret >= 0);
  if (ret == 0)
    return; // An entry for this stream context already exists.
  const int derived_count = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  kh_val(data_ctx, k).columns = ccv_array_new(sizeof(ccv_array_t*), dataframe->column_size + derived_count, 0);
  if (stream_context)
    kh_val(data_ctx, k).hook_id = ccv_nnc_stream_context_add_destructor_hook(stream_context, _ccv_cnnp_dataframe_stream_destructor_hook, dataframe);
  else
    kh_val(data_ctx, k).hook_id = -1;
}
266
267
// Hand a data entity of column `column_idx` back for reuse on stream `ctx`.
//
// NULL data is ignored. If there is no cache entry for `ctx` (never prepared,
// or already removed by the stream destructor hook), the entity is released
// right away through the column's data_deinit, when one is set. Otherwise it is
// pushed onto that column's stack in the stream's cache.
static void _ccv_cnnp_dataframe_enqueue_data(ccv_cnnp_dataframe_t* const dataframe, void* const data, const int column_idx, const uint64_t ctx)
{
  if (data == 0)
    return;
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
  const khiter_t k = kh_get(ctx, data_ctx, ctx);
  if (k == kh_end(data_ctx))
  {
    ccv_cnnp_column_data_deinit_f data_deinit;
    void* context;
    if (column_idx >= dataframe->column_size)
    {
      assert(dataframe->derived_column_data);
      const ccv_cnnp_derived_column_data_t* const derived = (const ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
      data_deinit = derived->data_deinit;
      context = derived->context;
    } else {
      data_deinit = dataframe->column_data[column_idx].data_deinit;
      context = dataframe->column_data[column_idx].context;
    }
    if (data_deinit)
      data_deinit(data, context);
    return;
  }
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  assert(column_idx < column_size);
  ccv_array_t* const columns = kh_val(data_ctx, k).columns;
  // Grow the table to cover columns derived after the entry was created.
  if (columns->rnum < column_size)
    ccv_array_resize(columns, column_size);
  ccv_array_t** const slot = (ccv_array_t**)ccv_array_get(columns, column_idx);
  if (!*slot)
    *slot = ccv_array_new(sizeof(void*), 1, 0);
  ccv_array_push(*slot, &data);
}
306
307
// Pop the most recently recycled entity of column `column_idx` for
// `stream_context`. Returns NULL when nothing is available.
static void* _ccv_cnnp_dataframe_dequeue_data(ccv_cnnp_dataframe_t* const dataframe, const int column_idx, ccv_nnc_stream_context_t* const stream_context)
{
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
  const khiter_t k = kh_get(ctx, data_ctx, (uint64_t)(uintptr_t)stream_context);
  if (k == kh_end(data_ctx))
    return 0;
  ccv_array_t* const columns = kh_val(data_ctx, k).columns;
  if (column_idx < columns->rnum)
  {
    ccv_array_t* const column = *(ccv_array_t**)ccv_array_get(columns, column_idx);
    if (column && column->rnum != 0)
    {
      // LIFO: shrink the stack and return the element that was on top.
      --column->rnum;
      return *(void**)ccv_array_get(column, column->rnum);
    }
  }
  return 0;
}
324
325
// Pick the stream context on which column `column_idx` should run.
//
// The caller's stream is used as-is when there is none, when its type already
// matches `stream_type`, or when `stream_type` is 0. Otherwise a per-iterator
// child stream and signal of `stream_type` are created lazily and cached per
// (parent stream, column).
static ccv_cnnp_dataframe_column_ctx_t _ccv_cnnp_child_column_ctx_for_stream_type(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, const int column_idx, ccv_nnc_stream_context_t* const stream_context, const int stream_type)
{
  ccv_cnnp_dataframe_column_ctx_t child_ctx = {
    .stream_context = stream_context,
  };
  // Same evaluation order as `stream && type != stream_type && stream_type != 0`, negated.
  if (!stream_context || ccv_nnc_stream_context_type(stream_context) == stream_type || stream_type == 0)
    return child_ctx;
  if (!iter->column_ctx)
    iter->column_ctx = kh_init(iter_ctx);
  khash_t(iter_ctx)* const column_ctx = iter->column_ctx;
  int ret = 0;
  const khiter_t k = kh_put(iter_ctx, column_ctx, (uint64_t)(uintptr_t)stream_context, &ret);
  assert(ret >= 0);
  const int column_size = iter->column_size;
  ccv_cnnp_dataframe_column_ctx_t* per_column;
  if (ret == 0)
    per_column = kh_val(column_ctx, k);
  else {
    // First time this parent stream is seen: one lazily filled slot per column.
    per_column = (ccv_cnnp_dataframe_column_ctx_t*)cccalloc(column_size, sizeof(ccv_cnnp_dataframe_column_ctx_t));
    kh_val(column_ctx, k) = per_column;
  }
  assert(column_idx < column_size);
  ccv_cnnp_dataframe_column_ctx_t* const slot = per_column + column_idx;
  if (!slot->stream_context)
    slot->stream_context = ccv_nnc_stream_context_new(stream_type);
  if (!slot->signal)
    slot->signal = ccv_nnc_stream_signal_new(stream_type);
  return *slot;
}
351
352
// Fill fetched_data[0..row_size) with column `column_idx` for the rows in row_idxs.
//
// cached_data is addressed as cached_data[row + column_idx * cached_step]:
// - cached_step == 1 is the iterator's one-row cache;
// - cached_step == lines is a range of the column-major prefetch buffer.
//
// If the column is already cached for these rows (for example as the input of
// another derived column), the cached pointers are returned. Otherwise entities
// are taken from the stream's recycling pool (NULL when the pool is empty,
// presumably allocated by the callback in that case, which is not visible
// here). They are then filled by the column callback and recorded in the cache.
// Map columns recursively fetch their inputs first.
static void _ccv_cnnp_dataframe_column_data(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, void** const fetched_data, const int* const row_idxs, const int row_size, const int column_idx, const int cached_step, ccv_nnc_stream_context_t* const stream_context)
{
  int i;
  if (cached_data[column_idx * cached_step].flag)
  {
    // Rows of a column are always cached together.
    for (i = 1; i < row_size; i++)
      { assert(cached_data[i + column_idx * cached_step].flag); }
    for (i = 0; i < row_size; i++)
      fetched_data[i] = cached_data[i + column_idx * cached_step].data;
    return;
  }
  for (i = 1; i < row_size; i++)
    { assert(!cached_data[i + column_idx * cached_step].flag); }
  // Reuse previously released entities for this stream context when available.
  for (i = 0; i < row_size; i++)
    fetched_data[i] = _ccv_cnnp_dataframe_dequeue_data(dataframe, column_idx, stream_context);
  if (column_idx >= dataframe->column_size)
  {
    assert(dataframe->derived_column_data);
    const int derived_column_idx = column_idx - dataframe->column_size;
    const ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, derived_column_idx);
    ccv_cnnp_dataframe_column_ctx_t child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, derived_column_data->stream_type);
    const int column_idx_size = derived_column_data->column_idx_size;
    if (derived_column_data->map)
    {
      if (!iter->derived_data)
        iter->derived_data = (void****)cccalloc(dataframe->derived_column_data->rnum, sizeof(void***));
      if (!iter->derived_data[derived_column_idx])
        iter->derived_data[derived_column_idx] = (void***)cccalloc(derived_column_data->column_idx_size, sizeof(void**));
      void*** const derived_data = iter->derived_data[derived_column_idx];
      // Reuses the function-level `i` instead of shadowing it; the caching loop
      // below re-initializes it.
      for (i = 0; i < column_idx_size; i++)
      {
        derived_data[i] = FETCHED_DATA(iter, derived_column_data->column_idxs[i]);
        _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, derived_data[i], row_idxs, row_size, derived_column_data->column_idxs[i], cached_step, stream_context);
      }
      // Inputs are handed to the map callback as const.
      derived_column_data->map((void *const *const *)derived_data, derived_column_data->column_idx_size, row_size, fetched_data, derived_column_data->context, child_ctx.stream_context);
    } else
      derived_column_data->data_enum(column_idx, row_idxs, row_size, fetched_data, derived_column_data->context, child_ctx.stream_context);
    if (child_ctx.stream_context != stream_context)
    {
      // The callback ran on a child stream: make the caller's stream wait for it.
      ccv_nnc_stream_context_emit_signal(child_ctx.stream_context, child_ctx.signal);
      ccv_nnc_stream_context_wait_signal(stream_context, child_ctx.signal);
    }
  } else {
    const ccv_cnnp_column_data_t* const column_data = dataframe->column_data + column_idx;
    ccv_cnnp_dataframe_column_ctx_t child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, column_data->stream_type);
    column_data->data_enum(column_idx, row_idxs, row_size, fetched_data, column_data->context, child_ctx.stream_context);
    if (child_ctx.stream_context != stream_context)
    {
      ccv_nnc_stream_context_emit_signal(child_ctx.stream_context, child_ctx.signal);
      ccv_nnc_stream_context_wait_signal(stream_context, child_ctx.signal);
    }
  }
  // Record the results so dependent columns and later peeks reuse them.
  for (i = 0; i < row_size; i++)
  {
    cached_data[i + column_idx * cached_step].flag = 1;
    cached_data[i + column_idx * cached_step].ctx = (uint64_t)(uintptr_t)stream_context;
    cached_data[i + column_idx * cached_step].data = fetched_data[i];
  }
}
414
415
// Advance the iterator and store the first `column_idx_size` selected columns
// of the next row into data_ref.
//
// Data from the previous row is handed back to the recycling pool first.
// Prefetched rows are consumed when available. Prefetched items produced on a
// different stream context are recycled and fetched again.
// Returns 0 on success, -1 when the rows are exhausted (the first call past the
// end), and -2 on any further call.
int ccv_cnnp_dataframe_iter_next(ccv_cnnp_dataframe_iter_t* const iter, void** const data_ref, const int column_idx_size, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(column_idx_size <= iter->column_idx_size);
  // cached_data and the prefetch buffer are laid out for the columns that
  // existed when the iterator was created. Columns derived afterwards must not
  // be walked here: doing so read past cached_data.
  const int column_size = iter->column_size;
  int i;
  // Push existing data back to reusable state (these may not be reused
  // immediately because they may be on a different stream context).
  for (i = 0; i < column_size; i++)
    if (iter->cached_data[i].flag)
    {
      _ccv_cnnp_dataframe_enqueue_data(dataframe, iter->cached_data[i].data, i, iter->cached_data[i].ctx);
      iter->cached_data[i].flag = 0;
      iter->cached_data[i].data = 0;
      iter->cached_data[i].ctx = 0;
    }
  const int idx = iter->idx;
  iter->flag = 1; // Mark that next has been called.
  if (idx > dataframe->row_count) // Already reported the end once.
    return -2;
  if (idx == dataframe->row_count) // No more rows.
  {
    ++iter->idx;
    return -1;
  }
  if (iter->prefetch_tail != -1) // Something is waiting in the prefetch ring buffer.
  {
    ccv_array_t* const prefetches = iter->prefetches;
    assert(prefetches);
    const int lines = prefetches->rnum / column_size;
    if (iter->prefetch_head == iter->prefetch_tail) // Only one item.
      iter->prefetch_tail = -1;
    ccv_cnnp_dataframe_data_item_t* const cached_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(iter->prefetches, iter->prefetch_head);
    for (i = 0; i < column_size; i++)
    {
      if (!cached_data[i * lines].flag)
        continue;
      if (cached_data[i * lines].ctx == (uint64_t)(uintptr_t)stream_context) // Same stream context: adopt it.
        iter->cached_data[i] = cached_data[i * lines];
      else // Different stream context: recycle it.
        _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[i * lines].data, i, cached_data[i * lines].ctx);
    }
    ++iter->prefetch_head;
    assert(prefetches->rnum % column_size == 0);
    if (iter->prefetch_head >= lines)
      iter->prefetch_head = 0;
  }
  // Everything above only used cached data. New data may be created below, so
  // prepare the data_ctx cache for this stream context.
  _ccv_cnnp_dataframe_prepare_data_ctx(dataframe, stream_context);
  for (i = 0; i < column_idx_size; i++)
  {
    void* fetched_data[1]; // This guards better than just giving away data_ref + i.
    _ccv_cnnp_dataframe_column_data(dataframe, iter, iter->cached_data, fetched_data, dataframe->shuffled_idx ? dataframe->shuffled_idx + idx : &idx, 1, iter->column_idxs[i], 1, stream_context);
    data_ref[i] = fetched_data[0];
  }
  ++iter->idx;
  return 0;
}
473
474
// Fetch selected columns column_idxs[offset .. offset + data_ref_size) for the
// row most recently returned by ccv_cnnp_dataframe_iter_next, writing them
// into data_ref.
void ccv_cnnp_dataframe_iter_peek(ccv_cnnp_dataframe_iter_t* const iter, void** const data_ref, const int offset, const int data_ref_size, ccv_nnc_stream_context_t* const stream_context)
{
  assert(iter->flag);
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(offset + data_ref_size <= iter->column_idx_size);
  // iter_next already moved idx past the current row.
  const int idx = iter->idx - 1;
  assert(idx >= 0);
  assert(idx < dataframe->row_count);
  int k;
  for (k = 0; k < data_ref_size; k++)
  {
    const int* const row_idx = dataframe->shuffled_idx ? dataframe->shuffled_idx + idx : &idx;
    _ccv_cnnp_dataframe_column_data(dataframe, iter, iter->cached_data, data_ref + k, row_idx, 1, iter->column_idxs[offset + k], 1, stream_context);
  }
}
486
487
// Hand every cached item of prefetched line `line` (across the first
// `column_size` columns of the column-major buffer) back to the recycling pool.
static void _ccv_cnnp_null_prefetched_line(ccv_cnnp_dataframe_iter_t* const iter, const int line, const int lines, const int column_size)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  ccv_cnnp_dataframe_data_item_t* const cached_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(iter->prefetches, line);
  int j;
  for (j = 0; j < column_size; j++)
    if (cached_data[j * lines].flag)
      _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[j * lines].data, j, cached_data[j * lines].ctx);
}

// Drop all pending prefetches, recycling their data, and reset the ring buffer
// to empty (head 0, tail -1).
static void _ccv_cnnp_null_prefetches(ccv_cnnp_dataframe_iter_t* const iter)
{
  assert(iter->dataframe);
  // The prefetch buffer is laid out for the columns known at iterator
  // creation, so use the iterator's column count, not the dataframe's current one.
  const int column_size = iter->column_size;
  int i;
  if (iter->prefetch_tail >= 0) // -1 means no item.
  {
    assert(iter->prefetches);
    const int lines = iter->prefetches->rnum / column_size;
    if (iter->prefetch_head <= iter->prefetch_tail)
    {
      for (i = iter->prefetch_head; i <= iter->prefetch_tail; i++)
        _ccv_cnnp_null_prefetched_line(iter, i, lines, column_size);
    } else {
      // Wrapped around: [head, lines) followed by [0, tail].
      for (i = iter->prefetch_head; i < lines; i++)
        _ccv_cnnp_null_prefetched_line(iter, i, lines, column_size);
      for (i = 0; i <= iter->prefetch_tail; i++)
        _ccv_cnnp_null_prefetched_line(iter, i, lines, column_size);
    }
  }
  iter->prefetch_head = 0;
  iter->prefetch_tail = -1;
}
525
526
// Fetch `max_to_prefetch` consecutive rows starting at row position `idx` into
// the prefetch buffer range that begins at `cached_data`.
//
// The buffer is column-major with stride `lines`. The range is cleared first.
// fetched_data is grown when it holds fewer than max_to_prefetch rows per column.
static void _ccv_cnnp_prefetch_cached_data(ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, const int idx, const int max_to_prefetch, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  // Only columns known at iterator creation are ever fetched, and the prefetch
  // buffer is laid out for exactly that many columns.
  const int column_size = iter->column_size;
  assert(iter->prefetches);
  const int lines = iter->prefetches->rnum / column_size;
  int i, j;
  // Reset the target range for every column.
  for (i = 0; i < column_size; i++)
    for (j = 0; j < max_to_prefetch; j++)
    {
      cached_data[j + i * lines].flag = 0;
      cached_data[j + i * lines].data = 0;
      cached_data[j + i * lines].ctx = 0;
    }
  if (iter->fetched_size < max_to_prefetch)
  {
    // One area of row indices plus one area per column, max_to_prefetch rows each.
    iter->fetched_data = ccrealloc(iter->fetched_data, sizeof(void*) * max_to_prefetch * (column_size + 1));
    iter->fetched_size = max_to_prefetch;
  }
  if (dataframe->shuffled_idx)
    for (i = 0; i < iter->column_idx_size; i++)
      _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, FETCHED_DATA(iter, iter->column_idxs[i]), dataframe->shuffled_idx + idx, max_to_prefetch, iter->column_idxs[i], lines, stream_context);
  else {
    // No shuffle: materialize the consecutive row indices in the scratch area.
    for (i = 0; i < max_to_prefetch; i++)
      INDEX_DATA(iter)[i] = idx + i;
    for (i = 0; i < iter->column_idx_size; i++)
      _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, FETCHED_DATA(iter, iter->column_idxs[i]), INDEX_DATA(iter), max_to_prefetch, iter->column_idxs[i], lines, stream_context);
  }
}
557
558
// Prefetch up to `prefetch_count` rows beyond those already prefetched, on
// `stream_context`.
//
// Prefetched lines live in a ring buffer (iter->prefetches, column-major, with
// `lines` items per column) between prefetch_head and prefetch_tail. The buffer
// grows when needed.
// Returns 0 on success, or -1 when there is nothing left to prefetch.
int ccv_cnnp_dataframe_iter_prefetch(ccv_cnnp_dataframe_iter_t* const iter, const int prefetch_count, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  // Size the ring buffer by the columns known at iterator creation, matching
  // cached_data and fetched_data.
  const int column_size = iter->column_size;
  int i, j;
  assert(iter->idx <= dataframe->row_count);
  int lines, next, max_to_prefetch;
  if (iter->prefetch_tail == -1)
  {
    if (iter->idx == dataframe->row_count)
      return -1; // Cannot be done.
    max_to_prefetch = ccv_min(dataframe->row_count - iter->idx, prefetch_count);
    if (!iter->prefetches)
    {
      iter->prefetches = ccv_array_new(sizeof(ccv_cnnp_dataframe_data_item_t), max_to_prefetch * column_size, 0);
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
    }
    iter->prefetch_tail = iter->prefetch_head = 0; // Advance!
    next = iter->idx;
    lines = iter->prefetches->rnum / column_size;
    // Reset to enough space.
    if (lines < max_to_prefetch)
    {
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
      lines = max_to_prefetch;
    }
  } else {
    assert(iter->prefetches);
    ccv_array_t* const prefetches = iter->prefetches;
    assert(prefetches->rnum % column_size == 0);
    lines = prefetches->rnum / column_size;
    const int prefetched = iter->prefetch_tail >= iter->prefetch_head ? iter->prefetch_tail - iter->prefetch_head + 1 : lines - iter->prefetch_head + iter->prefetch_tail + 1;
    if (iter->idx + prefetched == dataframe->row_count) // Nothing to prefetch.
      return -1;
    max_to_prefetch = ccv_min(dataframe->row_count - (iter->idx + prefetched), prefetch_count);
    // Not enough space, need to resize.
    if (prefetched + max_to_prefetch > lines)
    {
      const int new_lines = prefetched + max_to_prefetch;
      ccv_array_resize(prefetches, new_lines * column_size);
      // These moves overlap, so go from the end back to the beginning.
      if (iter->prefetch_head > iter->prefetch_tail)
      {
        // Wrapped: move [head, lines) to the end of the larger ring, keep [0, tail] at the front.
        const int offset = new_lines - lines;
        for (i = column_size - 1; i >= 0; i--)
        {
          for (j = lines - 1; j >= iter->prefetch_head; j--)
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + offset + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
          for (j = iter->prefetch_tail; j >= 0; j--)
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
        }
        iter->prefetch_head += offset;
      } else {
        for (i = column_size - 1; i >= 0; i--)
          for (j = iter->prefetch_tail; j >= iter->prefetch_head; j--)
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
      }
      lines = new_lines;
    }
    ++iter->prefetch_tail; // Move to the next ready tail.
    if (iter->prefetch_tail >= lines)
      iter->prefetch_tail = 0;
    next = iter->idx + prefetched;
  }
  ccv_array_t* const prefetches = iter->prefetches;
  ccv_cnnp_dataframe_data_item_t* const cached_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, iter->prefetch_tail);
  // Everything above only used cached data. New data may be created below, so
  // prepare the data_ctx cache for this stream context.
  _ccv_cnnp_dataframe_prepare_data_ctx(dataframe, stream_context);
  // If the tail is before the head, there must be enough space for max_to_prefetch.
  if (iter->prefetch_tail < iter->prefetch_head)
  {
    assert(iter->prefetch_tail + max_to_prefetch - 1 < iter->prefetch_head);
    _ccv_cnnp_prefetch_cached_data(iter, cached_data, next, max_to_prefetch, stream_context);
    iter->prefetch_tail += max_to_prefetch - 1;
  } else {
    // First, fetch to the end.
    const int fetch_to_end = ccv_min(max_to_prefetch, lines - iter->prefetch_tail);
    _ccv_cnnp_prefetch_cached_data(iter, cached_data, next, fetch_to_end, stream_context);
    if (fetch_to_end == max_to_prefetch)
      iter->prefetch_tail += fetch_to_end - 1;
    else {
      // Wrap around and fetch the remainder at the front of the ring.
      ccv_cnnp_dataframe_data_item_t* const more_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, 0);
      assert(max_to_prefetch > fetch_to_end);
      _ccv_cnnp_prefetch_cached_data(iter, more_data, next + fetch_to_end, max_to_prefetch - fetch_to_end, stream_context);
      iter->prefetch_tail = max_to_prefetch - fetch_to_end - 1;
    }
  }
  return 0;
}
650
651
/**
 * Move the iterator's cursor to row `idx`.
 *
 * When the cursor actually moves, `iter->flag` is cleared. Any prefetched
 * items are pushed back to reusable state (via _ccv_cnnp_null_prefetches),
 * because they were fetched for the old cursor position.
 *
 * @param iter The iterator to reposition; its dataframe must be non-NULL.
 * @param idx  Target row, valid range is [0, row_count).
 * @return 0 on success (including when `idx` already equals the cursor),
 *         -1 when `idx` is out of range (negative or >= row_count).
 */
int ccv_cnnp_dataframe_iter_set_cursor(ccv_cnnp_dataframe_iter_t* const iter, const int idx)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  // Reject negative rows as well as rows past the end: the cursor is later
  // used to compute which row to fetch, so a negative value would index
  // out of bounds.
  if (idx < 0 || idx >= dataframe->row_count)
    return -1;
  // Already positioned here: keep whatever has been prefetched.
  if (idx == iter->idx)
    return 0;
  iter->idx = idx;
  iter->flag = 0;
  // Prefetched items belong to the old position; push them back to reusable state.
  _ccv_cnnp_null_prefetches(iter);
  return 0;
}
664
665
/**
 * Destroy an iterator and release everything it holds.
 *
 * - Cached column data still flagged as live is handed back to the dataframe
 *   (via _ccv_cnnp_dataframe_enqueue_data) for reuse rather than freed.
 *   It may not be reused immediately, because it can belong to a
 *   different stream context.
 * - Prefetched items are pushed back to reusable state, then the prefetch
 *   array is freed.
 * - Derived-data entries, the derived-data array and `fetched_data` are freed.
 * - For every stream context in `column_ctx`, the per-column child stream
 *   contexts and signals are freed, along with the per-column array itself.
 *
 * @param iter The iterator to free; it must not be used afterwards.
 */
void ccv_cnnp_dataframe_iter_free(ccv_cnnp_dataframe_iter_t* const iter)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  // cached_data and the per-stream ctx arrays are indexed by column in [0, column_size).
  const int column_size = iter->column_size;
  int i;
  // Push existing data back to reusable state (note, these may not be reused immediately because they may be on a different stream context).
  for (i = 0; i < column_size; i++)
    if (iter->cached_data[i].flag)
      _ccv_cnnp_dataframe_enqueue_data(dataframe, iter->cached_data[i].data, i, iter->cached_data[i].ctx);
  // Push prefetches back to reusable state.
  _ccv_cnnp_null_prefetches(iter);
  if (iter->prefetches)
    ccv_array_free(iter->prefetches);
  if (iter->derived_data)
  {
    assert(dataframe->derived_column_data);
    for (i = 0; i < dataframe->derived_column_data->rnum; i++)
      if (iter->derived_data[i])
        ccfree(iter->derived_data[i]);
    ccfree(iter->derived_data);
  }
  ccfree(iter->fetched_data);
  if (iter->column_ctx)
  {
    khash_t(iter_ctx)* const column_ctx = iter->column_ctx;
    khiter_t k;
    for (k = kh_begin(column_ctx); k != kh_end(column_ctx); ++k)
    {
      if (!kh_exist(column_ctx, k))
        continue;
      // Each live entry maps a stream context to an array of per-column child contexts.
      ccv_cnnp_dataframe_column_ctx_t* const ctx = kh_val(column_ctx, k);
      for (i = 0; i < column_size; i++)
      {
        if (ctx[i].stream_context)
          ccv_nnc_stream_context_free(ctx[i].stream_context);
        if (ctx[i].signal)
          ccv_nnc_stream_signal_free(ctx[i].signal);
      }
      // kh_destroy releases only the table's own storage, not the arrays its
      // values point to, so release each array here to avoid leaking it.
      // NOTE(review): assumes the array was allocated with ccmalloc/cccalloc when
      // the entry was inserted (allocation site not visible here) - confirm.
      ccfree(ctx);
    }
    kh_destroy(iter_ctx, column_ctx);
  }
  ccfree(iter);
}
708
709
/**
 * Destroy a dataframe together with every resource it owns.
 *
 * Teardown runs in this order:
 *   1. every per-stream-context data cache in `data_ctx` (detaching the
 *      destructor hook from non-NULL stream contexts first), then the hash
 *      table itself;
 *   2. the derived-column descriptors (context, column index list, name) and
 *      their array;
 *   3. the base-column contexts and names;
 *   4. the shuffle index, the GSL RNG when built with HAVE_GSL, and finally
 *      the dataframe struct.
 * The cache release in step 1 receives `dataframe`, so it runs before any
 * column descriptor is torn down.
 *
 * @param dataframe The dataframe to free; it must not be used afterwards.
 */
void ccv_cnnp_dataframe_free(ccv_cnnp_dataframe_t* const dataframe)
{
  // Base columns plus however many derived columns exist; no cache entry may hold more.
  const int total_columns = dataframe->derived_column_data
    ? dataframe->column_size + dataframe->derived_column_data->rnum
    : dataframe->column_size;
  khash_t(ctx)* const caches = dataframe->data_ctx;
  khiter_t slot;
  for (slot = kh_begin(caches); slot != kh_end(caches); ++slot)
  {
    if (!kh_exist(caches, slot))
      continue;
    ccv_cnnp_data_ctx_t* const entry = &kh_val(caches, slot);
    ccv_nnc_stream_context_t* const stream = (ccv_nnc_stream_context_t*)(intptr_t)kh_key(caches, slot);
    // A NULL key is possible; only a real stream context has a hook to detach.
    if (stream)
      ccv_nnc_stream_context_remove_destructor_hook(stream, entry->hook_id);
    assert(entry->columns->rnum <= total_columns);
    _ccv_cnnp_dataframe_data_ctx_columns_free(dataframe, entry->columns);
  }
  kh_destroy(ctx, caches);

  ccv_array_t* const derived = dataframe->derived_column_data;
  if (derived)
  {
    int d;
    for (d = 0; d < derived->rnum; d++)
    {
      ccv_cnnp_derived_column_data_t* const column = (ccv_cnnp_derived_column_data_t*)ccv_array_get(derived, d);
      if (column->context_deinit)
        column->context_deinit(column->context);
      ccfree(column->column_idxs);
      if (column->name)
        ccfree(column->name);
    }
    ccv_array_free(derived);
  }

  ccv_cnnp_column_data_t* column_data;
  for (column_data = dataframe->column_data; column_data < dataframe->column_data + dataframe->column_size; ++column_data)
  {
    if (column_data->context_deinit)
      column_data->context_deinit(column_data->context);
    if (column_data->name)
      ccfree(column_data->name);
  }

  if (dataframe->shuffled_idx)
    ccfree(dataframe->shuffled_idx);
#ifdef HAVE_GSL
  if (dataframe->rng)
    gsl_rng_free(dataframe->rng);
#endif
  ccfree(dataframe);
}