Coverage Report

Created: 2022-07-27 23:53

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_cnnp_dataframe.c
Line
Count
Source (jump to first uncovered line)
1
#include "ccv_nnc.h"
2
#include "ccv_nnc_easy.h"
3
#include "ccv_nnc_internal.h"
4
#include "ccv_internal.h"
5
#include "_ccv_cnnp_dataframe.h"
6
#include "3rdparty/khash/khash.h"
7
#ifdef HAVE_GSL
8
#include <gsl/gsl_rng.h>
9
#include <gsl/gsl_randist.h>
10
#else
11
#include "3rdparty/sfmt/SFMT.h"
12
#endif
13
14
typedef struct {
15
  ccv_array_t* columns;
16
  int hook_id;
17
} ccv_cnnp_data_ctx_t;
18
19
KHASH_MAP_INIT_INT64(ctx, ccv_cnnp_data_ctx_t)
20
21
struct ccv_cnnp_dataframe_s {
22
  int row_count;
23
  int column_size;
24
  int* shuffled_idx;
25
#ifdef HAVE_GSL
26
  gsl_rng* rng;
27
#else
28
  sfmt_t sfmt;
29
#endif
30
  khash_t(ctx)* data_ctx; // The stream context based cache for data entity of columns. This helps us to avoid allocations when iterate through data.
31
  ccv_array_t* derived_column_data;
32
  ccv_cnnp_column_data_t column_data[1];
33
};
34
35
typedef struct {
36
  int stream_type;
37
  int column_idx_size;
38
  int* column_idxs;
39
  char* name;
40
  ccv_cnnp_column_data_enum_f data_enum;
41
  ccv_cnnp_column_data_deinit_f data_deinit;
42
  void* context;
43
  ccv_cnnp_column_data_context_deinit_f context_deinit;
44
  ccv_cnnp_column_data_map_f map;
45
} ccv_cnnp_derived_column_data_t;
46
47
ccv_cnnp_dataframe_t* ccv_cnnp_dataframe_new(const ccv_cnnp_column_data_t* const column_data, const int column_size, const int row_count)
48
56
{
49
56
  assert(column_size >= 0);
50
56
  ccv_cnnp_dataframe_t* const dataframe = (ccv_cnnp_dataframe_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_t) + sizeof(ccv_cnnp_column_data_t) * (column_size - 1));
51
56
  dataframe->row_count = row_count;
52
56
  dataframe->column_size = column_size;
53
56
  dataframe->data_ctx = kh_init(ctx);
54
56
  if (column_size > 0)
55
55
  {
56
55
    memcpy(dataframe->column_data, column_data, sizeof(ccv_cnnp_column_data_t) * column_size);
57
55
    int i;
58
6.53k
    for (i = 0; i < column_size; 
i++6.48k
)
59
6.48k
      dataframe->column_data[i].name = ccv_cnnp_column_copy_name(column_data[i].name);
60
55
  }
61
56
  return dataframe;
62
56
}
63
64
void ccv_cnnp_dataframe_shuffle(ccv_cnnp_dataframe_t* const dataframe)
65
14
{
66
14
  assert(dataframe->row_count);
67
14
  int i;
68
14
  if (!dataframe->shuffled_idx)
69
9
  {
70
9
    dataframe->shuffled_idx = (int*)ccmalloc(sizeof(int) * dataframe->row_count);
71
150k
    for (i = 0; i < dataframe->row_count; 
i++150k
)
72
150k
      dataframe->shuffled_idx[i] = i;
73
9
#ifdef HAVE_GSL
74
9
    assert(!dataframe->rng);
75
9
    gsl_rng_env_setup();
76
9
    dataframe->rng = gsl_rng_alloc(gsl_rng_default);
77
9
    gsl_rng_set(dataframe->rng, (unsigned long int)ccv_nnc_stream_context_genrand_uint32(0));
78
#else
79
    sfmt_init_gen_rand(&dataframe->sfmt, ccv_nnc_stream_context_genrand_uint32(0));
80
#endif
81
9
  }
82
14
#ifdef HAVE_GSL
83
14
  gsl_ran_shuffle(dataframe->rng, dataframe->shuffled_idx, dataframe->row_count, sizeof(int));
84
#else
85
  sfmt_genrand_shuffle(&dataframe->sfmt, dataframe->shuffled_idx, dataframe->row_count, sizeof(int));
86
#endif
87
14
}
88
89
int ccv_cnnp_dataframe_row_count(ccv_cnnp_dataframe_t* const dataframe)
90
23
{
91
23
  return dataframe->row_count;
92
23
}
93
94
int ccv_cnnp_dataframe_add(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_enum_f data_enum, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit, const char* name)
95
19
{
96
19
  if (!dataframe->derived_column_data)
97
1
    dataframe->derived_column_data = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
98
19
  ccv_cnnp_derived_column_data_t column_data = {
99
19
    .stream_type = stream_type,
100
19
    .name = ccv_cnnp_column_copy_name(name),
101
19
    .data_enum = data_enum,
102
19
    .data_deinit = data_deinit,
103
19
    .context = context,
104
19
    .context_deinit = context_deinit,
105
19
  };
106
19
  ccv_array_push(dataframe->derived_column_data, &column_data);
107
19
  return dataframe->column_size + dataframe->derived_column_data->rnum - 1;
108
19
}
109
110
int ccv_cnnp_dataframe_map(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_map_f map, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, const int* const column_idxs, const int column_idx_size, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit, const char* name)
111
192
{
112
192
  assert(column_idx_size > 0);
113
192
  if (!dataframe->derived_column_data)
114
43
    dataframe->derived_column_data = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
115
192
  const int column_size = dataframe->column_size + dataframe->derived_column_data->rnum;
116
192
  int i;
117
424
  for (i = 0; i < column_idx_size; 
i++232
)
118
232
    { assert(column_idxs[i] < column_size); }
119
192
  ccv_cnnp_derived_column_data_t column_data = {
120
192
    .stream_type = stream_type,
121
192
    .name = ccv_cnnp_column_copy_name(name),
122
192
    .column_idx_size = column_idx_size,
123
192
    .column_idxs = (int*)ccmalloc(sizeof(int) * column_idx_size),
124
192
    .map = map,
125
192
    .data_deinit = data_deinit,
126
192
    .context = context,
127
192
    .context_deinit = context_deinit,
128
192
  };
129
192
  memcpy(column_data.column_idxs, column_idxs, sizeof(int) * column_idx_size);
130
192
  ccv_array_push(dataframe->derived_column_data, &column_data);
131
192
  return dataframe->column_size + dataframe->derived_column_data->rnum - 1;
132
192
}
133
134
void* ccv_cnnp_dataframe_column_context(const ccv_cnnp_dataframe_t* const dataframe, const int column_idx)
135
2
{
136
2
  assert(column_idx >= 0);
137
2
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 
00
);
138
2
  assert(column_idx < column_size);
