Coverage Report

Created: 2021-04-07 03:47

/home/liu/buildslave/linux-x64-runtests/build/lib/nnc/ccv_nnc_symbolic_graph_parallel.c

#include "ccv_nnc.h"
#include "ccv_nnc_easy.h"
#include "ccv_nnc_internal.h"
#include "ccv_internal.h"
#include "_ccv_nnc_symbolic_graph.h"

// MARK - Level-3.5 API

enum {
	CCV_NNC_PARALLEL_BROADCAST = 0x1,
	CCV_NNC_PARALLEL_ALLREDUCER = 0x2,
	CCV_NNC_PARALLEL_REDUCER = 0x3,
};

static int _ccv_nnc_exec_inputs_contain(const ccv_nnc_graph_exec_symbol_info_t* const node, const int d)
{
	int i;
	for (i = 0; i < node->input_size; i++)
		if (node->inputs[i] == d)
			return 1;
	return 0;
}

void ccv_nnc_symbolic_graph_data_parallel(ccv_nnc_symbolic_graph_t* const graph, const int parallel, const ccv_nnc_tensor_symbol_t* const broadcasts, const int broadcast_size, const ccv_nnc_tensor_symbol_t* const allreducers, const int allreducer_size, ccv_nnc_tensor_symbol_t* const allreducer_outs, const ccv_nnc_tensor_symbol_t* const reducers, const int reducer_size, ccv_nnc_tensor_symbol_t* const reducer_outs, const int reduce_op_type, const ccv_nnc_graph_exec_symbol_t* const sources, const int source_size, const ccv_nnc_graph_exec_symbol_t* const destinations, const int destination_size)
{
	assert(reduce_op_type == CCV_NNC_PARALLEL_REDUCE_OP_SUM);
	const int parallel_count = (parallel == 0) ? ccv_nnc_device_count(CCV_STREAM_CONTEXT_GPU) : parallel;
	if (parallel_count == 1)
		return;
	assert(parallel_count > 1);
	ccv_nnc_graph_visit_t* const visit = ccv_nnc_graph_visit_new(graph, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), graph->exec_symbol_info->rnum, sources, source_size, destinations, destination_size, 0);
	int i, j, k;
	// Tensor symbol has to be on device 0 or any.
	ccv_nnc_graph_visit_for(visit, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), node, idx) {
		for (i = 0; i < node->input_size; i++)
			if (node->inputs[i] >= 0)
			{
				ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->inputs[i]);
				if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY &&
					CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) != CCV_COMPUTE_DEVICE_ANY)
					{ assert(CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_000); }
			}
		for (i = 0; i < node->output_size; i++)
			if (node->outputs[i] >= 0)
			{
				ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->outputs[i]);
				if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY &&
					CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) != CCV_COMPUTE_DEVICE_ANY)
					{ assert(CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_000); }
			}
	} ccv_nnc_graph_visit_endfor
	// Run infer in the graph to get all tensors shaped.
	ccv_nnc_symbolic_graph_symbol_infer(graph, visit, sources, source_size, destinations, destination_size, 0, 0, (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, 0), (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0));
	// Set ANY device to default device. Make a list of execution nodes / tensors to be duplicated.
	ccv_array_t* const dup_tensors = ccv_array_new(sizeof(int), 0, 0);
	ccv_array_t* const dup_execs = ccv_array_new(sizeof(int), 0, 0);
	ccv_array_t* const broadcast_reduce_execs = ccv_array_new(sizeof(int), 0, 0);
	int* const allreduce_inputs = allreducer_size > 0 ? (int*)ccmalloc(sizeof(int) * allreducer_size) : 0;
	for (i = 0; i < allreducer_size; i++)
		allreduce_inputs[i] = ccv_nnc_tensor_symbol_new(graph, ccv_nnc_tensor_symbol_params(graph, allreducers[i]), 0).d;
	const int tensor_symbol_size = graph->tensor_symbol_info->rnum;
	const int graph_exec_symbol_size = graph->exec_symbol_info->rnum;
	int* const tensor_flags = (int*)cccalloc(tensor_symbol_size + graph_exec_symbol_size, sizeof(int));
	int* const exec_flags = tensor_flags + tensor_symbol_size;
	for (i = 0; i < broadcast_size; i++)
	{
		// Doesn't support alias for these.
		tensor_flags[broadcasts[i].d] = CCV_NNC_PARALLEL_BROADCAST;
		assert(graph == broadcasts[i].graph);
		assert(!((ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, broadcasts[i].d))->alias_ref);
	}
	int* const allreduce_producers = allreducer_size > 0 ? (int*)cccalloc(tensor_symbol_size, sizeof(int)) : 0;
	for (i = 0; i < allreducer_size; i++)
	{
		// Doesn't support alias for these.
		tensor_flags[allreducers[i].d] = CCV_NNC_PARALLEL_ALLREDUCER;
		assert(graph == allreducers[i].graph);
		assert(!((ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, allreducers[i].d))->alias_ref);
	}
	for (i = 0; i < reducer_size; i++)
	{
		// Doesn't support alias for these.
		tensor_flags[reducers[i].d] = CCV_NNC_PARALLEL_REDUCER;
		assert(graph == reducers[i].graph);
		assert(!((ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, reducers[i].d))->alias_ref);
	}
	// No overlap between broadcasts, allreducers, reducers.
	for (i = 0; i < broadcast_size; i++)
		for (j = 0; j < reducer_size; j++)
			{ assert(broadcasts[i].d != reducers[j].d); }
	for (i = 0; i < broadcast_size; i++)
		for (j = 0; j < allreducer_size; j++)
			{ assert(broadcasts[i].d != allreducers[j].d); }
	for (i = 0; i < allreducer_size; i++)
		for (j = 0; j < reducer_size; j++)
			{ assert(allreducers[i].d != reducers[j].d); }
	ccv_nnc_graph_visit_for(visit, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), node, idx) {
		int parallelizable_data = 0;
		int reduce_inputs = 0;
		int broadcast_outputs = 0;
		for (i = 0; i < node->input_size; i++)
			if (node->inputs[i] >= 0)
			{
				ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->inputs[i]);
				if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
				{
					if (CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_ANY)
						CCV_TENSOR_SET_DEVICE_ID(tensor_symbol->info.type, 0);
					// Don't support alias for broadcast / allreducer / reducer.
					assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_BROADCAST);
					assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_ALLREDUCER);
					assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_REDUCER);
					const int d = node->inputs[i];
					if (tensor_flags[d] == CCV_NNC_PARALLEL_REDUCER)
						reduce_inputs = 1;
					parallelizable_data = 1;
				}
			}
		for (i = 0; i < node->output_size; i++)
			if (node->outputs[i] >= 0)
			{
				ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->outputs[i]);
				if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
				{
					if (CCV_TENSOR_GET_DEVICE(tensor_symbol->info.type) == CCV_COMPUTE_DEVICE_ANY)
						CCV_TENSOR_SET_DEVICE_ID(tensor_symbol->info.type, 0);
					// Don't support alias for broadcast / allreducer / reducer.
					assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_BROADCAST);
					assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_ALLREDUCER);
					assert(!tensor_symbol->alias_ref || tensor_flags[tensor_symbol->alias_ref - 1] != CCV_NNC_PARALLEL_REDUCER);
					const int d = node->outputs[i];
					if (tensor_flags[d] == CCV_NNC_PARALLEL_BROADCAST)
						broadcast_outputs = 1;
					else if (tensor_flags[d] == CCV_NNC_PARALLEL_ALLREDUCER)
						allreduce_producers[d] = idx + 1;
					parallelizable_data = 1;
				}
			}
		assert(!(broadcast_outputs && reduce_inputs)); // This node cannot be both broadcast and reducer.
		if (broadcast_outputs ^ reduce_inputs)
		{
			if (broadcast_outputs)
				exec_flags[idx] = CCV_NNC_PARALLEL_BROADCAST;
			else if (reduce_inputs)
				exec_flags[idx] = CCV_NNC_PARALLEL_REDUCER;
			ccv_array_push(broadcast_reduce_execs, &idx);
		} else if (parallelizable_data && !broadcast_outputs && !reduce_inputs) {
			// If this node contains GPU data that need to be parallelized, and this node itself is not a broadcast node or a reducer node..
			ccv_array_push(dup_execs, &idx);
			for (i = 0; i < node->input_size; i++)
				if (node->inputs[i] >= 0)
				{
					ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->inputs[i]);
					if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
					{
						// Add the symbol alias to first.
						if (tensor_symbol->alias_ref)
							ccv_array_add_unique_int(dup_tensors, tensor_symbol->alias_ref - 1);
						ccv_array_add_unique_int(dup_tensors, node->inputs[i]);
					}
				}
			for (i = 0; i < node->output_size; i++)
				if (node->outputs[i] >= 0)
				{
					ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, node->outputs[i]);
					if (CCV_TENSOR_GET_MEMORY(tensor_symbol->info.type) == CCV_TENSOR_GPU_MEMORY)
					{
						if (tensor_symbol->alias_ref)
							ccv_array_add_unique_int(dup_tensors, tensor_symbol->alias_ref - 1);
						ccv_array_add_unique_int(dup_tensors, node->outputs[i]);
					}
				}
		}
	} ccv_nnc_graph_visit_endfor
	// Now, actually create these tensors.
	if (!graph->data_parallel.tensor_symbol_idx)
		graph->data_parallel.tensor_symbol_idx = (int*)ccmalloc(sizeof(int) * (parallel_count - 1) * tensor_symbol_size);
	else if (graph->data_parallel.tensor_symbol_size * (graph->data_parallel.count - 1) != tensor_symbol_size * (parallel_count - 1))
		// This may shrink too, but that is OK.
		graph->data_parallel.tensor_symbol_idx = (int*)ccrealloc(graph->data_parallel.tensor_symbol_idx, sizeof(int) * (parallel_count - 1) * tensor_symbol_size);
	graph->data_parallel.tensor_symbol_size = tensor_symbol_size;
	graph->data_parallel.count = parallel_count;
	int* const dup_tensor_idx = graph->data_parallel.tensor_symbol_idx;
	// dup_tensor_idx is the array starts with 0 here.
	for (i = 0; i < (parallel_count - 1) * tensor_symbol_size; i++)
		dup_tensor_idx[i] = -1;
	// Make the duplicated tensors (on different devices).
	for (i = 0; i < dup_tensors->rnum; i++)
	{
		const int d = *(int*)ccv_array_get(dup_tensors, i);
		ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, d);
		ccv_nnc_tensor_param_t info = tensor_symbol->info;
		const int flags = tensor_symbol->flags;
		if (tensor_symbol->alias_ref)
		{
			const int alias_ref = tensor_symbol->alias_ref - 1;
			for (j = 0; j < parallel_count - 1; j++)
			{
				const int dup_d = dup_tensor_idx[alias_ref * (parallel_count - 1) + j];
				CCV_TENSOR_SET_DEVICE_ID(info.type, j + 1); // Set the device id.
				assert(dup_d >= 0);
				// Get tensor symbol again, it may be invalid after added new symbol (we use it for ofs and inc).
				ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, d);
				const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_alias_new(graph, (ccv_nnc_tensor_symbol_t){
					.d = dup_d,
					.graph = graph,
				}, tensor_symbol->ofs, tensor_symbol->inc, info, 0);
				ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
				dup_tensor_idx[d * (parallel_count - 1) + j] = new_symbol.d;
			}
		} else {
			for (j = 0; j < parallel_count - 1; j++)
			{
				CCV_TENSOR_SET_DEVICE_ID(info.type, j + 1); // Set the device id.
				const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_new(graph, info, 0);
				ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
				dup_tensor_idx[d * (parallel_count - 1) + j] = new_symbol.d;
			}
		}
	}
	ccv_array_free(dup_tensors);
	// Now, create execs.
	if (!graph->data_parallel.exec_symbol_idx)
		graph->data_parallel.exec_symbol_idx = (int*)ccmalloc(sizeof(int) * (parallel_count - 1) * graph_exec_symbol_size);
	else if (graph->data_parallel.exec_symbol_size * (graph->data_parallel.count - 1) != graph_exec_symbol_size * (parallel_count - 1))
		// This may shrink too, but that is OK.
		graph->data_parallel.exec_symbol_idx = (int*)ccrealloc(graph->data_parallel.exec_symbol_idx, sizeof(int) * (parallel_count - 1) * graph_exec_symbol_size);
	graph->data_parallel.exec_symbol_size = graph_exec_symbol_size;
	int* const dup_exec_idx = graph->data_parallel.exec_symbol_idx;
	// dup_exec_idx is the array starts with 0 here.
	for (i = 0; i < (parallel_count - 1) * graph_exec_symbol_size; i++)
		dup_exec_idx[i] = -1;
	int max_io_size = 1 + parallel_count;
	// Now make the duplicated execs nodes (on different devices).
	for (i = 0; i < dup_execs->rnum; i++)
	{
		const int d = *(int*)ccv_array_get(dup_execs, i);
		ccv_nnc_graph_exec_symbol_info_t* const exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
		max_io_size = ccv_max(max_io_size, exec_symbol->input_size + exec_symbol->output_size);
	}
	max_io_size = ccv_max(max_io_size, parallel_count * 2); // tensors from all parallel_count, the output is to all parallel_count (thus, allreduce).
	ccv_nnc_tensor_symbol_t max_io[max_io_size];
	for (i = 0; i < dup_execs->rnum; i++)
	{
		const int d = *(int*)ccv_array_get(dup_execs, i);
		for (j = 0; j < parallel_count - 1; j++)
		{
			ccv_nnc_graph_exec_symbol_info_t* const exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
			const ccv_nnc_cmd_t cmd = exec_symbol->cmd;
			const ccv_nnc_hint_t hint = exec_symbol->hint;
			const int input_size = exec_symbol->input_size;
			const int output_size = exec_symbol->output_size;
			ccv_nnc_tensor_symbol_t* const inputs = max_io;
			for (k = 0; k < input_size; k++)
			{
				const int idx = exec_symbol->inputs[k];
				if (idx >= 0)
					inputs[k].d = dup_tensor_idx[idx * (parallel_count - 1) + j] >= 0 ? dup_tensor_idx[idx * (parallel_count - 1) + j] : idx;
				else
					inputs[k].d = idx;
				inputs[k].graph = idx != CCV_NNC_NO_TENSOR_SYMBOL ? graph : 0;
			}
			ccv_nnc_tensor_symbol_t* const outputs = max_io + input_size;
			for (k = 0; k < output_size; k++)
			{
				const int idx = exec_symbol->outputs[k];
				if (idx >= 0)
					outputs[k].d = dup_tensor_idx[idx * (parallel_count - 1) + j] >= 0 ? dup_tensor_idx[idx * (parallel_count - 1) + j] : idx;
				else
					outputs[k].d = idx;
				outputs[k].graph = idx != CCV_NNC_NO_TENSOR_SYMBOL ? graph : 0;
			}
			const ccv_nnc_graph_exec_symbol_t new_symbol = ccv_nnc_graph_exec_symbol_new(graph, cmd, inputs, input_size, outputs, output_size, 0);
			ccv_nnc_graph_exec_symbol_set_hint(graph, new_symbol, hint);
			dup_exec_idx[d * (parallel_count - 1) + j] = new_symbol.d;
		}
	}
	// Create new tensors for broadcast / reduce.
	int* const broadcast_reduce_tensor_idx = (int*)cccalloc(tensor_symbol_size, sizeof(int));
	for (i = 0; i < broadcast_size + reducer_size; i++)
	{
		const int idx = i >= broadcast_size ? reducers[i - broadcast_size].d : broadcasts[i].d;
		ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, idx);
		ccv_nnc_tensor_param_t info = tensor_symbol->info;
		const int flags = tensor_symbol->flags;
		// No alias handling.
		assert(!tensor_symbol->alias_ref);
		const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_new(graph, info, 0);
		ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
		broadcast_reduce_tensor_idx[idx] = new_symbol.d + 1;
	}
	int* const broadcast_exec_idx = (int*)cccalloc(tensor_symbol_size, sizeof(int));
	int* const reduce_exec_idx = (int*)cccalloc(tensor_symbol_size, sizeof(int));
	// Create node for broadcast (thus, transfer data to different parallel_count) and reducer (transfer data back to a device, and sum).
	for (i = 0; i < broadcast_reduce_execs->rnum; i++)
	{
		const int d = *(int*)ccv_array_get(broadcast_reduce_execs, i);
		// For broadcast, we create data transfers as our dup node, and create connections to these data transfers.
		if (exec_flags[d] == CCV_NNC_PARALLEL_BROADCAST)
		{
			ccv_nnc_graph_exec_symbol_info_t* const exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
			ccv_nnc_tensor_symbol_t* const inputs = max_io;
			ccv_nnc_tensor_symbol_t* const outputs = max_io + 1;
			const ccv_nnc_graph_exec_symbol_t source = {
				.d = d,
				.graph = graph,
			};
			for (j = 0; j < exec_symbol->output_size; j++)
			{
				const int idx = exec_symbol->outputs[j];
				if (idx >= 0 && tensor_flags[idx] == CCV_NNC_PARALLEL_BROADCAST)
				{
					inputs[0] = (ccv_nnc_tensor_symbol_t){
						.d = idx,
						.graph = graph,
					};
					// Reset the tensor flags, it is broadcasted now.
					tensor_flags[idx] = 0;
					outputs[0] = (ccv_nnc_tensor_symbol_t){
						.d = broadcast_reduce_tensor_idx[idx] - 1,
						.graph = graph,
					};
					assert(broadcast_reduce_tensor_idx[idx] > 0);
					for (k = 0; k < parallel_count - 1; k++)
						outputs[k + 1] = (ccv_nnc_tensor_symbol_t){
							.d = dup_tensor_idx[idx * (parallel_count - 1) + k],
							.graph = graph,
						};
					const ccv_nnc_graph_exec_symbol_t bcast = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_BROADCAST_FORWARD(), inputs, 1, outputs, parallel_count, 0);
					ccv_nnc_graph_exec_symbol_concat(graph, source, bcast);
					assert(!broadcast_exec_idx[idx]);
					broadcast_exec_idx[idx] = bcast.d + 1;
				}
			}
		} else if (exec_flags[d] == CCV_NNC_PARALLEL_REDUCER) {
			// Gather is a bit more sophisticated, we need to use the new tensor to hold the summed value.
			// This is what we have right now, I will use NCCL later.
			ccv_nnc_tensor_symbol_t* const inputs = max_io;
			ccv_nnc_tensor_symbol_t* const outputs = max_io + parallel_count;
			ccv_nnc_graph_exec_symbol_info_t* exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
			for (j = 0; j < exec_symbol->input_size; j++)
			{
				const int idx = exec_symbol->inputs[j];
				if (idx >= 0 && tensor_flags[idx] == CCV_NNC_PARALLEL_REDUCER && !reduce_exec_idx[idx])
				{
					inputs[0] = (ccv_nnc_tensor_symbol_t){
						.d = idx,
						.graph = graph,
					};
					for (k = 0; k < parallel_count - 1; k++)
						inputs[k + 1] = (ccv_nnc_tensor_symbol_t){
							.d = dup_tensor_idx[idx * (parallel_count - 1) + k],
							.graph = graph,
						};
					outputs[0] = (ccv_nnc_tensor_symbol_t){
						.d = broadcast_reduce_tensor_idx[idx] - 1,
						.graph = graph,
					};
					// Create new symbol for all other tensors to facilitate copy (this is not useful for NCCL, but useful for REF implementation).
					ccv_nnc_tensor_symbol_info_t* const tensor_symbol = (ccv_nnc_tensor_symbol_info_t*)ccv_array_get(graph->tensor_symbol_info, idx);
					ccv_nnc_tensor_param_t info = tensor_symbol->info;
					const int flags = tensor_symbol->flags;
					// No alias handling.
					assert(!tensor_symbol->alias_ref);
					for (k = 1; k < parallel_count; k++)
					{
						const ccv_nnc_tensor_symbol_t new_symbol = ccv_nnc_tensor_symbol_new(graph, info, 0);
						ccv_nnc_tensor_symbol_set_flags(graph, new_symbol, flags);
						outputs[k] = new_symbol;
					}
					const ccv_nnc_graph_exec_symbol_t reduce = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_REDUCE_FORWARD(), inputs, parallel_count, outputs, parallel_count, 0);
					// Refresh the pointer to keep it up to date.
					exec_symbol = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
					ccv_nnc_graph_exec_symbol_concat(graph, reduce, (ccv_nnc_graph_exec_symbol_t){
						.d = d,
						.graph = graph,
					});
					reduce_exec_idx[idx] = reduce.d + 1;
				}
			}
			// Update the inputs pointing to the summed value.
			for (j = 0; j < exec_symbol->input_size; j++)
			{
				const int idx = exec_symbol->inputs[j];
				if (idx >= 0 && tensor_flags[idx] == CCV_NNC_PARALLEL_REDUCER)
					exec_symbol->inputs[j] = broadcast_reduce_tensor_idx[idx] - 1;
			}
		}
	}
	ccv_array_free(broadcast_reduce_execs);
	// If this tensor is not broadcasted yet, that means there is no exec to generate this tensor. We just generate headless copy.
	for (i = 0; i < dup_execs->rnum; i++)
	{
		const int idx = *(int*)ccv_array_get(dup_execs, i);
		ccv_nnc_graph_exec_symbol_info_t* const node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, idx);
		if (exec_flags[idx] == CCV_NNC_PARALLEL_REDUCER)
			continue;
		// We try to make copy command as compact as possible by having one copy for multiple tensors if they used together.
		ccv_nnc_tensor_symbol_t* const inputs = max_io;
		ccv_nnc_tensor_symbol_t* const outputs = max_io + 1;
		for (j = 0; j < node->input_size; j++)
		{
			const int idx = node->inputs[j];
			// Now, figure out whether we need to create copy command.
			if (idx >= 0 && idx < tensor_symbol_size && tensor_flags[idx] == CCV_NNC_PARALLEL_BROADCAST)
			{
				inputs[0] = (ccv_nnc_tensor_symbol_t){
					.d = idx,
					.graph = graph,
				};
				// Reset the tensor flags, it is broadcasted now.
				tensor_flags[idx] = 0;
				outputs[0] = (ccv_nnc_tensor_symbol_t){
					.d = broadcast_reduce_tensor_idx[idx] - 1,
					.graph = graph,
				};
				assert(broadcast_reduce_tensor_idx[idx] > 0);
				for (k = 0; k < parallel_count - 1; k++)
					outputs[k + 1] = (ccv_nnc_tensor_symbol_t){
						.d = dup_tensor_idx[idx * (parallel_count - 1) + k],
						.graph = graph,
					};
				const ccv_nnc_graph_exec_symbol_t bcast = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_BROADCAST_FORWARD(), inputs, 1, outputs, parallel_count, 0);
				broadcast_exec_idx[idx] = bcast.d + 1;
			}
		}
	}
	// Write reducer_outs last, because it may be the same pointer as reducers.
	if (reducer_outs)
		for (i = 0; i < reducer_size; i++)
		{
			reducer_outs[i].d = broadcast_reduce_tensor_idx[i + broadcast_size] - 1;
			reducer_outs[i].graph = graph;
		}
	ccfree(broadcast_reduce_tensor_idx);
	ccv_array_free(dup_execs);
	// Now everything is dup'ed, connect them all.
	ccv_nnc_graph_visit_for(visit, (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, 0), node, idx) {
		for (i = 0; i < node->input_size; i++)
		{
			const int input = node->inputs[i];
			// If it is broadcast worthy.
			if (input >= 0 && input < tensor_symbol_size && broadcast_exec_idx[input])
				ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
					.d = broadcast_exec_idx[input] - 1,
					.graph = graph,
				}, (ccv_nnc_graph_exec_symbol_t){
					.d = idx,
					.graph = graph,
				});
		}
		// Check whether this node has outgoing to the reducer node, if so, replace that to the sum node.
		if (node->outgoings && node->outgoings->rnum)
			for (i = 0; i < node->outgoings->rnum; i++)
			{
				const int outgoing_idx = *(int*)ccv_array_get(node->outgoings, i);
				if (outgoing_idx >= graph_exec_symbol_size)
					continue;
				if (exec_flags[outgoing_idx] == CCV_NNC_PARALLEL_REDUCER)
					for (j = 0; j < node->output_size; j++)
					{
						const int output_idx = node->outputs[j];
						if (output_idx >= 0 && tensor_flags[output_idx] == CCV_NNC_PARALLEL_REDUCER)
						{
							assert(reduce_exec_idx[output_idx]);
							ccv_array_replace_unique_int(node->outgoings, outgoing_idx, reduce_exec_idx[output_idx] - 1);
						}
					}
			}
		for (i = 0; i < parallel_count - 1; i++)
		{
			const int d = dup_exec_idx[idx * (parallel_count - 1) + i];
			if (d < 0)
				continue;
			const ccv_nnc_graph_exec_symbol_t source = {
				.d = d,
				.graph = graph,
			};
			// If it is broadcast worthy.
			for (j = 0; j < node->input_size; j++)
			{
				const int input = node->inputs[j];
				if (input >= 0 && input < tensor_symbol_size && broadcast_exec_idx[input])
					ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
						.d = broadcast_exec_idx[input] - 1,
						.graph = graph,
					}, source);
			}
			// If it is reduce worthy.
			for (j = 0; j < node->output_size; j++)
			{
				const int output = node->outputs[j];
				if (output >= 0 && output < tensor_symbol_size && reduce_exec_idx[output])
					ccv_nnc_graph_exec_symbol_concat(graph, source, (ccv_nnc_graph_exec_symbol_t){
						.d = reduce_exec_idx[output] - 1,
						.graph = graph,
					});
			}
			if (node->outgoings && node->outgoings->rnum)
				for (j = 0; j < node->outgoings->rnum; j++)
				{
					const int outgoing_idx = *(int*)ccv_array_get(node->outgoings, j);
					if (outgoing_idx > graph_exec_symbol_size)
						continue;
					const int outgoing_d = dup_exec_idx[outgoing_idx * (parallel_count - 1) + i];
					if (outgoing_d < 0)
						continue;
					ccv_nnc_graph_exec_symbol_concat(graph, source, (ccv_nnc_graph_exec_symbol_t){
						.d = outgoing_d,
						.graph = graph,
					});
				}
		}
	} ccv_nnc_graph_visit_endfor
	ccfree(broadcast_exec_idx);
	ccfree(reduce_exec_idx);
	ccfree(tensor_flags);
	ccv_nnc_graph_visit_free(visit);
	// Allreduce is easier to do, we do that the last. It consists of two steps:
	// 1. Generate allreduce node for each symbol;
	// 2. Disconnect them from source and connect them through all reduce nodes.
	for (i = 0; i < allreducer_size; i++)
	{
		ccv_nnc_tensor_symbol_t* const outputs = max_io + parallel_count;
		outputs[0] = allreducers[i];
		// Copy over allreducers output symbols (as the old symbol).
		for (j = 0; j < parallel_count - 1; j++)
		{
			const int d = allreducers[i].d;
			outputs[j + 1].graph = graph;
			assert(dup_tensor_idx[d * (parallel_count - 1) + j] >= 0);
			outputs[j + 1].d = dup_tensor_idx[d * (parallel_count - 1) + j];
		}
		ccv_nnc_tensor_symbol_t* const inputs = max_io;
		inputs[0].graph = graph;
		inputs[0].d = allreduce_inputs[i];
		// Create identical new tensor symbols
		for (j = 0; j < parallel_count - 1; j++)
		{
			if (dup_tensor_idx[allreduce_inputs[i] * (parallel_count - 1) + j] < 0)
				dup_tensor_idx[allreduce_inputs[i] * (parallel_count - 1) + j] = ccv_nnc_tensor_symbol_new(graph, ccv_nnc_tensor_symbol_params(graph, outputs[j + 1]), 0).d;
			inputs[j + 1].graph = graph;
			inputs[j + 1].d = dup_tensor_idx[allreduce_inputs[i] * (parallel_count - 1) + j];
		}
		// Create allreduce node.
		const ccv_nnc_graph_exec_symbol_t allreduce = ccv_nnc_graph_exec_symbol_new(graph, CMD_COMM_ALLREDUCE_FORWARD(), inputs, parallel_count, outputs, parallel_count, 0);
		const int exec_idx = allreduce_producers[allreducers[i].d] - 1;
		assert(exec_idx >= 0);
		ccv_nnc_graph_exec_symbol_info_t* const node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, exec_idx);
		for (j = 0; j < node->output_size; j++)
			if (node->outputs[j] == outputs[0].d)
				node->outputs[j] = inputs[0].d;
		ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
			.graph = graph,
			.d = exec_idx,
		}, allreduce);
		// Remove connections from current node directly to its following nodes (these should follow allreduce node now).
		for (j = 0; j < node->outgoings->rnum;)
		{
			const int d = *(int*)ccv_array_get(node->outgoings, j);
			if (d == allreduce.d)
			{
				++j;
				continue;
			}
			// Get the destination nodes, and check whether they have inputs matches our outputs.
			ccv_nnc_graph_exec_symbol_info_t* const outgoing_node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
			if (_ccv_nnc_exec_inputs_contain(outgoing_node, allreducers[i].d))
			{
				ccv_nnc_graph_exec_symbol_concat(graph, allreduce, (ccv_nnc_graph_exec_symbol_t){
					.graph = graph,
					.d = d,
				});
				// Remove the connection.
				if (j < node->outgoings->rnum - 1)
					*(int*)ccv_array_get(node->outgoings, j) = *(int*)ccv_array_get(node->outgoings, node->outgoings->rnum - 1);
				--node->outgoings->rnum;
			} else
				++j;
		}
		for (j = 0; j < parallel_count - 1; j++)
		{
			const int new_exec_idx = dup_exec_idx[exec_idx * (parallel_count - 1) + j];
			ccv_nnc_graph_exec_symbol_info_t* const node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, new_exec_idx);
			for (k = 0; k < node->output_size; k++)
				if (node->outputs[k] == outputs[j + 1].d)
					node->outputs[k] = inputs[j + 1].d;
			ccv_nnc_graph_exec_symbol_concat(graph, (ccv_nnc_graph_exec_symbol_t){
				.graph = graph,
				.d = new_exec_idx,
			}, allreduce);
			for (k = 0; k < node->outgoings->rnum;)
			{
				const int d = *(int*)ccv_array_get(node->outgoings, k);
				if (d == allreduce.d)
				{
					++k;
					continue;
				}
				// Get the destination nodes, and check whether they have inputs matches our outputs.
				ccv_nnc_graph_exec_symbol_info_t* const outgoing_node = (ccv_nnc_graph_exec_symbol_info_t*)ccv_array_get(graph->exec_symbol_info, d);
				if (_ccv_nnc_exec_inputs_contain(outgoing_node, outputs[j + 1].d))
				{
					ccv_nnc_graph_exec_symbol_concat(graph, allreduce, (ccv_nnc_graph_exec_symbol_t){
						.graph = graph,
						.d = d,
					});
					// Remove the connection.
					if (k < node->outgoings->rnum - 1)
						*(int*)ccv_array_get(node->outgoings, k) = *(int*)ccv_array_get(node->outgoings, node->outgoings->rnum - 1);
					--node->outgoings->rnum;
				} else
					++k;
			}
		}
	}
	ccfree(allreduce_producers);
	// Write allreducer_outs last, because it may be the same pointer as allreducers.
	if (allreducer_outs)
		for (i = 0; i < allreducer_size; i++)
		{
			allreducer_outs[i].d = allreduce_inputs[i];
			allreducer_outs[i].graph = graph;
		}
	ccfree(allreduce_inputs);
}
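
// A minimal usage sketch of the rewrite above, assuming a graph built elsewhere:
// `gradients`/`gradient_size`, `srcs`/`src_size` and `dests`/`dest_size` are
// hypothetical placeholders for symbols created during earlier graph
// construction; only the call itself and CCV_NNC_PARALLEL_REDUCE_OP_SUM follow
// the signature and assert shown in the source.
static void example_data_parallel(ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_tensor_symbol_t* const gradients, const int gradient_size, const ccv_nnc_graph_exec_symbol_t* const srcs, const int src_size, const ccv_nnc_graph_exec_symbol_t* const dests, const int dest_size)
{
	ccv_nnc_tensor_symbol_t gradient_outs[gradient_size];
	ccv_nnc_symbolic_graph_data_parallel(graph,
		4, // duplicate onto 4 devices; 0 would use ccv_nnc_device_count(CCV_STREAM_CONTEXT_GPU)
		0, 0, // no broadcasts
		gradients, gradient_size, gradient_outs, // allreduce these; receives the pre-allreduce symbols the producers now write
		0, 0, 0, // no reducers
		CCV_NNC_PARALLEL_REDUCE_OP_SUM,
		srcs, src_size, dests, dest_size);
}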

ccv_nnc_tensor_symbol_t ccv_nnc_tensor_symbol_copy(const ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_tensor_symbol_t symbol, const int device_id)
{
	if (!graph->data_parallel.tensor_symbol_idx)
		return NO_TENSOR_SYMBOL;
	assert(graph->data_parallel.tensor_symbol_idx);
	assert(symbol.d >= 0);
	assert(symbol.d < graph->data_parallel.tensor_symbol_size);
	assert(symbol.graph == graph);
	if (device_id == 0)
		return symbol;
	const int parallel_count = graph->data_parallel.count;
	if (graph->data_parallel.tensor_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] < 0)
		return NO_TENSOR_SYMBOL;
	ccv_nnc_tensor_symbol_t tensor = {
		.d = graph->data_parallel.tensor_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1],
		.graph = graph,
	};
	return tensor;
}

void ccv_nnc_tensor_symbol_set_copy(ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_tensor_symbol_t symbol, const int device_id, const ccv_nnc_tensor_symbol_t copy)
{
	assert(graph->data_parallel.tensor_symbol_idx);
	assert(symbol.d >= 0);
	assert(symbol.d < graph->tensor_symbol_info->rnum);
	assert(symbol.graph == graph);
	const int parallel_count = graph->data_parallel.count;
	if (copy.d == CCV_NNC_NO_TENSOR_SYMBOL)
	{
		assert(symbol.d < graph->data_parallel.tensor_symbol_size);
		graph->data_parallel.tensor_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] = -1;
		return;
	}
	assert(copy.d >= 0);
	assert(copy.d < graph->tensor_symbol_info->rnum);
	assert(copy.graph == graph);
	assert(parallel_count > 1);
	if (symbol.d >= graph->data_parallel.tensor_symbol_size)
	{
		graph->data_parallel.tensor_symbol_idx = ccrealloc(graph->data_parallel.tensor_symbol_idx, sizeof(int) * (parallel_count - 1) * (symbol.d + 1));
		int i;
		for (i = graph->data_parallel.tensor_symbol_size * (parallel_count - 1); i < (symbol.d + 1) * (parallel_count - 1); i++)
			graph->data_parallel.tensor_symbol_idx[i] = -1;
		graph->data_parallel.tensor_symbol_size = symbol.d + 1;
	}
	graph->data_parallel.tensor_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] = copy.d;
}
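
// The copy tables consulted above are flat arrays: each symbol owns
// (parallel_count - 1) slots, one per device other than device 0 (the
// original). A worked lookup, with illustrative numbers only:
//   parallel_count = 4, symbol.d = 7, device_id = 2
//   index = 7 * (4 - 1) + (2 - 1) = 22
//   tensor_symbol_idx[22] holds the .d of the device-2 duplicate, or -1 if unset.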

ccv_nnc_graph_exec_symbol_t ccv_nnc_graph_exec_symbol_copy(const ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_graph_exec_symbol_t symbol, const int device_id)
{
	if (!graph->data_parallel.exec_symbol_idx)
		return NO_GRAPH_EXEC_SYMBOL;
	assert(graph->data_parallel.exec_symbol_idx);
	assert(symbol.d >= 0);
	assert(symbol.d < graph->data_parallel.exec_symbol_size);
	assert(symbol.graph == graph);
	if (device_id == 0)
		return symbol;
	const int parallel_count = graph->data_parallel.count;
	if (graph->data_parallel.exec_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] < 0)
		return NO_GRAPH_EXEC_SYMBOL;
	ccv_nnc_graph_exec_symbol_t graph_exec = {
		.d = graph->data_parallel.exec_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1],
		.graph = graph,
	};
	return graph_exec;
}

void ccv_nnc_graph_exec_symbol_set_copy(ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_graph_exec_symbol_t symbol, const int device_id, const ccv_nnc_graph_exec_symbol_t copy)
{
	assert(graph->data_parallel.exec_symbol_idx);
	assert(symbol.d >= 0);
	assert(symbol.d < graph->exec_symbol_info->rnum);
	assert(symbol.graph == graph);
	const int parallel_count = graph->data_parallel.count;
	if (copy.d == CCV_NNC_NO_GRAPH_EXEC_SYMBOL)
	{
		assert(symbol.d < graph->data_parallel.exec_symbol_size);
		graph->data_parallel.exec_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] = -1;
		return;
	}
	assert(copy.d >= 0);
	assert(copy.d < graph->exec_symbol_info->rnum);
	assert(copy.graph == graph);
	assert(parallel_count > 1);
	if (symbol.d >= graph->data_parallel.exec_symbol_size)
	{
		graph->data_parallel.exec_symbol_idx = ccrealloc(graph->data_parallel.exec_symbol_idx, sizeof(int) * (parallel_count - 1) * (symbol.d + 1));
		int i;
		for (i = graph->data_parallel.exec_symbol_size * (parallel_count - 1); i < (symbol.d + 1) * (parallel_count - 1); i++)
			graph->data_parallel.exec_symbol_idx[i] = -1;
		graph->data_parallel.exec_symbol_size = symbol.d + 1;
	}
	graph->data_parallel.exec_symbol_idx[symbol.d * (parallel_count - 1) + device_id - 1] = copy.d;
}
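
// A minimal lookup sketch, assuming `graph` has already gone through
// ccv_nnc_symbolic_graph_data_parallel, and `w` / `update` are a tensor symbol
// and an exec symbol on device 0 (all placeholders):
static void example_copy_lookup(const ccv_nnc_symbolic_graph_t* const graph, const ccv_nnc_tensor_symbol_t w, const ccv_nnc_graph_exec_symbol_t update)
{
	// Duplicate of `w` on device 1; NO_TENSOR_SYMBOL if the graph is not data
	// parallel or no duplicate was recorded for that slot.
	const ccv_nnc_tensor_symbol_t w_dev1 = ccv_nnc_tensor_symbol_copy(graph, w, 1);
	// Same lookup for the exec symbol; device_id 0 returns the symbol itself.
	const ccv_nnc_graph_exec_symbol_t update_dev1 = ccv_nnc_graph_exec_symbol_copy(graph, update, 1);
	(void)w_dev1;
	(void)update_dev1;
}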