Skip to content

Commit 944dc79

Browse files
committed
coll/acoll: Remove use of cid as array index
The use of cid as array index is removed. Instead, now coll_acoll_subcomms_t is dynamically allocated for each new communicator. Signed-off-by: Nithya V S <[email protected]>
1 parent adee1a4 commit 944dc79

9 files changed

+179
-135
lines changed

ompi/mca/coll/acoll/coll_acoll.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ BEGIN_C_DECLS
3333
/* Globally exported variables */
3434
OMPI_DECLSPEC extern const mca_coll_base_component_3_0_0_t mca_coll_acoll_component;
3535
extern int mca_coll_acoll_priority;
36+
extern int mca_coll_acoll_max_comms;
3637
extern int mca_coll_acoll_sg_size;
3738
extern int mca_coll_acoll_sg_scale;
3839
extern int mca_coll_acoll_node_size;
@@ -75,7 +76,6 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
7576

7677
END_C_DECLS
7778

78-
#define MCA_COLL_ACOLL_MAX_CID 100
7979
#define MCA_COLL_ACOLL_ROOT_CHANGE_THRESH 10
8080

8181
typedef enum MCA_COLL_ACOLL_SG_SIZES {
@@ -208,8 +208,10 @@ struct mca_coll_acoll_module_t {
208208
int mnode_log2_sg_size;
209209
int allg_lin;
210210
int allg_ring;
211-
coll_acoll_subcomms_t subc[MCA_COLL_ACOLL_MAX_CID];
211+
int max_comms;
212+
coll_acoll_subcomms_t **subc;
212213
coll_acoll_reserve_mem_t reserve_mem_s;
214+
int num_subc;
213215
};
214216

215217
#ifdef HAVE_XPMEM_H

ompi/mca/coll/acoll/coll_acoll_allgather.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -481,21 +481,21 @@ int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_dataty
481481
int brank, last_brank;
482482
int use_rd_base;
483483
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
484-
coll_acoll_subcomms_t *subc;
485-
int cid = ompi_comm_get_local_cid(comm);
484+
coll_acoll_subcomms_t *subc = NULL;
486485
char *local_rbuf;
487486
ompi_communicator_t *intra_comm;
488487

489-
/* Fallback to ring if cid is beyond supported limit */
490-
if (cid >= MCA_COLL_ACOLL_MAX_CID) {
488+
/* Obtain the subcomms structure */
489+
err = check_and_create_subc(comm, acoll_module, &subc);
490+
/* Fallback to ring if subc is not obtained */
491+
if (subc == NULL) {
491492
return ompi_coll_base_allgather_intra_ring(sbuf, scount, sdtype, rbuf, rcount, rdtype, comm,
492493
module);
493494
}
494495

495-
subc = &acoll_module->subc[cid];
496496
size = ompi_comm_size(comm);
497497
if (!subc->initialized && size > 2) {
498-
err = mca_coll_acoll_comm_split_init(comm, acoll_module, 0);
498+
err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, 0);
499499
if (MPI_SUCCESS != err) {
500500
return err;
501501
}

ompi/mca/coll/acoll/coll_acoll_allreduce.c

Lines changed: 25 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp
2828
int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t count,
2929
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
3030
struct ompi_communicator_t *comm,
31-
mca_coll_base_module_t *module, int intra);
31+
mca_coll_base_module_t *module,
32+
coll_acoll_subcomms_t *subc, int intra);
3233

3334