139
2
  if (column_idx < dataframe->column_size)
140
0
    return dataframe->column_data[column_idx].context;
141
2
  assert(dataframe->derived_column_data);
142
2
  ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
143
2
  return derived_column_data->context;
144
2
}
145
146
const char* ccv_cnnp_dataframe_column_name(ccv_cnnp_dataframe_t* const dataframe, const int column_idx)
147
4
{
148
4
  assert(column_idx >= 0);
149
4
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum1
:
03
);
150
4
  assert(column_idx < column_size);
151
4
  if (column_idx < dataframe->column_size)
152
3
    return dataframe->column_data[column_idx].name;
153
1
  ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
154
1
  return derived_column_data->name;
155
4
}
156
157
typedef struct {
158
  int flag; // Mark this as cached or not.
159
  int64_t ctx; // The stream context.
160
  void* data;
161
} ccv_cnnp_dataframe_data_item_t;
162
163
typedef struct {
164
  ccv_nnc_stream_context_t* stream_context;
165
} ccv_cnnp_dataframe_column_ctx_t;
166
167
KHASH_MAP_INIT_INT64(iter_ctx, ccv_cnnp_dataframe_column_ctx_t*)
168
169
struct ccv_cnnp_dataframe_iter_s {
170
  int flag; // Whether we called next or not.
171
  int idx;
172
  int prefetch_head;
173
  int prefetch_tail;
174
  int column_size;
175
  int column_idx_size;
176
  int fetched_size; // The size of fetched data.
177
  ccv_cnnp_dataframe_t* dataframe;
178
  void**** derived_data; // This is ridiculous, but it is true.
179
  void** fetched_data; // The cache to store fetched data.
180
  khash_t(iter_ctx)* column_ctx; // Column context specific to a stream context. The key will be a parent stream context and value will be child stream context + signal.
181
  ccv_array_t* prefetches; // The prefetch contents.
182
  int* column_idxs;
183
  ccv_cnnp_dataframe_data_item_t cached_data[1]; // The data cached when deriving data.
184
};
185
186
82.0k
#define INDEX_DATA(iter) ((int*)((iter)->fetched_data))
187
5.12k
#define FETCHED_DATA(iter, idx) ((iter)->fetched_data + ((idx) + 1) * (iter)->fetched_size)
188
189
ccv_cnnp_dataframe_iter_t* ccv_cnnp_dataframe_iter_new(ccv_cnnp_dataframe_t* const dataframe, const int* const column_idxs, const int column_idx_size)
190
61
{
191
61
  assert(column_idx_size > 0);
192
61
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum47
:
014
);
193
61
  int i;
194
196
  for (i = 0; i < column_idx_size; 
i++135
)
195
135
    { assert(column_idxs[i] < column_size); }
196
61
  ccv_cnnp_dataframe_iter_t* const iter = (ccv_cnnp_dataframe_iter_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_iter_t) + sizeof(ccv_cnnp_dataframe_data_item_t) * column_size + sizeof(void*) * (column_idx_size - 1) + sizeof(int) * column_idx_size);
197
61
  iter->dataframe = dataframe;
198
61
  iter->prefetch_tail = -1;
199
  // After created the iterator, we may continue add more derived columns.
200
  // Hence, keep the number of existing columns for its cached_data and column_ctx tracking.
201
61
  iter->column_size = column_size;
202
61
  iter->column_idx_size = column_idx_size;
203
61
  iter->column_idxs = (int*)(iter->cached_data + column_size);
204
61
  memcpy(iter->column_idxs, column_idxs, sizeof(int) * column_idx_size);
205
  // Preallocate fetched data.
206
61
  iter->fetched_size = 1;
