Coverage Report

Created: 2019-07-03 22:50

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_nnc_symbolic_graph_parallel.c

#include "ccv_nnc.h"
#include "ccv_nnc_easy.h"
#include "ccv_nnc_internal.h"
#include "ccv_internal.h"
#include "_ccv_nnc_symbolic_graph.h"

#pragma mark - Level-3.5 API

enum {
  CCV_NNC_PARALLEL_BROADCAST = 0x1,
  CCV_NNC_PARALLEL_ALLREDUCER = 0x2,
  CCV_NNC_PARALLEL_REDUCER = 0x3,
};

static int _ccv_nnc_exec_inputs_contain(const ccv_nnc_graph_exec_symbol_info_t* const node, const int d)
{
  int i;
  for (i = 0; i < node->input_size; i++)
    if (node->inputs[i] == d)
      return 1;
  return 0;
}
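
/* ccv_nnc_symbolic_graph_data_parallel rewrites a symbolic graph for data-parallel
 * execution: GPU tensors and execution nodes on device 0 are duplicated onto devices
 * 1 .. parallel_count - 1, COMM_BROADCAST / COMM_REDUCE / COMM_ALLREDUCE nodes are
 * inserted for the symbols passed as broadcasts / reducers / allreducers, and the
 * device-0-to-copy mappings are recorded in graph->data_parallel so that
 * ccv_nnc_tensor_symbol_copy and ccv_nnc_graph_exec_symbol_copy below can look them up. */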
void ccv_nnc_symbolic_graph_data_parallel(ccv_nnc_symbolic_graph_t* const graph, const int parallel, const ccv_nnc_tensor_symbol_t* const broadcasts, const int broadcast_size, const ccv_nnc_tensor_symbol_t* const allreducers, const int allreducer_size, const ccv_nnc_tensor_symbol_t* const reducers, const int reducer_size, const int reduce_op_type, const ccv_nnc_graph_exec_symbol_t* const sources, const int source_size, const ccv_nnc_graph_exec_symbol_t* const destinations, const int destination_size)
{
  assert(reduce_op_type == CCV_NNC_PARALLEL_REDUCE_OP_SUM);
  const int parallel_count = (parallel == 0) ? ccv_nnc_device_count(CCV_STREAM_CONTEXT_GPU) : parallel;
  if (parallel_count == 1)
    return;
  assert(parallel_count > 1);
  ccv_nnc_graph_visit_t* const visit = ccv_nnc_graph_visit_new(graph, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), graph->exec_symbol_info->rnum, sources, source_size, destinations, destination_size, 0);
  int i, j, k;
  // Tensor symbol has to be on device 0 or any.
  ccv_nnc_graph_visit_for(visit, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), node, idx) {
    for (i = 0; i < node->input_size; i++)
      if (node->inputs[i] >= 0)
      {
        ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->inputs[i]);
        if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY &&
          CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) != CCV_COMPUTE_DEVICE_ANY)
          { assert(CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_000); }
      }
    for (i = 0; i < node->output_size; i++)
      if (node->outputs[i] >= 0)
      {
        ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->outputs[i]);
        if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY &&
          CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) != CCV_COMPUTE_DEVICE_ANY)
          { assert(CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_000); }
      }
  } ccv_nnc_graph_visit_endfor
  // Run infer in the graph to get all tensors shaped.
  ccv_nnc_symbolic_graph_symbol_infer(graph, visit, sources, source_size, destinations, destination_size, 0, 0, (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, 0), (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0));
  // Set ANY device to default device. Make a list of execution nodes / tensors to be duplicated.
  ccv_array_t* const dup_tensors = ccv_array_new(sizeof(int), 0, 0);
  ccv_array_t* const dup_execs = ccv_array_new(sizeof(int), 0, 0);
  ccv_array_t* const broadcast_reduce_execs = ccv_array_new(sizeof(int), 0, 0);
  const int tensor_symbol_size = graph->tensor_symbol_info->rnum;
  const int graph_exec_symbol_size = graph->exec_symbol_info->rnum;
  int* const tensor_flags = cccalloc(tensor_symbol_size + graph_exec_symbol_size, sizeof(int));
  int* const exec_flags = tensor_flags + tensor_symbol_size;
  for (i = 0; i < broadcast_size; i++)
  {
    // Doesn't support alias for these.
    tensor_flags[broadcasts[i].d] = CCV_NNC_PARALLEL_BROADCAST;
    assert(graph == broadcasts[i].graph);
    assert(!((ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, broadcasts[i].d))->alias_ref);
  }
  int* const allreduce_producers = allreducer_size > 0 ? ccmalloc(sizeof(int) * allreducer_size) : 0;
  for (i = 0; i < allreducer_size; i++)
  {
    // Doesn't support alias for these.
    tensor_flags[allreducers[i].d] = CCV_NNC_PARALLEL_ALLREDUCER;
    allreduce_producers[i] = -1; // Should all be filled.
    assert(graph == allreducers[i].graph);
    assert(!((ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, allreducers[i].d))->alias_ref);
  }
  for (i = 0; i < reducer_size; i++)
  {
    // Doesn't support alias for these.
    tensor_flags[reducers[i].d] = CCV_NNC_PARALLEL_REDUCER;
    assert(graph == reducers[i].graph);
    assert(!((ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, reducers[i].d))->alias_ref);
  }
  // No overlap between broadcasts, allreducers, reducers.
  for (i = 0; i < broadcast_size; i++)
    for (j = 0; j < reducer_size; j++)
      { assert(broadcasts[i].d != reducers[j].d); }
  for (i = 0; i < broadcast_size; i++)
    for (j = 0; j < allreducer_size; j++)
      { assert(broadcasts[i].d != allreducers[j].d); }
  for (i = 0; i < allreducer_size; i++)
    for (j = 0; j < reducer_size; j++)
      { assert(allreducers[i].d != reducers[j].d); }
  ccv_nnc_graph_visit_for(visit, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), node, idx) {
    int parallelizable_data = 0;
    int reduce_inputs = 0;
    int broadcast_outputs = 0;
    for (i = 0; i < node->input_size; i++)
      if (node->inputs[i] >= 0)
      {
        ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->inputs[i]);
        if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
        {
          if (CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_ANY)
            CCV_TENSOR_SET_DEVICE_ID(tensor_symbol->info.type, 0);
          // Don't support alias for broadcast / allreducer / reducer.
          assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_BROADCAST);
          assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_ALLREDUCER);
          assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_REDUCER);
          if (tensor_flags[node->inputs[i]] == CCV_NNC_PARALLEL_REDUCER)
            reduce_inputs = 1;
          parallelizable_data = 1;
        }
      }
    for (i = 0; i < node->output_size; i++)
      if (node->outputs[i] >= 0)
      {
        ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->outputs[i]);
        if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
        {
          if (CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_ANY)
            CCV_TENSOR_SET_DEVICE_ID(tensor_symbol->info.type, 0);
          // Don't support alias for broadcast / allreducer / reducer.
          assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_BROADCAST);
          assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_ALLREDUCER);
          assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_REDUCER);
          const int d = node->outputs[i];
          if (tensor_flags[d] == CCV_NNC_PARALLEL_BROADCAST)
            broadcast_outputs = 1;
          else if (tensor_flags[d] == CCV_NNC_PARALLEL_ALLREDUCER) {
            for (j = 0; j < allreducer_size; j++)
              if (allreducers[j].d == d)
              {
                allreduce_producers[j] = idx;
                break;
              }
          }
          parallelizable_data = 1;
        }
      }
    assert(!(broadcast_outputs && reduce_inputs)); // This node cannot be both broadcast and reducer.
    if (broadcast_outputs ^ reduce_inputs)
    {
      if (broadcast_outputs)
        exec_flags[idx] = CCV_NNC_PARALLEL_BROADCAST;
      else if (reduce_inputs)
        exec_flags[idx] = CCV_NNC_PARALLEL_REDUCER;
      ccv_array_push(broadcast_reduce_execs, &idx);
    } else if (parallelizable_data && !broadcast_outputs && !reduce_inputs) {
      // If this node contains GPU data that need to be parallelized, and this node itself is not a broadcast node or a reducer node..
      ccv_array_push(dup_execs, &idx);
      for (i = 0; i < node->input_size; i++)
        if (node->inputs[i] >= 0)
        {
          ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->inputs[i]);
          if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
          {
            // Add the symbol alias to first.
            if (tensor_symbol->alias_ref)
              ccv_array_add_unique_int(dup_tensors, tensor_symbol->alias_ref - 1);
            ccv_array_add_unique_int(dup_tensors, node->inputs[i]);
          }
        }
      for (i = 0; i < node->output_size; i++)
        if (node->outputs[i] >= 0)
        {
          ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->outputs[i]);
          if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
          {
            if (tensor_symbol->alias_ref)
              ccv_array_add_unique_int(dup_tensors, tensor_symbol->alias_ref - 1);
            ccv_array_add_unique_int(dup_tensors, node->outputs[i]);
          }
        }
    }
  } ccv_nnc_graph_visit_endfor
  // Now, actually create these tensors.
  if (!graph->data_parallel.tensor_symbol_idx)
    graph->data_parallel.tensor_symbol_idx = (int*)ccmalloc(sizeof(int) * (parallel_count - 1) * tensor_symbol_size);
  else if (graph->data_parallel.tensor_symbol_size * (graph->data_parallel.count - 1) != tensor_symbol_size * (parallel_count - 1))
    // This may shrink too, but that is OK.
    graph->data_parallel.tensor_symbol_idx = (int*)ccrealloc(graph->data_parallel.tensor_symbol_idx, sizeof(int) * (parallel_count - 1) * tensor_symbol_size);
  graph->data_parallel.tensor_symbol_size = tensor_symbol_size;
  graph->data_parallel.count = parallel_count;
  int* const dup_tensor_idx = graph->data_parallel.tensor_symbol_idx;
  // dup_tensor_idx is the array starts with 0 here.
  for (i = 0; i < (parallel_count - 1) * tensor_symbol_size; i++)
    dup_tensor_idx[i] = -1;
  // Make the duplicated tensors (on different devices).
  for (i = 0; i < dup_tensors->rnum; i++)
  {
    const int d = *(int*)ccv_array_get(dup_tensors, i);
    ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, d);
    ccv_nnc_tensor_param_t info = tensor_symbol->info;
    const int flags = tensor_symbol->flags;
    if (tensor_symbol->alias_ref)
    {
      const int alias_ref = tensor_symbol->alias_ref - 1;
      for (j = 0; j < parallel_count - 1; j++)
      {
        const int dup_d = dup_tensor_idx[alias_ref * (parallel_count - 1) + j];
        CCV_TENSOR_SET_DEVICE_ID(info.type, j + 1); // Set the device id.
        assert(dup_d >= 0);
        // Get tensor symbol again, it may be invalid after added new symbol (we use it for ofs and inc).
        ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, d);
        const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_alias_new(graph, (ccv_nnc_tensor_symbol_t){
          .d = dup_d,
          .graph = graph,
        }, tensor_symbol->ofs, tensor_symbol->inc, info, 0);
        ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
        dup_tensor_idx[d * (parallel_count - 1) + j] = new_symbol.d;
      }
    } else {
      for (j = 0; j < parallel_count - 1; j++)
      {
        CCV_TENSOR_SET_DEVICE_ID(info.type, j + 1); // Set the device id.
        const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_new(graph, info, 0);
        ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
        dup_tensor_idx[d * (parallel_count - 1) + j] = new_symbol.d;
      }
    }
  }
  ccv_array_free(dup_tensors);
  // Now, create execs.
  if (!graph->data_parallel.exec_symbol_idx)
    graph->data_parallel.exec_symbol_idx = (int*)ccmalloc(sizeof(int) * (parallel_count - 1) * graph_exec_symbol_size);
  else if (graph->data_parallel.exec_symbol_size * (graph->data_parallel.count - 1) != graph_exec_symbol_size * (parallel_count - 1))
    // This may shrink too, but that is OK.
    graph->data_parallel.exec_symbol_idx = (int*)ccrealloc(graph->data_parallel.exec_symbol_idx, sizeof(int) * (parallel_count - 1) * graph_exec_symbol_size);
  graph->data_parallel.exec_symbol_size = graph_exec_symbol_size;
  int* const dup_exec_idx = graph->data_parallel.exec_symbol_idx;
  // dup_exec_idx is the array starts with 0 here.
  for (i = 0; i < (parallel_count - 1) * graph_exec_symbol_size; i++)
    dup_exec_idx[i] = -1;
  int max_io_size = 1 + parallel_count;
  // Now make the duplicated execs nodes (on different devices).
  for (i = 0; i < dup_execs->rnum; i++)
  {
    const int d = *(int*)ccv_array_get(dup_execs, i);
    ccv_nnc_graph_exec_symbol_info_t* const exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
    max_io_size = ccv_max(max_io_size, exec_symbol->input_size + exec_symbol->output_size);
  }
  max_io_size = ccv_max(max_io_size, parallel_count * 2); // tensors from all parallel_count, the output is to all parallel_count (thus, allreduce).
  ccv_nnc_tensor_symbol_t max_io[max_io_size];
  for (i = 0; i < dup_execs->rnum; i++)
  {
    const int d = *(int*)ccv_array_get(dup_execs, i);
    for (j = 0; j < parallel_count - 1; j++)
    {
      ccv_nnc_graph_exec_symbol_info_t* const exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
      const ccv_nnc_cmd_t cmd = exec_symbol->cmd;
      const ccv_nnc_hint_t hint = exec_symbol->hint;
      const int input_size = exec_symbol->input_size;
      const int output_size = exec_symbol->output_size;
      ccv_nnc_tensor_symbol_t* const inputs = max_io;
      for (k = 0; k < input_size; k++)
      {
        const int idx = exec_symbol->inputs[k];
        if (idx >= 0)
          inputs[k].d = dup_tensor_idx[idx * (parallel_count - 1) + j] >= 0 ? dup_tensor_idx[idx * (parallel_count - 1) + j] : idx;
        else
          inputs[k].d = idx;
        inputs[k].graph = idx != CCV_NNC_NO_TENSOR_SYMBOL ? graph : 0;
      }
      ccv_nnc_tensor_symbol_t* const outputs = max_io + input_size;
      for (k = 0; k < output_size; k++)
      {
        const int idx = exec_symbol->outputs[k];
        if (idx >= 0)
          outputs[k].d = dup_tensor_idx[idx * (parallel_count - 1) + j] >= 0 ? dup_tensor_idx[idx * (parallel_count - 1) + j] : idx;
        else
          outputs[k].d = idx;
        outputs[k].graph = idx != CCV_NNC_NO_TENSOR_SYMBOL ? graph : 0;
      }
      const ccv_nnc_graph_exec_symbol_t new_symbol = ccv_nnc_graph_exec_symbol_new(graph, cmd, inputs, input_size, outputs, output_size, 0);
      ccv_nnc_graph_exec_symbol_set_hint(graph, new_symbol, hint);
      dup_exec_idx[d * (parallel_count - 1) + j] = new_symbol.d;
    }
  }
  // Create new tensors for broadcast / reduce.
  int* const broadcast_reduce_tensor_idx = cccalloc(tensor_symbol_size, sizeof(int));
  for (i = 0; i < broadcast_size + reducer_size; i++)
  {
    const int idx = i >= broadcast_size ? reducers[i - broadcast_size].d : broadcasts[i].d;
    ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, idx);
    ccv_nnc_tensor_param_t info = tensor_symbol->info;
    const int flags = tensor_symbol->flags;
    // No alias handling.
    assert(!tensor_symbol->alias_ref);
    const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_new(graph, info, 0);
    ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
    broadcast_reduce_tensor_idx[idx] = new_symbol.d + 1;
  }
  int* const broadcast_exec_idx = cccalloc(tensor_symbol_size, sizeof(int));
  int* const reduce_exec_idx = cccalloc(tensor_symbol_size, sizeof(int));
  // Create node for broadcast (thus, transfer data to different parallel_count) and reducer (transfer data back to a device, and sum).
  for (i = 0; i < broadcast_reduce_execs->rnum; i++)
  {
    const int d = *(int*)ccv_array_get(broadcast_reduce_execs, i);
    // For broadcast, we create data transfers as our dup node, and create connections to these data transfers.
    if (exec_flags[d] == CCV_NNC_PARALLEL_BROADCAST)
    {
      ccv_nnc_graph_exec_symbol_info_t* const exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
      ccv_nnc_tensor_symbol_t* const inputs = max_io;
      ccv_nnc_tensor_symbol_t* const outputs = max_io + 1;
      const ccv_nnc_graph_exec_symbol_t source = {
        .d = d,
        .graph = graph,
      };
      for (j = 0; j < exec_symbol->output_size; j++)
      {
        const int idx = exec_symbol->outputs[j];
        if (idx >= 0 && tensor_flags[idx] == CCV_NNC_PARALLEL_BROADCAST)
        {
          inputs[0] = (ccv_nnc_tensor_symbol_t){
            .d = idx,
            .graph = graph,
          };
          // Reset the tensor flags, it is broadcasted now.
          tensor_flags[idx] = 0;
          outputs[0] = (ccv_nnc_tensor_symbol_t){
            .d = broadcast_reduce_tensor_idx[idx] - 1,
            .graph = graph,
          };
          assert(broadcast_reduce_tensor_idx[idx] > 0);
          for (k = 0; k < parallel_count - 1; k++)
            outputs[k + 1] = (ccv_nnc_tensor_symbol_t){
              .d = dup_tensor_idx[idx * (parallel_count - 1) + k],
              .graph = graph,
            };
          const ccv_nnc_graph_exec_symbol_t bcast = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_BROADCAST_FORWARD(), inputs, 1, outputs, parallel_count, 0);
          ccv_nnc_graph_exec_symbol_concat(graph, source, bcast);
          broadcast_exec_idx[idx] = bcast.d + 1;
        }
      }
    } else if (exec_flags[d] == CCV_NNC_PARALLEL_REDUCER) {
      // Gather is a bit more sophisticated, we need to use the new tensor to hold the summed value.
      // This is what we have right now, I will use NCCL later.
      ccv_nnc_tensor_symbol_t* const inputs = max_io;
      ccv_nnc_tensor_symbol_t* const outputs = max_io + parallel_count;
      ccv_nnc_graph_exec_symbol_info_t* exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
      for (j = 0; j < exec_symbol->input_size; j++)
      {
        const int idx = exec_symbol->inputs[j];
        if (idx >= 0 && tensor_flags[idx] == CCV_NNC_PARALLEL_REDUCER)
        {
          inputs[0] = (ccv_nnc_tensor_symbol_t){
            .d = idx,
            .graph = graph,
          };
          for (k = 0; k < parallel_count - 1; k++)
            inputs[k + 1] = (ccv_nnc_tensor_symbol_t){
              .d = dup_tensor_idx[idx * (parallel_count - 1) + k],
              .graph = graph,
            };
          outputs[0] = (ccv_nnc_tensor_symbol_t){
            .d = broadcast_reduce_tensor_idx[idx] - 1,
            .graph = graph,
          };
          // Create new symbol for all other tensors to facilitate copy (this is not useful for NCCL, but useful for REF implementation).
          ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, idx);
          ccv_nnc_tensor_param_t info = tensor_symbol->info;
          const int flags = tensor_symbol->flags;
          // No alias handling.
          assert(!tensor_symbol->alias_ref);
          for (k = 1; k < parallel_count; k++)
          {
            const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_new(graph, info, 0);
            ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
            outputs[k] = new_symbol;
          }
          const ccv_nnc_graph_exec_symbol_t reduce = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_REDUCE_FORWARD(), inputs, parallel_count, outputs, parallel_count, 0);
          // Refresh the pointer to keep it up to date.
          exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
          ccv_nnc_graph_exec_symbol_concat(graph, reduce, (ccv_nnc_graph_exec_symbol_t){
            .d = d,
            .graph = graph,
          });
          reduce_exec_idx[idx] = reduce.d + 1;
        }
      }
      // Update the inputs pointing to the summed value.
      for (j = 0; j < exec_symbol->input_size; j++)
      {
        const int idx = exec_symbol->inputs[j];
        if (idx >= 0 && tensor_flags[idx] == CCV_NNC_PARALLEL_REDUCER)
          exec_symbol->inputs[j] = broadcast_reduce_tensor_idx[idx] - 1;
      }
    }
  }
  ccv_array_free(broadcast_reduce_execs);
  // If this tensor is not broadcasted yet, that means there is no exec to generate this tensor. We just generate headless copy.
  for (i = 0; i < dup_execs->rnum; i++)
  {
    const int idx = *(int*)ccv_array_get(dup_execs, i);
    ccv_nnc_graph_exec_symbol_info_t* const node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, idx);
    if (exec_flags[idx] == CCV_NNC_PARALLEL_REDUCER)
      continue;
    // We try to make copy command as compact as possible by having one copy for multiple tensors if they used together.
    ccv_nnc_tensor_symbol_t* const inputs = max_io;
    ccv_nnc_tensor_symbol_t* const outputs = max_io + 1;
    for (j = 0; j < node->input_size; j++)
    {
      const int idx = node->inputs[j];
      // Now, figure out whether we need to create copy command.
      if (idx >= 0 && idx < tensor_symbol_size && tensor_flags[idx] == CCV_NNC_PARALLEL_BROADCAST)
      {
        inputs[0] = (ccv_nnc_tensor_symbol_t){
          .d = idx,
          .graph = graph,
        };
        // Reset the tensor flags, it is broadcasted now.
        tensor_flags[idx] = 0;
        outputs[0] = (ccv_nnc_tensor_symbol_t){
          .d = broadcast_reduce_tensor_idx[idx] - 1,
          .graph = graph,
        };
        assert(broadcast_reduce_tensor_idx[idx] > 0);
        for (k = 0; k < parallel_count - 1; k++)
          outputs[k + 1] = (ccv_nnc_tensor_symbol_t){
            .d = dup_tensor_idx[idx * (parallel_count - 1) + k],
            .graph = graph,
          };
        const ccv_nnc_graph_exec_symbol_t bcast = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_BROADCAST_FORWARD(), inputs, 1, outputs, parallel_count, 0);
        broadcast_exec_idx[idx] = bcast.d + 1;
      }
    }
  }
  ccfree(broadcast_reduce_tensor_idx);
  ccv_array_free(dup_execs);
  // Now everything is dup'ed, connect them all.
  ccv_nnc_graph_visit_for(visit, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), node, idx) {
    for (i = 0; i < node->input_size; i++)
    {
      const int input = node->inputs[i];
      // If it is broadcast worthy.
      if (input >= 0 && input < tensor_symbol_size && broadcast_exec_idx[input])
        ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
          .d = broadcast_exec_idx[input] - 1,
          .graph = graph,
        }, (ccv_nnc_graph_exec_symbol_t){
          .d = idx,
          .graph = graph,
        });
    }
    // Check whether this node has outgoing to the reducer node, if so, replace that to the sum node.
    if (node->outgoings && node->outgoings->rnum)
      for (i = 0; i < node->outgoings->rnum; i++)
      {
        const int outgoing_idx = *(int*)ccv_array_get(node->outgoings, i);
        if (outgoing_idx >= graph_exec_symbol_size)
          continue;
        if (exec_flags[outgoing_idx] == CCV_NNC_PARALLEL_REDUCER)
          for (j = 0; j < node->output_size; j++)
          {
            const int output_idx = node->outputs[j];
            if (output_idx >= 0 && tensor_flags[output_idx] == CCV_NNC_PARALLEL_REDUCER)
            {
              assert(reduce_exec_idx[output_idx]);
              ccv_array_replace_unique_int(node->outgoings, outgoing_idx, reduce_exec_idx[output_idx] - 1);
            }
          }
      }
    for (i = 0; i < parallel_count - 1; i++)
    {
      const int d = dup_exec_idx[idx * (parallel_count - 1) + i];
      if (d < 0)
        continue;
      const ccv_nnc_graph_exec_symbol_t source = {
        .d = d,
        .graph = graph,
      };
      // If it is broadcast worthy.
      for (j = 0; j < node->input_size; j++)
      {
        const int input = node->inputs[j];
        if (input >= 0 && input < tensor_symbol_size && broadcast_exec_idx[input])
          ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
            .d = broadcast_exec_idx[input] - 1,
            .graph = graph,
          }, source);
      }
      // If it is reduce worthy.
      for (j = 0; j < node->output_size; j++)
      {
        const int output = node->outputs[j];
        if (output >= 0 && output < tensor_symbol_size && reduce_exec_idx[output])
          ccv_nnc_graph_exec_symbol_concat(graph, source, (ccv_nnc_graph_exec_symbol_t){
            .d = reduce_exec_idx[output] - 1,
            .graph = graph,
          });
      }
      if (node->outgoings && node->outgoings->rnum)
        for (j = 0; j < node->outgoings->rnum; j++)
        {
          const int outgoing_idx = *(int*)ccv_array_get(node->outgoings, j);
          if (outgoing_idx > graph_exec_symbol_size)
            continue;
          const int outgoing_d = dup_exec_idx[outgoing_idx * (parallel_count - 1) + i];
          if (outgoing_d < 0)
            continue;
          ccv_nnc_graph_exec_symbol_concat(graph, source, (ccv_nnc_graph_exec_symbol_t){
            .d = outgoing_d,
            .graph = graph,
          });
        }
    }
  } ccv_nnc_graph_visit_endfor
  ccfree(broadcast_exec_idx);
  ccfree(reduce_exec_idx);
  ccfree(tensor_flags);
  ccv_nnc_graph_visit_free(visit);
  // Allreduce is easier to do, we do that the last. It consists of two steps:
  // 1. Generate allreduce node for each symbol;
  // 2. Disconnect them from source and connect them through all reduce nodes.
  for (i = 0; i < allreducer_size; i++)
  {
    ccv_nnc_tensor_symbol_t* const outputs = max_io + parallel_count;
    outputs[0] = allreducers[i];
    // Copy over allreducers output symbols (as the old symbol).
    for (j = 0; j < parallel_count - 1; j++)
    {
      outputs[j + 1].graph = graph;
      outputs[j + 1].d = dup_tensor_idx[allreducers[i].d * (parallel_count - 1) + j];
    }
    ccv_nnc_tensor_symbol_t* const inputs = max_io;
    // Create identical new tensor symbols
    for (j = 0; j < parallel_count; j++)
      inputs[j] = ccv_nnc_tensor_symbol_new(graph, ccv_nnc_tensor_symbol_params(graph, outputs[j]), 0);
    // Create allreduce node.
    const ccv_nnc_graph_exec_symbol_t allreduce = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_ALLREDUCE_FORWARD(), inputs, parallel_count, outputs, parallel_count, 0);
    const int exec_idx = allreduce_producers[i];
    assert(exec_idx >= 0);
    ccv_nnc_graph_exec_symbol_info_t* const node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, exec_idx);
    for (j = 0; j < node->output_size; j++)
      if (node->outputs[j] == outputs[0].d)
        node->outputs[j] = inputs[0].d;
    ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
      .graph = graph,
      .d = exec_idx,
    }, allreduce);
    for (j = 0; j < node->outgoings->rnum;)
    {
      const int d = *(int*)ccv_array_get(node->outgoings, j);
      if (d == allreduce.d)
      {
        ++j;
        continue;
      }
      // Get the destination nodes, and check whether they have inputs matches our outputs.
      ccv_nnc_graph_exec_symbol_info_t* const outgoing_node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
      if (_ccv_nnc_exec_inputs_contain(outgoing_node, allreducers[i].d))
      {
        ccv_nnc_graph_exec_symbol_concat(graph, allreduce, (ccv_nnc_graph_exec_symbol_t){
          .graph = graph,
          .d = d,
        });
        // Remove the connection.
        if (j < node->outgoings->rnum - 1)
          *(int*)ccv_array_get(node->outgoings, j) = *(int*)ccv_array_get(node->outgoings, node->outgoings->rnum - 1);
        --node->outgoings->rnum;
      } else
        ++j;
    }
    for (j = 0; j < parallel_count - 1; j++)
    {
      const int new_exec_idx = dup_exec_idx[exec_idx * (parallel_count - 1) + j];
      ccv_nnc_graph_exec_symbol_info_t* const node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, new_exec_idx);
      for (k = 0; k < node->output_size; k++)
        if (node->outputs[k] == outputs[j + 1].d)
          node->outputs[k] = inputs[j + 1].d;
      ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
        .graph = graph,
        .d = new_exec_idx,
      }, allreduce);
      for (k = 0; k < node->outgoings->rnum;)
      {
        const int d = *(int*)ccv_array_get(node->outgoings, k);
        if (d == allreduce.d)
        {
          ++k;
          continue;
        }
        // Get the destination nodes, and check whether they have inputs matches our outputs.
        ccv_nnc_graph_exec_symbol_info_t* const outgoing_node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
        if (_ccv_nnc_exec_inputs_contain(outgoing_node, outputs[j + 1].d))
        {
          ccv_nnc_graph_exec_symbol_concat(graph, allreduce, (ccv_nnc_graph_exec_symbol_t){
            .graph = graph,
            .d = d,
          });
          // Remove the connection.
          if (k < node->outgoings->rnum - 1)
            *(int*)ccv_array_get(node->outgoings, k) = *(int*)ccv_array_get(node->outgoings, node->outgoings->rnum - 1);
          --node->outgoings->rnum;
        } else
          ++k;
      }
    }
  }
}
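
For orientation, here is a minimal caller-side sketch of the function above. It is not part of this file: the graph, gradient symbols, sources and destinations are placeholders the caller is assumed to have built elsewhere; only the signature and the CCV_NNC_PARALLEL_REDUCE_OP_SUM requirement come from the code above. A typical data-parallel training setup would mark the gradient symbols as allreducers and leave broadcasts and reducers empty.

// Hypothetical caller-side fragment; none of these variables are defined in this file.
ccv_nnc_symbolic_graph_t* const graph = /* an existing symbolic graph */ 0;
ccv_nnc_tensor_symbol_t gradients[16]; // gradient symbols collected by the caller
const int gradient_size = 16;
ccv_nnc_graph_exec_symbol_t* sources; int source_size; // graph sources, from the caller
ccv_nnc_graph_exec_symbol_t* destinations; int destination_size; // graph destinations
// Duplicate the graph onto 4 GPUs and sum the gradients across devices with the
// inserted CMD_COMM_ALLREDUCE_FORWARD nodes; passing 0 for parallel would instead
// use ccv_nnc_device_count(CCV_STREAM_CONTEXT_GPU).
ccv_nnc_symbolic_graph_data_parallel(graph, 4,
  0, 0, // broadcasts
  gradients, gradient_size, // allreducers
  0, 0, // reducers
  CCV_NNC_PARALLEL_REDUCE_OP_SUM, // the only reduce op the assert above accepts
  sources, source_size, destinations, destination_size);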

ccv_nnc_tensor_symbol_t ccv_nnc_tensor_symbol_copy(const ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_tensor_symbol_t symbol, const int device_id)
{
  assert(graph->data_parallel.tensor_symbol_idx);
  assert(symbol.d >= 0);
  assert(symbol.d < graph->data_parallel.tensor_symbol_size);
  if (device_id == 0)
    return symbol;
  const int parallel_count = graph->data_parallel.count;
  if (graph->data_parallel.tensor_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] < 0)
    return NO_TENSOR_SYMBOL;
  ccv_nnc_tensor_symbol_t tensor = {
    .d = graph->data_parallel.tensor_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1],
    .graph = graph,
  };
  return tensor;
}

ccv_nnc_graph_exec_symbol_t ccv_nnc_graph_exec_symbol_copy(const ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_graph_exec_symbol_t symbol, const int device_id)
{
  assert(graph->data_parallel.exec_symbol_idx);
  assert(symbol.d >= 0);
  assert(symbol.d < graph->data_parallel.exec_symbol_size);
  if (device_id == 0)
    return symbol;
  const int parallel_count = graph->data_parallel.count;
  if (graph->data_parallel.exec_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] < 0)
    return NO_GRAPH_EXEC_SYMBOL;
  ccv_nnc_graph_exec_symbol_t graph_exec = {
    .d = graph->data_parallel.exec_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1],
    .graph = graph,
  };
  return graph_exec;
}
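
Once a graph has been parallelized, the two lookup helpers above resolve a device-0 symbol to its duplicate on another device. A small sketch, assuming `graph` has already been passed through ccv_nnc_symbolic_graph_data_parallel and `w` / `apply` are placeholder tensor and exec symbols the caller created on device 0:

// Look up the device-2 duplicates recorded in graph->data_parallel.
const ccv_nnc_tensor_symbol_t w_on_2 = ccv_nnc_tensor_symbol_copy(graph, w, 2);
const ccv_nnc_graph_exec_symbol_t apply_on_2 = ccv_nnc_graph_exec_symbol_copy(graph, apply, 2);
// device_id 0 returns the symbol unchanged; a symbol that was never duplicated
// comes back as NO_TENSOR_SYMBOL / NO_GRAPH_EXEC_SYMBOL.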