3435
static inline int coll_allreduce_decision_fixed(int comm_size, size_t msg_size)
@@ -52,16 +53,13 @@ static inline int coll_allreduce_decision_fixed(int comm_size, size_t msg_size)
5253
static inline int mca_coll_acoll_reduce_xpmem_h(const void *sbuf, void *rbuf, size_t count,
5354
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
5455
struct ompi_communicator_t *comm,
55-
mca_coll_base_module_t *module)
56+
mca_coll_base_module_t *module,
57+
coll_acoll_subcomms_t *subc)
5658
{
5759
int size;
5860
size_t total_dsize, dsize;
59-
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
6061

61-
coll_acoll_subcomms_t *subc;
62-
int cid = ompi_comm_get_local_cid(comm);
63-
subc = &acoll_module->subc[cid];
64-
coll_acoll_init(module, comm, subc->data);
62+
coll_acoll_init(module, comm, subc->data, subc);
6563
coll_acoll_data_t *data = subc->data;
6664
if (NULL == data) {
6765
return -1;
@@ -188,16 +186,13 @@ static inline int mca_coll_acoll_allreduce_xpmem_f(const void *sbuf, void *rbuf,
188186
struct ompi_datatype_t *dtype,
189187
struct ompi_op_t *op,
190188
struct ompi_communicator_t *comm,
191-
mca_coll_base_module_t *module)
189+
mca_coll_base_module_t *module,
190+
coll_acoll_subcomms_t *subc)
192191
{
193192
int size;
194193
size_t total_dsize, dsize;
195-
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
196194

197-
coll_acoll_subcomms_t *subc;
198-
int cid = ompi_comm_get_local_cid(comm);
199-
subc = &acoll_module->subc[cid];
200-
coll_acoll_init(module, comm, subc->data);
195+
coll_acoll_init(module, comm, subc->data, subc);
201196
coll_acoll_data_t *data = subc->data;
202197
if (NULL == data) {
203198
return -1;
@@ -361,15 +356,13 @@ void mca_coll_acoll_sync(coll_acoll_data_t *data, int offset, int *group, int gp
361356
int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t count,
362357
struct ompi_datatype_t *dtype, struct ompi_op_t *op,
363358
struct ompi_communicator_t *comm,
364-
mca_coll_base_module_t *module, int intra)
359+
mca_coll_base_module_t *module,
360+
coll_acoll_subcomms_t *subc, int intra)
365361
{
366362
size_t dsize;
367363
int err = MPI_SUCCESS;
368-
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
369-
coll_acoll_subcomms_t *subc;
370-
int cid = ompi_comm_get_local_cid(comm);
371-
subc = &acoll_module->subc[cid];
372-
coll_acoll_init(module, comm, subc->data);
364+
365+
coll_acoll_init(module, comm, subc->data, subc);
373366
coll_acoll_data_t *data = subc->data;
374367
if (NULL == data) {
375368
return -1;
@@ -385,7 +378,6 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c
385378

386379
int l1_local_rank = data->l1_local_rank;
387380
int l2_local_rank = data->l2_local_rank;
388-
int comm_id = ompi_comm_get_local_cid(comm);
389381

390382
int offset1 = data->offset[0];
391383
int offset2 = data->offset[1];
@@ -441,8 +433,8 @@ int mca_coll_acoll_allreduce_small_msgs_h(const void *sbuf, void *rbuf, size_t c
441433
}
442434
}
443435

444-
if (intra && (ompi_comm_size(acoll_module->subc[comm_id].numa_comm) > 1)) {
445-
err = mca_coll_acoll_bcast(rbuf, count, dtype, 0, acoll_module->subc[comm_id].numa_comm, module);
436+
if (intra && (ompi_comm_size(subc->numa_comm) > 1)) {
437+
err = mca_coll_acoll_bcast(rbuf, count, dtype, 0, subc->numa_comm, module);
446438
}
447439
return err;
448440
}
@@ -466,25 +458,23 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
466458
return MPI_SUCCESS;
467459
}
468460

469-
coll_acoll_subcomms_t *subc;
470-
int cid = ompi_comm_get_local_cid(comm);
471-
subc = &acoll_module->subc[cid];
472-
473461
/* Falling back to recursivedoubling for non-commutative operators to be safe */
474462
if (!ompi_op_is_commute(op)) {
475463
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op, comm,
476464
module);
477465
}
478466

479-
/* Fallback to knomial if cid is beyond supported limit */
480-
if (cid >= MCA_COLL_ACOLL_MAX_CID) {
467+
/* Obtain the subcomms structure */
468+
coll_acoll_subcomms_t *subc = NULL;
469+
err = check_and_create_subc(comm, acoll_module, &subc);
470+
471+
/* Fallback to knomial if subc is not obtained */
472+
if (subc == NULL) {
481473
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op, comm,
482474
module);
483475
}
484-
485-
subc = &acoll_module->subc[cid];
486476
if (!subc->initialized) {
487-
err = mca_coll_acoll_comm_split_init(comm, acoll_module, 0);
477+
err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, 0);
488478
if (MPI_SUCCESS != err)
489479
return err;
490480
}
@@ -499,7 +489,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
499489
comm, module);
500490
} else if (total_dsize < 512) {
501491
return mca_coll_acoll_allreduce_small_msgs_h(sbuf, rbuf, count, dtype, op, comm, module,
502-
1);
492+
subc, 1);
503493
} else if (total_dsize <= 2048) {
504494
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
505495
comm, module);
@@ -517,7 +507,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
517507
} else if (total_dsize < 4194304) {
518508
#ifdef HAVE_XPMEM_H
519509
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
520-
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module);
510+
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
521511
} else {
522512
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
523513
op, comm, module);
@@ -529,7 +519,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
529519
} else if (total_dsize <= 16777216) {
530520
#ifdef HAVE_XPMEM_H
531521
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
532-
mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module);
522+
mca_coll_acoll_reduce_xpmem_h(sbuf, rbuf, count, dtype, op, comm, module, subc);
533523
return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module);
534524
} else {
535525
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
@@ -542,7 +532,7 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
542532
} else {
543533
#ifdef HAVE_XPMEM_H
544534
if (((subc->xpmem_use_sr_buf != 0) || (subc->xpmem_buf_size > 2 * total_dsize)) && (subc->without_xpmem != 1)) {
545-
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module);
535+
return mca_coll_acoll_allreduce_xpmem_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
546536
} else {
547537
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
548538
op, comm, module);