207
61
  iter->fetched_data = (void**)ccmalloc(sizeof(void*) * (column_size + 1));
208
61
  return iter;
209
61
}
210
211
static void _ccv_cnnp_dataframe_data_ctx_columns_free(ccv_cnnp_dataframe_t* const dataframe, ccv_array_t* const columns)
212
62
{
213
62
  int i, j;
214
6.75k
  for (i = 0; i < columns->rnum; 
i++6.69k
)
215
6.69k
  {
216
6.69k
    ccv_array_t* const column = *(ccv_array_t**)ccv_array_get(columns, i);
217
6.69k
    if (!column)
218
6.42k
      continue;
219
269
    void* context;
220
269
    ccv_cnnp_column_data_deinit_f data_deinit;
221
269
    if (i < dataframe->column_size)
222
60
    {
223
60
      data_deinit = dataframe->column_data[i].data_deinit;
224
60
      context = dataframe->column_data[i].context;
225
209
    } else {
226
209
      assert(dataframe->derived_column_data);
227
209
      ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, i - dataframe->column_size);
228
209
      data_deinit = derived_column_data->data_deinit;
229
209
      context = derived_column_data->context;
230
209
    }
231
269
    if (data_deinit)
232
25.4k
      
for (j = 0; 171
j < column->rnum;
j++25.2k
)
233
25.2k
        data_deinit(*(void**)ccv_array_get(column, j), context);
234
269
    ccv_array_free(column);
235
269
  }
236
62
  ccv_array_free(columns);
237
62
}
238
239
static void _ccv_cnnp_dataframe_stream_destructor_hook(const ccv_nnc_stream_context_t* const stream, void* const context)
240
4
{
241
4
  ccv_cnnp_dataframe_t* const dataframe = (ccv_cnnp_dataframe_t*)context;
242
4
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
243
4
  int64_t ctx = (int64_t)(intptr_t)stream;
244
4
  khiter_t k = kh_get(ctx, data_ctx, ctx);
245
4
  assert(k != kh_end(data_ctx));
246
4
  ccv_array_t* const columns = kh_val(data_ctx, k).columns;
247
4
  _ccv_cnnp_dataframe_data_ctx_columns_free(dataframe, columns);
248
4
  kh_del(ctx, data_ctx, k);
249
4
}
250
251
static void _ccv_cnnp_dataframe_prepare_data_ctx(ccv_cnnp_dataframe_t* const dataframe, ccv_nnc_stream_context_t* const stream_context)
252
182k
{
253
182k
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
254
182k
  int ret = 0;
255
182k
  int64_t ctx = (int64_t)(intptr_t)stream_context;
256
182k
  khiter_t k = kh_put(ctx, data_ctx, ctx, &ret);
257
182k
  assert(ret >= 0);
258
182k
  if (ret != 0)
259
62
  {
260
62
    const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum50
:
012
);
261
62
    kh_val(data_ctx, k).columns = ccv_array_new(sizeof(ccv_array_t*), column_size, 0);
262
62
    kh_val(data_ctx, k).hook_id = stream_context ? 
ccv_nnc_stream_context_add_destructor_hook(stream_context, _ccv_cnnp_dataframe_stream_destructor_hook, dataframe)16
:
-146
;
263
62
  }
264
182k
}
265
266
static void _ccv_cnnp_dataframe_enqueue_data(ccv_cnnp_dataframe_t* const dataframe, void* const data, const int column_idx, const uint64_t ctx)
267
1.20M
{
268
1.20M
  if (!data)
269
4
    return;
270
1.20M
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
271
1.20M
  khiter_t k = kh_get(ctx, data_ctx, ctx);
272
1.20M
  if (k == kh_end(data_ctx))
273
40
  {
274
    // Free the data directly.
275
40
    void* context;
276
40
    ccv_cnnp_column_data_deinit_f data_deinit;
277
40
    if (column_idx < dataframe->column_size)
278
2
    {
279
2
      data_deinit = dataframe->column_data[column_idx].data_deinit;
280
2
      context = dataframe->column_data[column_idx].context;
281
38
    } else {
282
38
      assert(dataframe->derived_column_data);
283
38
      ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
284
38
      data_deinit = derived_column_data->data_deinit;
285
38
      context = derived_column_data->context;
286
38
    }
287
40
    if (data_deinit)
288
28
      data_deinit(data, context);
289
40
    return;
290
40
  }
291
1.20M
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum1.20M
:
0113
);
292
1.20M
  assert(column_idx < column_size);
293
  // If ret == 0, the key already exist, we can get the columns directly, otherwise, create and assign back.
294
1.20M
  ccv_array_t* const columns = kh_val(data_ctx, k).columns;
295
1.20M
  if (columns->rnum < column_size)
296
60
    ccv_array_resize(columns, column_size);
297
1.20M
  ccv_array_t* column = *(ccv_array_t**)ccv_array_get(columns, column_idx);
298
1.20M
  if (!column)
299
269
  {
300
269
    column = ccv_array_new(sizeof(void*), 1, 0);
301
269
    *(ccv_array_t**)ccv_array_get(columns, column_idx) = column;
302
269
  }
303
1.20M
  ccv_array_push(column, &data);
