Coverage Report

Created: 2019-07-03 22:50

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_cnnp_dataframe.c
Line
Count
Source (jump to first uncovered line)
1
#include "ccv_nnc.h"
2
#include "ccv_nnc_easy.h"
3
#include "ccv_nnc_internal.h"
4
#include "ccv_internal.h"
5
#include "3rdparty/khash/khash.h"
6
#ifdef HAVE_GSL
7
#include <gsl/gsl_rng.h>
8
#include <gsl/gsl_randist.h>
9
#else
10
#include "3rdparty/sfmt/SFMT.h"
11
#endif
12
13
// Declares the khash map type "ctx": 64-bit integer key -> ccv_array_t*.
// In this file the key is a stream context pointer cast to uint64_t and the value is the array of per-column reuse lists.
KHASH_MAP_INIT_INT64(ctx, ccv_array_t*)
14
15
// A dataframe: column_size source columns (stored inline in column_data) plus optional derived columns, over row_count rows.
struct ccv_cnnp_dataframe_s {
16
  int row_count; // Number of rows, as given to ccv_cnnp_dataframe_new.
17
  int column_size; // Number of source columns in column_data; derived columns are not counted here.
18
  int* shuffled_idx; // Row index permutation. Allocated on the first ccv_cnnp_dataframe_shuffle call, 0 before that.
19
#ifdef HAVE_GSL
20
  gsl_rng* rng; // Random generator used by ccv_cnnp_dataframe_shuffle (GSL build).
21
#else
22
  sfmt_t sfmt; // Random generator state used by ccv_cnnp_dataframe_shuffle (SFMT build).
23
#endif
24
  khash_t(ctx)* data_ctx; // The stream context based cache for data entity of columns. This helps us to avoid allocations when iterate through data.
25
  ccv_array_t* derived_column_data; // Array of ccv_cnnp_derived_column_data_t. Created lazily by ccv_cnnp_dataframe_add / ccv_cnnp_dataframe_map.
26
  ccv_cnnp_column_data_t column_data[1]; // Trailing storage for the source columns; the allocation extends past the struct when column_size > 1.
27
};
28
29
// Description of one derived column. Column index of a derived column is dataframe->column_size + its position in derived_column_data.
typedef struct {
30
  int stream_type; // Stream type the column wants to run on; compared against the caller's stream context type when fetching.
31
  int column_idx_size; // Number of input columns (only set by ccv_cnnp_dataframe_map).
32
  int* column_idxs; // Owned copy of the input column indices (only set by ccv_cnnp_dataframe_map).
33
  ccv_cnnp_column_data_enum_f data_enum; // Enumerator used when map is not set (columns added with ccv_cnnp_dataframe_add).
34
  ccv_cnnp_column_data_deinit_f data_deinit; // Called per cached data item when the dataframe is freed; may be 0.
35
  void* context; // Opaque pointer handed to data_enum / map / data_deinit.
36
  ccv_cnnp_column_data_context_deinit_f context_deinit; // Called with context when the dataframe is freed; may be 0.
37
  ccv_cnnp_column_data_map_f map; // Mapping function over the input columns (columns added with ccv_cnnp_dataframe_map).
38
} ccv_cnnp_derived_column_data_t;
39
40
ccv_cnnp_dataframe_t* ccv_cnnp_dataframe_new(const ccv_cnnp_column_data_t* const column_data, const int column_size, const int row_count)
41
30
{
42
30
  assert(column_size >= 0);
43
30
  ccv_cnnp_dataframe_t* const dataframe = (ccv_cnnp_dataframe_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_t) + sizeof(ccv_cnnp_column_data_t) * (column_size - 1));
44
30
  dataframe->row_count = row_count;
45
30
  dataframe->column_size = column_size;
46
30
  dataframe->data_ctx = kh_init(ctx);
47
30
  if (column_size > 0)
48
29
    memcpy(dataframe->column_data, column_data, sizeof(ccv_cnnp_column_data_t) * column_size);
49
30
  return dataframe;