ompi/mca/coll/acoll/coll_acoll_barrier.c

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -130,21 +130,22 @@ int mca_coll_acoll_barrier_intra(struct ompi_communicator_t *comm, mca_coll_base
130130
ompi_request_t **reqs;
131131
int num_nodes;
132132
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
133-
coll_acoll_subcomms_t *subc;
134-
int cid = ompi_comm_get_local_cid(comm);
133+
coll_acoll_subcomms_t *subc = NULL;
135134

136-
/* Fallback to linear if cid is beyond supported limit */
137-
if (cid >= MCA_COLL_ACOLL_MAX_CID) {
135+
/* Obtain the subcomms structure */
136+
err = check_and_create_subc(comm, acoll_module, &subc);
137+
138+
/* Fallback to linear if subcomms structure is not obtained */
139+
if (subc == NULL) {
138140
return ompi_coll_base_barrier_intra_basic_linear(comm, module);
139141
}
140142

141-
subc = &acoll_module->subc[cid];
142143
size = ompi_comm_size(comm);
143144
if (size == 1) {
144145
return err;
145146
}
146147
if (!subc->initialized && size > 1) {
147-
err = mca_coll_acoll_comm_split_init(comm, acoll_module, 0);
148+
err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, 0);
148149
if (MPI_SUCCESS != err) {
149150
return err;
150151
}

ompi/mca/coll/acoll/coll_acoll_bcast.c

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -444,24 +444,25 @@ int mca_coll_acoll_bcast(void *buff, size_t count, struct ompi_datatype_t *datat
444444
size_t total_dsize, dsize;
445445
mca_coll_acoll_module_t *acoll_module = (mca_coll_acoll_module_t *) module;
446446
bcast_subc_func bcast_func[2] = {&bcast_binomial, &bcast_flat_tree};
447-
coll_acoll_subcomms_t *subc;
447+
coll_acoll_subcomms_t *subc = NULL;
448448
struct ompi_communicator_t *subcomms[MCA_COLL_ACOLL_NUM_SC] = {NULL};
449449
int subc_roots[MCA_COLL_ACOLL_NUM_SC] = {-1};
450-
int cid = ompi_comm_get_local_cid(comm);
451450

452-
/* Fallback to knomial if cid is beyond supported limit */
453-
if (cid >= MCA_COLL_ACOLL_MAX_CID) {
451+
/* Obtain the subcomms structure */
452+
err = check_and_create_subc(comm, acoll_module, &subc);
453+
/* Fallback to knomial if subcomms is not obtained */
454+
if (subc == NULL) {
454455
return ompi_coll_base_bcast_intra_knomial(buff, count, datatype, root, comm, module, 0, 4);
455456
}
456457

457-
subc = &acoll_module->subc[cid];
458458
/* Fallback to knomial if no. of root changes is beyond a threshold */
459-
if (subc->num_root_change > MCA_COLL_ACOLL_ROOT_CHANGE_THRESH) {
459+
if ((subc->num_root_change > MCA_COLL_ACOLL_ROOT_CHANGE_THRESH)
460+
&& (root != subc->prev_init_root)) {
460461
return ompi_coll_base_bcast_intra_knomial(buff, count, datatype, root, comm, module, 0, 4);
461462
}
462463
size = ompi_comm_size(comm);
463464
if ((!subc->initialized || (root != subc->prev_init_root)) && size > 2) {
464-
err = mca_coll_acoll_comm_split_init(comm, acoll_module, root);
465+
err = mca_coll_acoll_comm_split_init(comm, acoll_module, subc, root);
465466
if (MPI_SUCCESS != err) {
466467
return err;
467468
}

ompi/mca/coll/acoll/coll_acoll_component.c

Lines changed: 17 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const char *mca_coll_acoll_component_version_string
2525
* Global variables
2626
*/
2727
int mca_coll_acoll_priority = 0;
28+
int mca_coll_acoll_max_comms = 10;
2829
int mca_coll_acoll_sg_size = 8;
2930
int mca_coll_acoll_sg_scale = 1;
3031
int mca_coll_acoll_node_size = 128;
@@ -91,6 +92,11 @@ static int acoll_register(void)
9192
MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_priority);
9293

9394
/* Defaults on topology */
95+
(void)
96+
mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "max_comms",
97+
"Maximum no. of communicators using subgroup based algorithms",
98+
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_9,
99+
MCA_BASE_VAR_SCOPE_READONLY, &mca_coll_acoll_max_comms);
94100
(void)
95101
mca_base_component_var_register(&mca_coll_acoll_component.collm_version, "sg_size",
96102
"Size of subgroup to be used for subgroup based algorithms",
@@ -186,47 +192,10 @@ static int acoll_register(void)
186192
*/
187193
static void mca_coll_acoll_module_construct(mca_coll_acoll_module_t *module)
188194
{
189-
for (int i = 0; i < MCA_COLL_ACOLL_MAX_CID; i++) {
190-
coll_acoll_subcomms_t *subc = &module->subc[i];
191-
subc->initialized = 0;
192-
subc->is_root_node = 0;
193-
subc->is_root_sg = 0;
194-
subc->is_root_numa = 0;
195-
subc->outer_grp_root = -1;
196-
subc->subgrp_root = 0;
197-
subc->num_nodes = 1;
198-
subc->prev_init_root = -1;
199-
subc->num_root_change = 0;
200-
subc->numa_root = 0;
201-
subc->socket_ldr_root = -1;
202-
subc->local_comm = NULL;
203-
subc->local_r_comm = NULL;
204-
subc->leader_comm = NULL;
205-
subc->subgrp_comm = NULL;
206-
subc->socket_comm = NULL;
207-
subc->socket_ldr_comm = NULL;
208-
for (int j = 0; j < MCA_COLL_ACOLL_NUM_LAYERS; j++) {
209-
for (int k = 0; k < MCA_COLL_ACOLL_NUM_BASE_LYRS; k++) {
210-
subc->base_comm[k][j] = NULL;
211-
subc->base_root[k][j] = -1;
212-
}
213-
subc->local_root[j] = 0;
214-
}
215195

216-
subc->numa_comm = NULL;
217-
subc->numa_comm_ldrs = NULL;
218-
subc->node_comm = NULL;
219-
subc->inter_comm = NULL;
220-
subc->cid = -1;
221-
subc->initialized_data = false;
222-
subc->initialized_shm_data = false;
223-
subc->data = NULL;
224-
#ifdef HAVE_XPMEM_H
225-
subc->xpmem_buf_size = mca_coll_acoll_xpmem_buffer_size;
226-
subc->without_xpmem = mca_coll_acoll_without_xpmem;
227-
subc->xpmem_use_sr_buf = mca_coll_acoll_xpmem_use_sr_buf;
228-
#endif
229-
}
196+
/* Set number of subcomms to 0 */
197+
module->num_subc = 0;
198+
module->subc = NULL;
230199

231200
/* Reserve memory init. Lazy allocation of memory when needed. */
232201
(module->reserve_mem_s).reserve_mem = NULL;
@@ -246,9 +215,8 @@ static void mca_coll_acoll_module_construct(mca_coll_acoll_module_t *module)
246215
*/
247216
static void mca_coll_acoll_module_destruct(mca_coll_acoll_module_t *module)
248217
{
249-
250-
for (int i = 0; i < MCA_COLL_ACOLL_MAX_CID; i++) {
251-
coll_acoll_subcomms_t *subc = &module->subc[i];
218+
for (int i = 0; i < module->num_subc; i++) {
219+
coll_acoll_subcomms_t *subc = module->subc[i];
252220
if (subc->initialized_data) {
253221
if (subc->initialized_shm_data) {
254222
if (subc->orig_comm != NULL) {
@@ -334,8 +302,14 @@ static void mca_coll_acoll_module_destruct(mca_coll_acoll_module_t *module)
334302
}
335303
}
336304
subc->initialized = 0;
305+
free(subc);
306+
module->subc[i] = NULL;
337307
}
338308

309+
module->num_subc = 0;
310+
free(module->subc);
311+
module->subc = NULL;
312+
339313
if ((true == (module->reserve_mem_s).reserve_mem_allocate)
340314
&& (NULL != (module->reserve_mem_s).reserve_mem)) {
341315
free((module->reserve_mem_s).reserve_mem);

ompi/mca/coll/acoll/coll_acoll_module.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ mca_coll_base_module_t *mca_coll_acoll_comm_query(struct ompi_communicator_t *co
7777
*priority = mca_coll_acoll_priority;
7878

7979
/* Set topology params */
80+
acoll_module->max_comms = mca_coll_acoll_max_comms;
8081
acoll_module->sg_scale = mca_coll_acoll_sg_scale;
8182
acoll_module->sg_size = mca_coll_acoll_sg_size;
8283
acoll_module->sg_cnt = mca_coll_acoll_sg_size / mca_coll_acoll_sg_scale;

0 commit comments

Comments
 (0)