304
1.20M
}
305
306
static void* _ccv_cnnp_dataframe_dequeue_data(ccv_cnnp_dataframe_t* const dataframe, const int column_idx, ccv_nnc_stream_context_t* const stream_context)
307
1.20M
{
308
1.20M
  const uint64_t ctx = (uint64_t)(uintptr_t)stream_context;
309
1.20M
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
310
1.20M
  khiter_t k = kh_get(ctx, data_ctx, ctx);
311
1.20M
  if (k == kh_end(data_ctx))
312
0
    return 0;
313
1.20M
  ccv_array_t* const columns = kh_val(data_ctx, k).columns;
314
1.20M
  if (column_idx >= columns->rnum)
315
44.9k
    return 0;
316
1.16M
  ccv_array_t* const column = *(ccv_array_t**)ccv_array_get(columns, column_idx);
317
1.16M
  if (!column || column->rnum == 0)
318
67
    return 0;
319
1.16M
  void* const data = *(void**)ccv_array_get(column, column->rnum - 1);
320
1.16M
  --column->rnum;
321
1.16M
  return data;
322
1.16M
}
323
324
static ccv_cnnp_dataframe_column_ctx_t _ccv_cnnp_child_column_ctx_for_stream_type(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, const int column_idx, ccv_nnc_stream_context_t* const stream_context, const int stream_type)
325
4.81k
{
326
4.81k
  ccv_cnnp_dataframe_column_ctx_t child_ctx = {
327
4.81k
    .stream_context = stream_context,
328
4.81k
  };
329
4.81k
  if (stream_context && 
ccv_nnc_stream_context_type(stream_context) != stream_type3.72k
&&
stream_type != 03.13k
)
330
603
  {
331
603
    if (!iter->column_ctx)
332
4
      iter->column_ctx = kh_init(iter_ctx);
333
603
    khash_t(iter_ctx)* const column_ctx = iter->column_ctx;
334
603
    int ret = 0;
335
603
    khiter_t k = kh_put(iter_ctx, column_ctx, (uint64_t)(uintptr_t)stream_context, &ret);
336
603
    assert(ret >= 0);
337
603
    const int column_size = iter->column_size;
338
603
    ccv_cnnp_dataframe_column_ctx_t* const ctx = (ret == 0) ? 
kh_val597
(column_ctx, k) :
cccalloc6
(column_size, sizeof(ccv_cnnp_dataframe_column_ctx_t))6
;
339
603
    if (ret != 0)
340
6
      kh_val(column_ctx, k) = ctx;
341
603
    assert(column_idx < column_size);
342
603
    if (!ctx[column_idx].stream_context)
343
39
      ctx[column_idx].stream_context = ccv_nnc_stream_context_new(stream_type);
344
603
    child_ctx = ctx[column_idx];
345
603
  }
346
4.81k
  return child_ctx;
347
4.81k
}
348
349
static void _ccv_cnnp_dataframe_column_data(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, void** const fetched_data, const int* const row_idxs, const int row_size, const int column_idx, const int cached_step, ccv_nnc_stream_context_t* const stream_context)
350
187k
{
351
187k
  int i;
352
187k
  if (cached_data[column_idx * cached_step].flag)
353
182k
  {
354
334k
    for (i = 1; i < row_size; 
i++151k
)
355
151k
      { assert(cached_data[i + column_idx * cached_step].flag); }
356
516k
    
for (i = 0; 182k
i < row_size;
i++334k
)
357
334k
      fetched_data[i] = cached_data[i + column_idx * cached_step].data;
358
182k
    return;
359
182k
  } else {
360
1.20M
    for (i = 1; i < row_size; 
i++1.20M
)
361
1.20M
      { assert(!cached_data[i + column_idx * cached_step].flag); }
362
1.21M
    
for (i = 0; 4.81k
i < row_size;
i++1.20M
)
363
1.20M
      fetched_data[i] = _ccv_cnnp_dataframe_dequeue_data(dataframe, column_idx, stream_context);
364
4.81k
  }
365
4.81k
  if (column_idx >= dataframe->column_size)
366
3.88k
  {
367
3.88k
    assert(dataframe->derived_column_data);
368
3.88k
    const int derived_column_idx = column_idx - dataframe->column_size;
369
3.88k
    const ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, derived_column_idx);
370
3.88k
    ccv_cnnp_dataframe_column_ctx_t child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, derived_column_data->stream_type);
371
3.88k
    const int column_idx_size = derived_column_data->column_idx_size;
372
3.88k
    if (derived_column_data->map)
373
3.16k
    {
374
3.16k
      int i;
375
3.16k
      if (!iter->derived_data)
376
45
        iter->derived_data = (void****)cccalloc(dataframe->derived_column_data->rnum, sizeof(void***));
377
3.16k
      if (!iter->derived_data[derived_column_idx])
378
194
        iter->derived_data[derived_column_idx] = (void***)cccalloc(derived_column_data->column_idx_size, sizeof(void**));
379
3.16k
      void*** const derived_data = iter->derived_data[derived_column_idx];
380
6.68k
      for (i = 0; i < column_idx_size; 
i++3.51k
)
381
3.51k
      {
382
3.51k
        derived_data[i] = FETCHED_DATA(iter, derived_column_data->column_idxs[i]);
383
3.51k
        _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, derived_data[i], row_idxs, row_size, derived_column_data->column_idxs[i], cached_step, stream_context);
384
3.51k
      }