50
30
}
51
52
// Shuffle the row order used by iterators on this dataframe.
// The first call allocates the identity permutation and seeds the random generator from the
// dataframe's address; every call (including the first) then reshuffles the permutation in place.
// The dataframe must have at least one row.
void ccv_cnnp_dataframe_shuffle(ccv_cnnp_dataframe_t* const dataframe)
{
  assert(dataframe->row_count);
  int i;
  if (!dataframe->shuffled_idx)
  {
    dataframe->shuffled_idx = (int*)ccmalloc(sizeof(int) * dataframe->row_count);
    for (i = 0; i < dataframe->row_count; i++)
      dataframe->shuffled_idx[i] = i;
    // Go through uintptr_t before narrowing, as the rest of this file does for pointer -> integer conversions.
#ifdef HAVE_GSL
    assert(!dataframe->rng);
    gsl_rng_env_setup();
    dataframe->rng = gsl_rng_alloc(gsl_rng_default);
    gsl_rng_set(dataframe->rng, (unsigned long int)(uintptr_t)dataframe);
#else
    sfmt_init_gen_rand(&dataframe->sfmt, (uint32_t)(uintptr_t)dataframe);
#endif
  }
#ifdef HAVE_GSL
  gsl_ran_shuffle(dataframe->rng, dataframe->shuffled_idx, dataframe->row_count, sizeof(int));
#else
  sfmt_genrand_shuffle(&dataframe->sfmt, dataframe->shuffled_idx, dataframe->row_count, sizeof(int));
#endif
}
76
77
// Report how many rows the dataframe was created with.
int ccv_cnnp_dataframe_row_count(ccv_cnnp_dataframe_t* const dataframe)
{
  const int rows = dataframe->row_count;
  return rows;
}
81
82
// Append a derived column that produces its data through an enumerator (data_enum).
// The derived column list is created on first use. All fields not listed below stay zero,
// in particular map, which is how the fetch path tells enumerated columns from mapped ones.
// Returns the column index of the new column (source columns come first, derived ones after).
int ccv_cnnp_dataframe_add(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_enum_f data_enum, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit)
{
  if (dataframe->derived_column_data == 0)
    dataframe->derived_column_data = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
  ccv_array_t* const derived_columns = dataframe->derived_column_data;
  ccv_cnnp_derived_column_data_t entry = {0};
  entry.stream_type = stream_type;
  entry.data_enum = data_enum;
  entry.data_deinit = data_deinit;
  entry.context = context;
  entry.context_deinit = context_deinit;
  ccv_array_push(derived_columns, &entry);
  const int last_derived_idx = derived_columns->rnum - 1;
  return dataframe->column_size + last_derived_idx;
}
96
97
// Append a derived column computed by applying map to one or more existing columns.
// column_idxs / column_idx_size: the input columns; every index must refer to a column that
// already exists (0 <= idx < current column count). The index list is copied.
// Returns the column index of the new column (source columns come first, derived ones after).
int ccv_cnnp_dataframe_map(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_column_data_map_f map, const int stream_type, ccv_cnnp_column_data_deinit_f data_deinit, const int* const column_idxs, const int column_idx_size, void* const context, ccv_cnnp_column_data_context_deinit_f context_deinit)
{
  assert(column_idx_size > 0);
  if (!dataframe->derived_column_data)
    dataframe->derived_column_data = ccv_array_new(sizeof(ccv_cnnp_derived_column_data_t), 1, 0);
  const int column_size = dataframe->column_size + dataframe->derived_column_data->rnum;
  int i;
  // Check both bounds: these indices are later used to address the iterator's cache and fetch buffers.
  for (i = 0; i < column_idx_size; i++)
    { assert(column_idxs[i] >= 0 && column_idxs[i] < column_size); }
  ccv_cnnp_derived_column_data_t column_data = {
    .stream_type = stream_type,
    .column_idx_size = column_idx_size,
    .column_idxs = (int*)ccmalloc(sizeof(int) * column_idx_size),
    .map = map,
    .data_deinit = data_deinit,
    .context = context,
    .context_deinit = context_deinit,
  };
  memcpy(column_data.column_idxs, column_idxs, sizeof(int) * column_idx_size);
  ccv_array_push(dataframe->derived_column_data, &column_data);
  return dataframe->column_size + dataframe->derived_column_data->rnum - 1;
}
119
120
void* ccv_cnnp_dataframe_column_context(const ccv_cnnp_dataframe_t* const dataframe, const int column_idx)
121
2
{
122
2
  assert(column_idx >= 0);
123
2
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 
00
);
124
2
  assert(column_idx < column_size);
125
2
  if (column_idx < dataframe->column_size)
126
0
    return dataframe->column_data[column_idx].context;
127
2
  assert(dataframe->derived_column_data);
128
2
  ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, column_idx - dataframe->column_size);
129
2
  return derived_column_data->context;