385
      // Mark it as const.
386
3.16k
      derived_column_data->map((void *const *const *)derived_data, derived_column_data->column_idx_size, row_size, fetched_data, derived_column_data->context, child_ctx.stream_context);
387
3.16k
    } else
388
721
      derived_column_data->data_enum(column_idx, row_idxs, row_size, fetched_data, derived_column_data->context, child_ctx.stream_context);
389
3.88k
    if (child_ctx.stream_context != stream_context)
390
603
    {
391
603
      ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_context_emit_signal_new(child_ctx.stream_context);
392
603
      ccv_nnc_stream_context_wait_signal(stream_context, signal);
393
603
    }
394
3.88k
  } else {
395
924
    const ccv_cnnp_column_data_t* const column_data = dataframe->column_data + column_idx;
396
924
    ccv_cnnp_dataframe_column_ctx_t child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, column_data->stream_type);
397
924
    column_data->data_enum(column_idx, row_idxs, row_size, fetched_data, column_data->context, child_ctx.stream_context);
398
924
    if (child_ctx.stream_context != stream_context)
399
0
    {
400
0
      ccv_nnc_stream_signal_t* const signal = ccv_nnc_stream_context_emit_signal_new(child_ctx.stream_context);
401
0
      ccv_nnc_stream_context_wait_signal(stream_context, signal);
402
0
    }
403
924
  }
404
1.21M
  
for (i = 0; 4.81k
i < row_size;
i++1.20M
)
405
1.20M
  {
406
1.20M
    cached_data[i + column_idx * cached_step].flag = 1;
407
1.20M
    cached_data[i + column_idx * cached_step].ctx = (uint64_t)(uintptr_t)stream_context;
408
1.20M
    cached_data[i + column_idx * cached_step].data = fetched_data[i];
409
1.20M
  }
410
4.81k
}
411
412
int ccv_cnnp_dataframe_iter_next(ccv_cnnp_dataframe_iter_t* const iter, void** const data_ref, const int column_idx_size, ccv_nnc_stream_context_t* const stream_context)
413
181k
{
414
181k
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
415
181k
  assert(column_idx_size <= iter->column_idx_size);
416
181k
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum181k
:
0120
);
417
181k
  int i;
418
  // Push existing data back to reusable state (note, these may not be reused immediately because they may be on a different stream context).
419
1.54M
  for (i = 0; i < column_size; 
i++1.36M
)
420
1.36M
    if (iter->cached_data[i].flag)
421
1.20M
    {
422
1.20M
      _ccv_cnnp_dataframe_enqueue_data(dataframe, iter->cached_data[i].data, i, iter->cached_data[i].ctx);
423
1.20M
      iter->cached_data[i].flag = 0;
424
1.20M
      iter->cached_data[i].data = 0;
425
1.20M
      iter->cached_data[i].ctx = 0;
426
1.20M
    }
427
181k
  const int idx = iter->idx;
428
181k
  iter->flag = 1; // Mark it as we called next already.
429
181k
  if (idx > dataframe->row_count) // If we exceed row count, return -2.
430
0
    return -2;
431
181k
  if (idx == dataframe->row_count) // Otherwise, no more row, return -1.
432
26
  {
433
26
    ++iter->idx;
434
26
    return -1;
435
26
  }
436
181k
  if (iter->prefetch_tail != -1) // If there is something in prefetch log.
437
181k
  {
438
181k
    ccv_array_t* const prefetches = iter->prefetches;
439
181k
    assert(prefetches);
440
181k
    const int lines = prefetches->rnum / column_size;
441
181k
    if (iter->prefetch_head == iter->prefetch_tail) // Only one item.
442
683
      iter->prefetch_tail = -1;
443
181k
    ccv_cnnp_dataframe_data_item_t* const cached_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(iter->prefetches, iter->prefetch_head);
444
1.41M
    for (i = 0; i < column_size; 
i++1.23M
)
445
1.23M
    {
446
1.23M
      if (!cached_data[i * lines].flag)
447
32.1k
        continue;
448
1.20M
      if (cached_data[i * lines].ctx == (uint64_t)(uintptr_t)stream_context) // If match existing stream context.
449
1.20M
        iter->cached_data[i] = cached_data[i * lines];
450
0
      else // Recycle
451
0
        _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[i * lines].data, i, cached_data[i * lines].ctx);
452
1.20M
    }
453
181k
    ++iter->prefetch_head;
454
181k
    assert(prefetches->rnum % column_size == 0);
455
181k
    if (iter->prefetch_head >= lines)
456
670
      iter->prefetch_head = 0;
457
181k
  }
458
  // Now we are about to create cached_data (above code only uses cached data).
459
  // We are ready to prepare the data_ctx cache.
460
181k
  _ccv_cnnp_dataframe_prepare_data_ctx(dataframe, stream_context);
461
363k
  for (i = 0; i < column_idx_size; 
i++182k
)
462
182k
  {
463
182k
    void* fetched_data[1]; // This guards better than just give away data_ref + i.
464
182k
    _ccv_cnnp_dataframe_column_data(dataframe, iter, iter->cached_data, fetched_data, dataframe->shuffled_idx ? 
dataframe->shuffled_idx + idx100k
:
&idx81.9k
, 1, iter->column_idxs[i], 1, stream_context);
465
182k
    data_ref[i] = fetched_data[0];
466
182k
  }
467
181k
  ++iter->idx;
468
181k
  return 0;
469
181k
}
470
471
void ccv_cnnp_dataframe_iter_peek(ccv_cnnp_dataframe_iter_t* const iter, void** const data_ref, const int offset, const int data_ref_size, ccv_nnc_stream_context_t* const stream_context)
472
19
{
473
19
  assert(iter->flag);
474
19
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
475
19
  assert(offset + data_ref_size <= iter->column_idx_size);
476
19
  const int idx = iter->idx - 1; // next is called, therefore, index is already incremented.
477
19
  assert(idx >= 0);
478
19
  assert(idx < dataframe->row_count);
479
19
  int i;
480
51
  for (i = 0; i < data_ref_size; 
i++32
)
481
32
    _ccv_cnnp_dataframe_column_data(dataframe, iter, iter->cached_data, data_ref + i, dataframe->shuffled_idx ? 
dataframe->shuffled_idx + idx0
: &idx, 1, iter->column_idxs[i + offset], 1, stream_context);
482
19
}
483
484
static void _ccv_cnnp_null_prefetches(ccv_cnnp_dataframe_iter_t* const iter)
485
68
{
486
68
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
487
68
  assert(dataframe);
488
68
  int i, j;
489
68
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum54
:
014
);
490
68
  if (iter->prefetch_head <= iter->prefetch_tail)
491
2
  {
492
2
    assert(iter->prefetches);
493
2
    const int lines = iter->prefetches->rnum / column_size;
494
6
    for (i = iter->prefetch_head; i <= iter->prefetch_tail; 
i++4
)
495
4
    {
496
4
      ccv_cnnp_dataframe_data_item_t* const cached_data = ccv_array_get(iter->prefetches, i);
497
10
      for (j = 0; j < column_size; 
j++6
)
498
6
        if (cached_data[j * lines].flag)
499
6
          _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[j * lines].data, j, cached_data[j * lines].ctx);
500
4
    }
501
66
  } else if (iter->prefetch_tail >= 0) { // -1 means no item.
502
1
    assert(iter->prefetches);
503
1
    const int lines = iter->prefetches->rnum / column_size;
504
2
    for (i = iter->prefetch_head; i < lines; 
i++1
)
505
1
    {
506
1
      ccv_cnnp_dataframe_data_item_t* const cached_data = ccv_array_get(iter->prefetches, i);
507
3
      for (j = 0; j < column_size; 
j++2
)
508
2
        if (cached_data[j * lines].flag)
509
2
          _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[j * lines].data, j, cached_data[j * lines].ctx);
510
1
    }
511
5
    for (i = 0; i <= iter->prefetch_tail; 
i++4
)
512
4
    {
513
4
      ccv_cnnp_dataframe_data_item_t* const cached_data = ccv_array_get(iter->prefetches, i);
514
12
      for (j = 0; j < column_size; 
j++8
)
515
8
        if (cached_data[j * lines].flag)
516
8
          _ccv_cnnp_dataframe_enqueue_data(dataframe, cached_data[j * lines].data, j, cached_data[j * lines].ctx);
517
4
    }
518
1
  }
519
68
  iter->prefetch_head = 0;
520
68
  iter->prefetch_tail = -1;
521
68
}
522
523
static void _ccv_cnnp_prefetch_cached_data(ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, const int idx, const int max_to_prefetch, ccv_nnc_stream_context_t* const stream_context)
524
719
{
525
719
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
526
719
  assert(dataframe);
527
719
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum698
:
021
);
528
719
  assert(iter->prefetches);
529
719
  const int lines = iter->prefetches->rnum / column_size;
530
719
  int i, j;
531
  // Reset
532
8.07k
  for (i = 0; i < column_size; 
i++7.35k
)
533
1.24M
    
for (j = 0; 7.35k
j < max_to_prefetch;
j++1.23M
)
534
1.23M
    {
535
1.23M
      cached_data[j + i * lines].flag = 0;
536
1.23M
      cached_data[j + i * lines].data = 0;
537
1.23M
      cached_data[j + i * lines].ctx = 0;
538
1.23M
    }
539
719
  if (iter->fetched_size < max_to_prefetch)
540
22
  {
541
22
    iter->fetched_data = ccrealloc(iter->fetched_data, sizeof(void*) * max_to_prefetch * (column_size + 1));
542
22
    iter->fetched_size = max_to_prefetch;
543
22
  }
544
719
  if (dataframe->shuffled_idx)
545
510
    
for (i = 0; 255
i < iter->column_idx_size;
i++255
)
546
255
      _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, FETCHED_DATA(iter, iter->column_idxs[i]), dataframe->shuffled_idx + idx, max_to_prefetch, iter->column_idxs[i], lines, stream_context);
547
464
  else {
548
81.1k
    for (i = 0; i < max_to_prefetch; 
i++80.6k
)
549
80.6k
      INDEX_DATA(iter)[i] = idx + i;
550
1.81k
    for (i = 0; i < iter->column_idx_size; 
i++1.34k
)
551
1.34k
      _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, FETCHED_DATA(iter, iter->column_idxs[i]), INDEX_DATA(iter), max_to_prefetch, iter->column_idxs[i], lines, stream_context);