130
2
}
131
132
// One cache slot: the data fetched for a (row, column) pair together with the stream context it was fetched on.
typedef struct {
133
  int flag; // Mark this as cached or not.
134
  uint64_t ctx; // The stream context.
135
  void* data; // The fetched data pointer; handed back to the dataframe's reuse cache when the slot is recycled.
136
} ccv_cnnp_dataframe_data_item_t;
137
138
// Stream context used to run a column, plus the signal used to make the caller's stream wait for it.
typedef struct {
139
  ccv_nnc_stream_context_t* stream_context; // Either the caller's stream context or a child context created for the column's stream type.
140
  ccv_nnc_stream_signal_t* signal; // Emitted on the child context and waited on by the caller's context; 0 when no child context is used.
141
} ccv_cnnp_dataframe_column_ctx_t;
142
143
// Declares the khash map type "iter_ctx": 64-bit integer key (parent stream context pointer) -> array of per-column child contexts.
KHASH_MAP_INIT_INT64(iter_ctx, ccv_cnnp_dataframe_column_ctx_t*)
144
145
// Iterator over a dataframe. cached_data and the copied column_idxs live in the same allocation, after the struct.
struct ccv_cnnp_dataframe_iter_s {
146
  int idx; // Next row to hand out from ccv_cnnp_dataframe_iter_next.
147
  int prefetch_head; // Index of the oldest prefetched line in the prefetches ring.
148
  int prefetch_tail; // Index of the newest prefetched line in the ring; -1 means nothing is prefetched.
149
  int column_idx_size; // Number of entries in column_idxs.
150
  int fetched_size; // The size of fetched data.
151
  ccv_cnnp_dataframe_t* dataframe; // The dataframe this iterator reads from (not owned).
152
  void**** derived_data; // This is ridiculous, but it is true.
153
  void** fetched_data; // The cache to store fetched data.
154
  khash_t(iter_ctx)* column_ctx; // Column context specific to a stream context. The key will be a parent stream context and value will be child stream context + signal.
155
  ccv_array_t* prefetches; // The prefetch contents.
156
  int* column_idxs; // Columns requested at creation; points into this allocation, right after the cached_data slots.
157
  ccv_cnnp_dataframe_data_item_t cached_data[1]; // The data cached when deriving data.
158
};
159
160
100k
// View the start of fetched_data as an int array. Used as scratch space for consecutive row indices when prefetching without shuffling.
#define INDEX_DATA(iter) ((int*)((iter)->fetched_data))
161
66.7k
// Start of the fetched_data block for column idx. Blocks are fetched_size pointers long; block 0 is the INDEX_DATA scratch area, hence the + 1.
#define FETCHED_DATA(iter, idx) ((iter)->fetched_data + ((idx) + 1) * (iter)->fetched_size)
162
163
// Create an iterator that yields the given columns of a dataframe, starting at row 0.
// column_idxs / column_idx_size: the columns to fetch on each step; every index must refer to an
// existing column (0 <= idx < current column count). The index list is copied.
// Release the iterator with ccv_cnnp_dataframe_iter_free.
ccv_cnnp_dataframe_iter_t* ccv_cnnp_dataframe_iter_new(ccv_cnnp_dataframe_t* const dataframe, const int* const column_idxs, const int column_idx_size)
{
  assert(column_idx_size > 0);
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  int i;
  // Check both bounds: these indices address cached_data and the fetched_data blocks.
  for (i = 0; i < column_idx_size; i++)
    { assert(column_idxs[i] >= 0 && column_idxs[i] < column_size); }
  // A single zeroed allocation holds the iterator, one cache slot per column and the copied column indices.
  ccv_cnnp_dataframe_iter_t* const iter = (ccv_cnnp_dataframe_iter_t*)cccalloc(1, sizeof(ccv_cnnp_dataframe_iter_t) + sizeof(ccv_cnnp_dataframe_data_item_t) * column_size + sizeof(void*) * (column_idx_size - 1) + sizeof(int) * column_idx_size);
  iter->dataframe = dataframe;
  iter->prefetch_tail = -1; // Nothing prefetched yet.
  iter->column_idx_size = column_idx_size;
  iter->column_idxs = (int*)(iter->cached_data + column_size);
  memcpy(iter->column_idxs, column_idxs, sizeof(int) * column_idx_size);
  // Preallocate fetched data: one pointer per column plus the leading scratch block used by INDEX_DATA.
  iter->fetched_size = 1;
  iter->fetched_data = (void**)ccmalloc(sizeof(void*) * (column_size + 1));
  return iter;
}
181
182
// Hand a data pointer back to the dataframe's reuse cache.
// The cache is keyed by stream context (ctx) and then by column; each column keeps a stack of
// reusable pointers. A null data pointer is ignored. Map entries and column stacks are created on demand.
static void _ccv_cnnp_dataframe_enqueue_data(ccv_cnnp_dataframe_t* const dataframe, void* const data, const int column_idx, const uint64_t ctx)
{
  if (!data)
    return;
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
  int absent = 0;
  const khiter_t slot = kh_put(ctx, data_ctx, ctx, &absent);
  assert(absent >= 0);
  const int derived_size = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  const int column_size = dataframe->column_size + derived_size;
  assert(column_idx < column_size);
  ccv_array_t* columns;
  if (absent == 0) // The key already exists, reuse its column list.
    columns = kh_val(data_ctx, slot);
  else { // Fresh key, create the column list and store it in the map.
    columns = ccv_array_new(sizeof(ccv_array_t*), column_size, 0);
    kh_val(data_ctx, slot) = columns;
  }
  // Columns may have been added to the dataframe since this list was created.
  if (columns->rnum < column_size)
    ccv_array_resize(columns, column_size);
  ccv_array_t** const column_ref = (ccv_array_t**)ccv_array_get(columns, column_idx);
  if (!*column_ref)
    *column_ref = ccv_array_new(sizeof(void*), 1, 0);
  ccv_array_push(*column_ref, &data);
}
206
207
// Take one reusable data pointer for (stream_context, column_idx) out of the dataframe's reuse cache.
// Returns 0 when nothing is cached for that stream context / column. The most recently enqueued pointer is returned first.
static void* _ccv_cnnp_dataframe_dequeue_data(ccv_cnnp_dataframe_t* const dataframe, const int column_idx, ccv_nnc_stream_context_t* const stream_context)
{
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
  const uint64_t key = (uint64_t)(uintptr_t)stream_context;
  const khiter_t slot = kh_get(ctx, data_ctx, key);
  if (slot == kh_end(data_ctx))
    return 0;
  ccv_array_t* const columns = kh_val(data_ctx, slot);
  if (column_idx >= columns->rnum)
    return 0;
  ccv_array_t* const column = *(ccv_array_t**)ccv_array_get(columns, column_idx);
  if (!column || column->rnum == 0)
    return 0;
  // Pop the last element.
  const int last = column->rnum - 1;
  void* const data = *(void**)ccv_array_get(column, last);
  column->rnum = last;
  return data;
}
224
225
// Pick the stream context a column should run on.
// When there is no caller stream context, the column has no stream type preference (0), or the
// caller's context already has the requested type, the caller's context is returned with no signal.
// Otherwise a child stream context and signal of the requested type are created once per
// (caller stream context, column) and reused on later calls.
// NOTE(review): the per-column array is sized with the column count at the time it is first created — confirm no columns are added to the dataframe afterwards.
static ccv_cnnp_dataframe_column_ctx_t _ccv_cnnp_child_column_ctx_for_stream_type(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, const int column_idx, ccv_nnc_stream_context_t* const stream_context, const int stream_type)
{
  const ccv_cnnp_dataframe_column_ctx_t same_ctx = {
    .stream_context = stream_context,
  };
  if (!stream_context)
    return same_ctx;
  if (ccv_nnc_stream_context_type(stream_context) == stream_type || stream_type == 0)
    return same_ctx;
  if (!iter->column_ctx)
    iter->column_ctx = kh_init(iter_ctx);
  khash_t(iter_ctx)* const column_ctx = iter->column_ctx;
  int absent = 0;
  const khiter_t slot = kh_put(iter_ctx, column_ctx, (uint64_t)(uintptr_t)stream_context, &absent);
  assert(absent >= 0);
  const int derived_size = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  const int column_size = dataframe->column_size + derived_size;
  ccv_cnnp_dataframe_column_ctx_t* per_column;
  if (absent == 0)
    per_column = kh_val(column_ctx, slot);
  else {
    per_column = cccalloc(column_size, sizeof(ccv_cnnp_dataframe_column_ctx_t));
    kh_val(column_ctx, slot) = per_column;
  }
  ccv_cnnp_dataframe_column_ctx_t* const entry = per_column + column_idx;
  if (!entry->stream_context)
    entry->stream_context = ccv_nnc_stream_context_new(stream_type);
  if (!entry->signal)
    entry->signal = ccv_nnc_stream_signal_new(stream_type);
  return *entry;
}
250
251
// Produce the data of one column for row_size rows into fetched_data.
// cached_data / cached_step: cache slots laid out column-major; the slots for this column start at
//   cached_data + column_idx * cached_step. If the column is already cached, the cached pointers are copied out and nothing else happens.
// Otherwise reusable pointers are dequeued into fetched_data, the column is computed (source columns and
// enumerated derived columns through data_enum, mapped derived columns by first fetching their inputs
// recursively and then calling map), and the results are recorded in the cache slots.
// If the column ran on a child stream context, the caller's stream context is made to wait for it.
static void _ccv_cnnp_dataframe_column_data(ccv_cnnp_dataframe_t* const dataframe, ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, void** const fetched_data, const int* const row_idxs, const int row_size, const int column_idx, const int cached_step, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_data_item_t* const column_cache = cached_data + column_idx * cached_step;
  int row;
  if (column_cache[0].flag)
  {
    // Already computed for these rows: either all rows are cached or none are.
    for (row = 1; row < row_size; row++)
      { assert(column_cache[row].flag); }
    for (row = 0; row < row_size; row++)
      fetched_data[row] = column_cache[row].data;
    return;
  }
  for (row = 1; row < row_size; row++)
    { assert(!column_cache[row].flag); }
  // Seed the output with reusable data pointers (0 when none is available).
  for (row = 0; row < row_size; row++)
    fetched_data[row] = _ccv_cnnp_dataframe_dequeue_data(dataframe, column_idx, stream_context);
  ccv_cnnp_dataframe_column_ctx_t child_ctx;
  if (column_idx < dataframe->column_size)
  {
    // A source column.
    const ccv_cnnp_column_data_t* const source = dataframe->column_data + column_idx;
    child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, source->stream_type);
    source->data_enum(column_idx, row_idxs, row_size, fetched_data, source->context, child_ctx.stream_context);
  } else {
    assert(dataframe->derived_column_data);
    const int derived_column_idx = column_idx - dataframe->column_size;
    const ccv_cnnp_derived_column_data_t* const derived = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, derived_column_idx);
    child_ctx = _ccv_cnnp_child_column_ctx_for_stream_type(dataframe, iter, column_idx, stream_context, derived->stream_type);
    if (!derived->map)
      derived->data_enum(column_idx, row_idxs, row_size, fetched_data, derived->context, child_ctx.stream_context);
    else {
      const int input_size = derived->column_idx_size;
      // Lazily allocate the table of input pointers for this derived column.
      if (!iter->derived_data)
        iter->derived_data = (void****)cccalloc(dataframe->derived_column_data->rnum, sizeof(void***));
      if (!iter->derived_data[derived_column_idx])
        iter->derived_data[derived_column_idx] = (void***)cccalloc(input_size, sizeof(void**));
      void*** const inputs = iter->derived_data[derived_column_idx];
      int k;
      for (k = 0; k < input_size; k++)
      {
        const int input_column_idx = derived->column_idxs[k];
        inputs[k] = FETCHED_DATA(iter, input_column_idx);
        _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, inputs[k], row_idxs, row_size, input_column_idx, cached_step, stream_context);
      }
      derived->map(inputs, input_size, row_size, fetched_data, derived->context, child_ctx.stream_context);
    }
  }
  if (child_ctx.stream_context != stream_context)
  {
    // The column ran on its own stream: make the caller's stream wait for it.
    ccv_nnc_stream_context_emit_signal(child_ctx.stream_context, child_ctx.signal);
    ccv_nnc_stream_context_wait_signal(stream_context, child_ctx.signal);
  }
  const uint64_t stream_key = (uint64_t)(uintptr_t)stream_context;
  for (row = 0; row < row_size; row++)
  {
    ccv_cnnp_dataframe_data_item_t* const item = column_cache + row;
    item->flag = 1;
    item->ctx = stream_key;
    item->data = fetched_data[row];
  }
}
312
313
// Advance the iterator by one row and write the data of the first column_idx_size requested columns into data_ref.
// Data handed out by the previous call is returned to the dataframe's reuse cache first, so pointers from an
// earlier call must not be used after this one. If a prefetched line is available it is consumed: items fetched
// on the same stream context are used directly, the others are recycled and fetched again.
// Returns 0 on success, -1 when the iterator is at the end.
int ccv_cnnp_dataframe_iter_next(ccv_cnnp_dataframe_iter_t* const iter, void** const data_ref, const int column_idx_size, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(column_idx_size <= iter->column_idx_size);
  const int derived_size = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  const int column_size = dataframe->column_size + derived_size;
  int c;
  // Push existing data back to reusable state (note, these may not be reused immediately because they may be on a different stream context).
  for (c = 0; c < column_size; c++)
  {
    ccv_cnnp_dataframe_data_item_t* const current = iter->cached_data + c;
    if (!current->flag)
      continue;
    _ccv_cnnp_dataframe_enqueue_data(dataframe, current->data, c, current->ctx);
    current->flag = 0;
    current->data = 0;
    current->ctx = 0;
  }
  const int idx = iter->idx;
  if (idx == dataframe->row_count)
    return -1;
  if (iter->prefetch_tail != -1) // If there is something in prefetch log.
  {
    ccv_array_t* const prefetches = iter->prefetches;
    assert(prefetches);
    const int lines = prefetches->rnum / column_size;
    if (iter->prefetch_head == iter->prefetch_tail) // Only one item.
      iter->prefetch_tail = -1;
    ccv_cnnp_dataframe_data_item_t* const line = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(iter->prefetches, iter->prefetch_head);
    const uint64_t stream_key = (uint64_t)(uintptr_t)stream_context;
    for (c = 0; c < column_size; c++)
    {
      const ccv_cnnp_dataframe_data_item_t* const prefetched = line + c * lines;
      if (!prefetched->flag)
        continue;
      if (prefetched->ctx == stream_key) // If match existing stream context.
        iter->cached_data[c] = *prefetched;
      else // Recycle
        _ccv_cnnp_dataframe_enqueue_data(dataframe, prefetched->data, c, prefetched->ctx);
    }
    ++iter->prefetch_head;
    assert(prefetches->rnum % column_size == 0);
    if (iter->prefetch_head >= lines) // Wrap around the ring.
      iter->prefetch_head = 0;
  }
  const int* const row_idx = dataframe->shuffled_idx ? dataframe->shuffled_idx + idx : &idx;
  for (c = 0; c < column_idx_size; c++)
    _ccv_cnnp_dataframe_column_data(dataframe, iter, iter->cached_data, data_ref + c, row_idx, 1, iter->column_idxs[c], 1, stream_context);
  ++iter->idx;
  return 0;
}
358
359
// Return every flagged item in prefetch lines [first, last] to the dataframe's reuse cache.
// lines is the number of lines in the ring; items of one column are lines apart from each other.
static void _ccv_cnnp_recycle_prefetch_lines(ccv_cnnp_dataframe_iter_t* const iter, const int first, const int last, const int lines, const int column_size)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  int line, column;
  for (line = first; line <= last; line++)
  {
    ccv_cnnp_dataframe_data_item_t* const cached_data = ccv_array_get(iter->prefetches, line);
    for (column = 0; column < column_size; column++)
    {
      const ccv_cnnp_dataframe_data_item_t* const item = cached_data + column * lines;
      if (item->flag)
        _ccv_cnnp_dataframe_enqueue_data(dataframe, item->data, column, item->ctx);
    }
  }
}