552
464
  }
553
719
}
554
555
int ccv_cnnp_dataframe_iter_prefetch(ccv_cnnp_dataframe_iter_t* const iter, const int prefetch_count, ccv_nnc_stream_context_t* const stream_context)
556
725
{
557
725
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
558
725
  assert(dataframe);
559
725
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum703
:
022
);
560
725
  int i, j;
561
725
  assert(iter->idx <= dataframe->row_count);
562
725
  int lines, next, max_to_prefetch;
563
725
  if (iter->prefetch_tail == -1)
564
692
  {
565
692
    if (iter->idx == dataframe->row_count)
566
6
      return -1; // Cannot be done.
567
686
    max_to_prefetch = ccv_min(dataframe->row_count - iter->idx, prefetch_count);
568
686
    if (!iter->prefetches)
569
30
    {
570
30
      iter->prefetches = ccv_array_new(sizeof(ccv_cnnp_dataframe_data_item_t), max_to_prefetch * column_size, 0);
571
30
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
572
30
    }
573
686
    iter->prefetch_tail = iter->prefetch_head = 0; // Advance!
574
686
    next = iter->idx;
575
686
    lines = iter->prefetches->rnum / column_size;
576
    // Reset to enough space.
577
686
    if (lines < max_to_prefetch)
578
1
    {
579
1
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
580
1
      lines = max_to_prefetch;
581
1
    }
582
686
  } else {
583
33
    assert(iter->prefetches);
584
33
    ccv_array_t* const prefetches = iter->prefetches;
585
33
    assert(prefetches->rnum % column_size == 0);
586
33
    lines = prefetches->rnum / column_size;
587
33
    const int prefetched = iter->prefetch_tail >= iter->prefetch_head ? 
iter->prefetch_tail - iter->prefetch_head + 122
:
lines - iter->prefetch_head + iter->prefetch_tail + 111
;
588
33
    if (iter->idx + prefetched == dataframe->row_count) // Nothing to prefetch.
589
2
      return -1;
590
31
    max_to_prefetch = ccv_min(dataframe->row_count - (iter->idx + prefetched), prefetch_count);
591
    // Not enough space, need to resize.
592
31
    if (prefetched + max_to_prefetch > lines)
593
20
    {
594
20
      const int new_lines = prefetched + max_to_prefetch;
595
20
      ccv_array_resize(prefetches, new_lines * column_size);
596
      // These are overlap moves, have to make sure start from the end and move it up to the beginning.
597
20
      if (iter->prefetch_head > iter->prefetch_tail)
598
7
      {
599
7
        const int offset = new_lines - lines;
600
19
        for (i = column_size - 1; i >= 0; 
i--12
)
601
12
        {
602
24
          for (j = lines - 1; j >= iter->prefetch_head; 
j--12
)
603
12
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + offset + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
604
41
          for (j = iter->prefetch_tail; j >= 0; 
j--29
)
605
29
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
606
12
        }
607
7
        iter->prefetch_head += offset;
608
13
      } else {
609
33
        for (i = column_size - 1; i >= 0; 
i--20
)
610
49
          
for (j = iter->prefetch_tail; 20
j >= iter->prefetch_head;
j--29
)
611
29
            *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * new_lines) = *(ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, j + i * lines);
612
13
      }
613
20
      lines = new_lines;
614
20
    }
615
31
    ++iter->prefetch_tail; // Move to the next ready tail.
616
31
    if (iter->prefetch_tail >= lines)
617
4
      iter->prefetch_tail = 0;
618
31
    next = iter->idx + prefetched;
619
31
  }
620
717
  ccv_array_t* const prefetches = iter->prefetches;
621
717
  ccv_cnnp_dataframe_data_item_t* const cached_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, iter->prefetch_tail);
622
  // Now we are about to create cached_data (above code only uses cached data).
623
  // We are ready to prepare the data_ctx cache.
624
717
  _ccv_cnnp_dataframe_prepare_data_ctx(dataframe, stream_context);
625
  // If the tail is before the head, we must have enough space for the max_to_prefetch
626
717
  if (iter->prefetch_tail < iter->prefetch_head)
627
15
  {
628
15
    assert(iter->prefetch_tail + max_to_prefetch - 1 < iter->prefetch_head);
629
15
    _ccv_cnnp_prefetch_cached_data(iter, cached_data, next, max_to_prefetch, stream_context);
630
15
    iter->prefetch_tail += max_to_prefetch - 1;
631
702
  } else {
632
    // First, fetch to the end.
633
702
    const int fetch_to_end = ccv_min(max_to_prefetch, lines - iter->prefetch_tail);
634
702
    _ccv_cnnp_prefetch_cached_data(iter, cached_data, next, fetch_to_end, stream_context);
635
702
    if (fetch_to_end == max_to_prefetch)
636
700
      iter->prefetch_tail += fetch_to_end - 1;
637
2
    else {
638
      // Need to fetch more.
639
2
      ccv_cnnp_dataframe_data_item_t* const more_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, 0);
640
2
      assert(max_to_prefetch > fetch_to_end);
641
2
      _ccv_cnnp_prefetch_cached_data(iter, more_data, next + fetch_to_end, max_to_prefetch - fetch_to_end, stream_context);
642
2
      iter->prefetch_tail = max_to_prefetch - fetch_to_end - 1;
643
2
    }
644
702
  }
645
717
  return 0;
646
717
}
647
648
int ccv_cnnp_dataframe_iter_set_cursor(ccv_cnnp_dataframe_iter_t* const iter, const int idx)
649
10
{
650
10
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
651
10
  assert(dataframe);
652
10
  if (idx >= dataframe->row_count)
653
0
    return -1;
654
10
  if (idx == iter->idx)
655
3
    return 0;
656
7
  iter->idx = idx;
657
7
  iter->flag = 0;
658
7
  _ccv_cnnp_null_prefetches(iter);
659
7
  return 0;
660
10
}
661
662
void ccv_cnnp_dataframe_iter_free(ccv_cnnp_dataframe_iter_t* const iter)
663
61
{
664
61
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
665
61
  const int column_size = iter->column_size;
666
61
  int i;
667
  // Push existing data back to reusable state (note, these may not be reused immediately because they may be on a different stream context).
668
6.76k
  for (i = 0; i < column_size; 
i++6.70k
)
669
6.70k
    if (iter->cached_data[i].flag)
670
190
      _ccv_cnnp_dataframe_enqueue_data(dataframe, iter->cached_data[i].data, i, iter->cached_data[i].ctx);
671
  // Push prefetches back to reusable state.
672
61
  _ccv_cnnp_null_prefetches(iter);
673
61
  if (iter->prefetches)
674
30
    ccv_array_free(iter->prefetches);
675
61
  if (iter->derived_data)
676
45
  {
677
45
    assert(dataframe->derived_column_data);
678
257
    
for (i = 0; 45
i < dataframe->derived_column_data->rnum;
i++212
)
679
212
      if (iter->derived_data[i])
680
194
        ccfree(iter->derived_data[i]);
681
45
    ccfree(iter->derived_data);
682
45
  }
683
61
  ccfree(iter->fetched_data);
684
61
  if (iter->column_ctx)
685
4
  {
686
4
    khash_t(iter_ctx)* const column_ctx = iter->column_ctx;
687
4
    khiter_t k;
688
20
    for (k = 
kh_begin4
(column_ctx); k != kh_end(column_ctx);
++k16
)
689
16
    {
690
16
      if (!kh_exist(column_ctx, k))
691
10
        continue;
692
6
      ccv_cnnp_dataframe_column_ctx_t* const ctx = kh_val(column_ctx, k);
693
82
      for (i = 0; i < column_size; 
i++76
)
694
76
      {
695
76
        if (ctx[i].stream_context)
696
39
          ccv_nnc_stream_context_free(ctx[i].stream_context);
697
76
      }
698
6
    }
699
4
    kh_destroy(iter_ctx, column_ctx);
700
4
  }
701
61
  ccfree(iter);
702
61
}
703
704
void ccv_cnnp_dataframe_free(ccv_cnnp_dataframe_t* const dataframe)
705
56
{
706
56
  int i;
707
56
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
708
56
  khiter_t k;
709
56
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? 
dataframe->derived_column_data->rnum44
:
012
);
710
280
  for (k = 
kh_begin56
(data_ctx); k != kh_end(data_ctx);
++k224
)
711
224
  {
712
224
    if (!kh_exist(data_ctx, k))
713
166
      continue;
714
58
    ccv_nnc_stream_context_t* const stream_context = (ccv_nnc_stream_context_t*)(intptr_t)kh_key(data_ctx, k);
715
58
    ccv_array_t* const columns = kh_val(data_ctx, k).columns;
716
58
    if (stream_context)
717
12
    {
718
12
      const int hook_id = kh_val(data_ctx, k).hook_id;
719
12
      ccv_nnc_stream_context_remove_destructor_hook(stream_context, hook_id);
720
12
    }
721
58
    assert(columns->rnum <= column_size);
722
58
    _ccv_cnnp_dataframe_data_ctx_columns_free(dataframe, columns);
723
58
  }
724
56
  kh_destroy(ctx, data_ctx);
725
56
  if (dataframe->derived_column_data)
726
44
  {
727
255
    for (i = 0; i < dataframe->derived_column_data->rnum; 
i++211
)
728
211
    {
729
211
      ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, i);
730
211
      if (derived_column_data->context_deinit)
731
144
        derived_column_data->context_deinit(derived_column_data->context);
732
211
      ccfree(derived_column_data->column_idxs);
733
211
      if (derived_column_data->name)
734
1
        ccfree(derived_column_data->name);
735
211
    }
736
44
    ccv_array_free(dataframe->derived_column_data);
737
44
  }
738
6.54k
  for (i = 0; i < dataframe->column_size; 
i++6.48k
)
739
6.48k
  {
740
6.48k
    if (dataframe->column_data[i].context_deinit)
741
22
      dataframe->column_data[i].context_deinit(dataframe->column_data[i].context);
742
6.48k
    if (dataframe->column_data[i].name)
743
3.21k
      ccfree(dataframe->column_data[i].name);
744
6.48k
  }
745
56
  if (dataframe->shuffled_idx)
746
9
    ccfree(dataframe->shuffled_idx);
747
56
#ifdef HAVE_GSL
748
56
  if (dataframe->rng)
749
9
    gsl_rng_free(dataframe->rng);
750
56
#endif
751
56
  ccfree(dataframe);
752
56
}