// Drop everything that is currently prefetched: the data goes back to the dataframe's reuse cache
// and the prefetch ring is marked empty (head 0, tail -1).
static void _ccv_cnnp_null_prefetches(ccv_cnnp_dataframe_iter_t* const iter)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  const int derived_size = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  const int column_size = dataframe->column_size + derived_size;
  const int head = iter->prefetch_head;
  const int tail = iter->prefetch_tail;
  if (head <= tail)
  {
    // Contiguous range.
    assert(iter->prefetches);
    const int lines = iter->prefetches->rnum / column_size;
    _ccv_cnnp_recycle_prefetch_lines(iter, head, tail, lines, column_size);
  } else if (tail >= 0) { // -1 means no item.
    // Wrapped range: head to the end of the ring, then the start of the ring to tail.
    assert(iter->prefetches);
    const int lines = iter->prefetches->rnum / column_size;
    _ccv_cnnp_recycle_prefetch_lines(iter, head, lines - 1, lines, column_size);
    _ccv_cnnp_recycle_prefetch_lines(iter, 0, tail, lines, column_size);
  }
  iter->prefetch_head = 0;
  iter->prefetch_tail = -1;
}
397
398
// Fetch max_to_prefetch consecutive rows, starting at row idx, for all of the iterator's columns
// into the prefetch slots starting at cached_data (column-major, columns are one ring length apart).
// The slots are cleared first, and the iterator's fetch buffer is grown when it is too small for max_to_prefetch rows.
static void _ccv_cnnp_prefetch_cached_data(ccv_cnnp_dataframe_iter_t* const iter, ccv_cnnp_dataframe_data_item_t* const cached_data, const int idx, const int max_to_prefetch, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  const int derived_size = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  const int column_size = dataframe->column_size + derived_size;
  assert(iter->prefetches);
  const int lines = iter->prefetches->rnum / column_size;
  int column, row;
  // Reset
  for (column = 0; column < column_size; column++)
  {
    ccv_cnnp_dataframe_data_item_t* const column_cache = cached_data + column * lines;
    for (row = 0; row < max_to_prefetch; row++)
    {
      column_cache[row].flag = 0;
      column_cache[row].data = 0;
      column_cache[row].ctx = 0;
    }
  }
  if (iter->fetched_size < max_to_prefetch)
  {
    iter->fetched_data = ccrealloc(iter->fetched_data, sizeof(void*) * max_to_prefetch * (column_size + 1));
    iter->fetched_size = max_to_prefetch;
  }
  const int* row_idxs;
  if (dataframe->shuffled_idx)
    row_idxs = dataframe->shuffled_idx + idx;
  else {
    // No permutation: write the consecutive row indices into the scratch block of the fetch buffer.
    int* const index_data = INDEX_DATA(iter);
    for (row = 0; row < max_to_prefetch; row++)
      index_data[row] = idx + row;
    row_idxs = index_data;
  }
  int k;
  for (k = 0; k < iter->column_idx_size; k++)
  {
    const int column_idx = iter->column_idxs[k];
    _ccv_cnnp_dataframe_column_data(dataframe, iter, cached_data, FETCHED_DATA(iter, column_idx), row_idxs, max_to_prefetch, column_idx, lines, stream_context);
  }
}
429
430
// Prefetch up to prefetch_count rows that follow what has already been consumed or prefetched.
// Prefetched rows are kept in a ring (iter->prefetches) stored column-major: item (line, column) is at
// line + column * lines. prefetch_head / prefetch_tail are the oldest / newest occupied lines, tail == -1 means empty.
// The ring is grown when it cannot hold the already prefetched rows plus the new ones.
// Returns 0 on success, -1 when there are no more rows to prefetch.
int ccv_cnnp_dataframe_iter_prefetch(ccv_cnnp_dataframe_iter_t* const iter, const int prefetch_count, ccv_nnc_stream_context_t* const stream_context)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  const int derived_size = dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0;
  const int column_size = dataframe->column_size + derived_size;
  const int row_count = dataframe->row_count;
  assert(iter->idx <= row_count);
  int lines, next, max_to_prefetch;
  if (iter->prefetch_tail == -1)
  {
    // The ring is empty: start over from line 0.
    if (iter->idx == row_count)
      return -1; // Cannot be done.
    next = iter->idx;
    max_to_prefetch = ccv_min(row_count - next, prefetch_count);
    if (!iter->prefetches)
    {
      iter->prefetches = ccv_array_new(sizeof(ccv_cnnp_dataframe_data_item_t), max_to_prefetch * column_size, 0);
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
    }
    iter->prefetch_head = 0;
    iter->prefetch_tail = 0; // Advance!
    lines = iter->prefetches->rnum / column_size;
    // Reset to enough space.
    if (lines < max_to_prefetch)
    {
      ccv_array_resize(iter->prefetches, max_to_prefetch * column_size);
      lines = max_to_prefetch;
    }
  } else {
    assert(iter->prefetches);
    ccv_array_t* const ring = iter->prefetches;
    assert(ring->rnum % column_size == 0);
    lines = ring->rnum / column_size;
    const int head = iter->prefetch_head;
    const int tail = iter->prefetch_tail;
    const int prefetched = tail >= head ? tail - head + 1 : lines - head + tail + 1;
    next = iter->idx + prefetched;
    if (next == row_count) // Nothing to prefetch.
      return -1;
    max_to_prefetch = ccv_min(row_count - next, prefetch_count);
    const int new_lines = prefetched + max_to_prefetch;
    // Not enough space, need to resize.
    if (new_lines > lines)
    {
      ccv_array_resize(ring, new_lines * column_size);
      ccv_cnnp_dataframe_data_item_t* const items = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(ring, 0);
      int column, line;
      // These are overlap moves, have to make sure start from the end and move it up to the beginning.
      if (head > tail)
      {
        // Wrapped content: the head part moves to the end of the longer column, the tail part stays at its start.
        const int offset = new_lines - lines;
        for (column = column_size - 1; column >= 0; column--)
        {
          for (line = lines - 1; line >= head; line--)
            items[line + offset + column * new_lines] = items[line + column * lines];
          for (line = tail; line >= 0; line--)
            items[line + column * new_lines] = items[line + column * lines];
        }
        iter->prefetch_head = head + offset;
      } else {
        for (column = column_size - 1; column >= 0; column--)
          for (line = tail; line >= head; line--)
            items[line + column * new_lines] = items[line + column * lines];
      }
      lines = new_lines;
    }
    // Move to the next ready tail.
    iter->prefetch_tail = tail + 1;
    if (iter->prefetch_tail >= lines)
      iter->prefetch_tail = 0;
  }
  ccv_array_t* const prefetches = iter->prefetches;
  ccv_cnnp_dataframe_data_item_t* const tail_data = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, iter->prefetch_tail);
  // If the tail is before the head, we must have enough space for the max_to_prefetch
  if (iter->prefetch_tail < iter->prefetch_head)
  {
    assert(iter->prefetch_tail + max_to_prefetch - 1 < iter->prefetch_head);
    _ccv_cnnp_prefetch_cached_data(iter, tail_data, next, max_to_prefetch, stream_context);
    iter->prefetch_tail += max_to_prefetch - 1;
    return 0;
  }
  // First, fetch to the end.
  const int fetch_to_end = ccv_min(max_to_prefetch, lines - iter->prefetch_tail);
  _ccv_cnnp_prefetch_cached_data(iter, tail_data, next, fetch_to_end, stream_context);
  if (fetch_to_end == max_to_prefetch)
  {
    iter->prefetch_tail += fetch_to_end - 1;
    return 0;
  }
  // Need to fetch more: continue from the start of the ring.
  ccv_cnnp_dataframe_data_item_t* const ring_start = (ccv_cnnp_dataframe_data_item_t*)ccv_array_get(prefetches, 0);
  assert(max_to_prefetch > fetch_to_end);
  const int remaining = max_to_prefetch - fetch_to_end;
  _ccv_cnnp_prefetch_cached_data(iter, ring_start, next + fetch_to_end, remaining, stream_context);
  iter->prefetch_tail = remaining - 1;
  return 0;
}
519
520
// Move the iterator so that the next call to ccv_cnnp_dataframe_iter_next yields row idx.
// Anything prefetched for the old position is returned to the dataframe's reuse cache.
// Returns 0 on success (including when the iterator is already at idx), -1 when idx is not a valid row.
int ccv_cnnp_dataframe_iter_set_cursor(ccv_cnnp_dataframe_iter_t* const iter, const int idx)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  assert(dataframe);
  // Reject negative positions as well: idx is later used as an offset into the row index array.
  if (idx < 0 || idx >= dataframe->row_count)
    return -1;
  if (idx == iter->idx)
    return 0;
  iter->idx = idx;
  _ccv_cnnp_null_prefetches(iter);
  return 0;
}
532
533
// Destroy an iterator.
// Data currently held by the iterator (current row and prefetched rows) is returned to the dataframe's
// reuse cache rather than destroyed, so the dataframe must still be alive when this is called.
// Child stream contexts and signals created for this iterator are freed.
void ccv_cnnp_dataframe_iter_free(ccv_cnnp_dataframe_iter_t* const iter)
{
  ccv_cnnp_dataframe_t* const dataframe = iter->dataframe;
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  int i;
  // Push existing data back to reusable state (note, these may not be reused immediately because they may be on a different stream context).
  for (i = 0; i < column_size; i++)
    if (iter->cached_data[i].flag)
      _ccv_cnnp_dataframe_enqueue_data(dataframe, iter->cached_data[i].data, i, iter->cached_data[i].ctx);
  // Push prefetches back to reusable state.
  _ccv_cnnp_null_prefetches(iter);
  if (iter->prefetches)
    ccv_array_free(iter->prefetches);
  if (iter->derived_data)
  {
    assert(dataframe->derived_column_data);
    for (i = 0; i < dataframe->derived_column_data->rnum; i++)
      if (iter->derived_data[i])
        ccfree(iter->derived_data[i]);
    ccfree(iter->derived_data);
  }
  ccfree(iter->fetched_data);
  if (iter->column_ctx)
  {
    khash_t(iter_ctx)* const column_ctx = iter->column_ctx;
    khiter_t k;
    for (k = kh_begin(column_ctx); k != kh_end(column_ctx); ++k)
    {
      if (!kh_exist(column_ctx, k))
        continue;
      ccv_cnnp_dataframe_column_ctx_t* const ctx = kh_val(column_ctx, k);
      for (i = 0; i < column_size; i++)
      {
        if (ctx[i].stream_context)
          ccv_nnc_stream_context_free(ctx[i].stream_context);
        if (ctx[i].signal)
          ccv_nnc_stream_signal_free(ctx[i].signal);
      }
      // The per-column array itself was allocated with cccalloc when the map entry was created; release it too.
      ccfree(ctx);
    }
    kh_destroy(iter_ctx, column_ctx);
  }
  ccfree(iter);
}
576
577
// Destroy a dataframe.
// Every data pointer sitting in the reuse cache is passed to its column's data_deinit (when one is set),
// then each column's context_deinit (when set) is called with the column's context, and finally the
// shuffle state and the dataframe itself are released.
void ccv_cnnp_dataframe_free(ccv_cnnp_dataframe_t* const dataframe)
{
  int i, j;
  khash_t(ctx)* const data_ctx = dataframe->data_ctx;
  khiter_t k;
  const int column_size = dataframe->column_size + (dataframe->derived_column_data ? dataframe->derived_column_data->rnum : 0);
  for (k = kh_begin(data_ctx); k != kh_end(data_ctx); ++k)
  {
    if (!kh_exist(data_ctx, k))
      continue;
    ccv_array_t* const columns = kh_val(data_ctx, k);
    assert(columns->rnum <= column_size);
    for (i = 0; i < columns->rnum; i++)
    {
      ccv_array_t* const column = *(ccv_array_t**)ccv_array_get(columns, i);
      // Column lists are created on demand; a column that never had data returned on this stream context has no list.
      if (!column)
        continue;
      void* context;
      ccv_cnnp_column_data_deinit_f data_deinit;
      if (i < dataframe->column_size)
      {
        data_deinit = dataframe->column_data[i].data_deinit;
        context = dataframe->column_data[i].context;
      } else {
        assert(dataframe->derived_column_data);
        ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, i - dataframe->column_size);
        data_deinit = derived_column_data->data_deinit;
        context = derived_column_data->context;
      }
      if (data_deinit)
        for (j = 0; j < column->rnum; j++)
          data_deinit(*(void**)ccv_array_get(column, j), context);
      ccv_array_free(column);
    }
    ccv_array_free(columns);
  }
  kh_destroy(ctx, data_ctx);
  if (dataframe->derived_column_data)
  {
    for (i = 0; i < dataframe->derived_column_data->rnum; i++)
    {
      ccv_cnnp_derived_column_data_t* const derived_column_data = (ccv_cnnp_derived_column_data_t*)ccv_array_get(dataframe->derived_column_data, i);
      if (derived_column_data->context_deinit)
        derived_column_data->context_deinit(derived_column_data->context);
      // column_idxs is 0 for columns added with ccv_cnnp_dataframe_add.
      ccfree(derived_column_data->column_idxs);
    }
    ccv_array_free(dataframe->derived_column_data);
  }
  for (i = 0; i < dataframe->column_size; i++)
    if (dataframe->column_data[i].context_deinit)
      dataframe->column_data[i].context_deinit(dataframe->column_data[i].context);
  if (dataframe->shuffled_idx)
    ccfree(dataframe->shuffled_idx);
#ifdef HAVE_GSL
  if (dataframe->rng)
    gsl_rng_free(dataframe->rng);
#endif
  ccfree(dataframe);
}