Skip to content

Commit 840ee22

Browse files
committed
checkpoint
1 parent 760e62a commit 840ee22

File tree

2 files changed

+98
-71
lines changed

2 files changed

+98
-71
lines changed

ompi/communicator/comm_cid.c

Lines changed: 80 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -256,13 +256,13 @@ static ompi_comm_allreduce_context_t *ompi_comm_allreduce_context_alloc (int *in
256256

257257
/* find the next available local cid and start an allreduce */
258258
static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request);
259-
static int ompi_comm_allreduce_getnextcid2 (ompi_comm_request_t *request);
260-
static int ompi_comm_allreduce_getnextcid3 (ompi_comm_request_t *request);
261-
static int ompi_comm_allreduce_getnextcid4 (ompi_comm_request_t *request);
259+
static int ompi_comm_allreduce_getlocalprocs (ompi_comm_request_t *request);
260+
static int ompi_comm_allreduce_nextlocal_cid (ompi_comm_request_t *request);
262261
/* verify that the maximum cid is locally available and start an allreduce */
263262
static int ompi_comm_checkcid (ompi_comm_request_t *request);
264263
/* verify that the cid was available globally */
265264
static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request);
265+
static int ompi_comm_nextcid_setcid (ompi_comm_request_t *request);
266266

267267
static volatile int64_t ompi_comm_cid_lowest_id = INT64_MAX;
268268

@@ -280,7 +280,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com
280280
return OMPI_ERR_OUT_OF_RESOURCE;
281281
}
282282

283-
context->start = ompi_mpi_communicators.lowest_free;
283+
context->start = 3;
284284
context->participate = (context->newcomm->c_local_group->grp_my_rank != MPI_UNDEFINED)?1:0;
285285

286286
request = ompi_comm_request_get ();
@@ -293,7 +293,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com
293293

294294

295295
if (OMPI_COMM_IS_INTER(context->comm)) {
296-
context->master = ompi_comm_rank(context->comm);
296+
context->master = -1;
297297
context->children = 0;
298298
participant_t *p = OBJ_NEW(participant_t);
299299
p->super.name = OPAL_PROC_MY_NAME;
@@ -340,7 +340,7 @@ int ompi_comm_nextcid_nb (ompi_communicator_t *newcomm, ompi_communicator_t *com
340340
}
341341
}
342342
}
343-
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid2, reqs, children);
343+
ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getlocalprocs, reqs, children);
344344
}
345345
}
346346
ompi_comm_request_start (request);
@@ -424,15 +424,9 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
424424

425425
ompi_comm_cid_lowest_id = my_id;
426426

427-
if (!OMPI_COMM_IS_INTER(context->comm) && ompi_comm_rank(context->comm) != context->master) {
428-
if (context->participate) {
429-
ompi_request_t *req;
430-
int rc = MCA_PML_CALL(irecv(&context->nextlocal_cid, 1, MPI_INT, context->master, OMPI_COMM_LOCAL_TAG, context->comm, &req));
431-
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid4, &req, 1);
432-
} else {
433-
context->nextlocal_cid = 0;
434-
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid4, NULL, 0);
435-
}
427+
if (0 <= context->master) {
428+
context->nextlocal_cid = 0;
429+
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_nextlocal_cid, NULL, 0);
436430
}
437431
/**
438432
* This is the real algorithm described in the doc
@@ -443,14 +437,14 @@ static int ompi_comm_allreduce_getnextcid (ompi_comm_request_t *request)
443437
subreq->cid = &context->nextlocal_cid;
444438
opal_pmix.cid_nb(&context->localprocs, context->start, 0, cid_cbfunc, subreq);
445439
ompi_request_t *req = &subreq->super;
446-
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid3, &req, 1);
440+
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_nextlocal_cid, &req, 1);
447441
} else {
448442
context->nextlocal_cid = 0;
449-
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid4, NULL, 0);
443+
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_nextlocal_cid, NULL, 0);
450444
}
451445
}
452446

453-
static int ompi_comm_allreduce_getnextcid2 (ompi_comm_request_t *request)
447+
static int ompi_comm_allreduce_getlocalprocs (ompi_comm_request_t *request)
454448
{
455449
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
456450
participant_t *proc, *next;
@@ -474,34 +468,12 @@ static int ompi_comm_allreduce_getnextcid2 (ompi_comm_request_t *request)
474468

475469
}
476470

477-
static int ompi_comm_allreduce_getnextcid3 (ompi_comm_request_t *request)
478-
{
479-
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
480-
ompi_request_t ** reqs = NULL;
481-
int j = 0;
482-
participant_t *p;
483-
484-
reqs = (ompi_request_t **)alloca(opal_list_get_size(&context->localprocs) * sizeof(ompi_request_t *));
485-
OPAL_LIST_FOREACH(p, &context->localprocs, participant_t) {
486-
if (ompi_comm_rank(context->comm) != p->rank) {
487-
MCA_PML_CALL(isend(&context->nextlocal_cid, 1, MPI_INT, p->rank, OMPI_COMM_LOCAL_TAG,
488-
MCA_PML_BASE_SEND_STANDARD, context->comm, reqs + j));
489-
j++;
490-
}
491-
}
492-
493-
return ompi_comm_request_schedule_append (request, ompi_comm_allreduce_getnextcid4, reqs, j);
494-
}
495-
496-
static int ompi_comm_allreduce_getnextcid4 (ompi_comm_request_t *request)
471+
static int ompi_comm_allreduce_nextlocal_cid (ompi_comm_request_t *request)
497472
{
498473
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
499474
int ret;
500475

501476
ompi_request_t *subreq;
502-
if (!context->participate) {
503-
context->nextlocal_cid = 0;
504-
}
505477
ret = context->allreduce_fn (&context->nextlocal_cid, &context->nextcid, 1, MPI_MAX,
506478
context, &subreq);
507479
/* there was a failure during non-blocking collective
@@ -540,16 +512,17 @@ static int ompi_comm_checkcid (ompi_comm_request_t *request)
540512
return ompi_comm_request_schedule_append (request, ompi_comm_checkcid, NULL, 0);
541513
}
542514

543-
if( context->participate ){
515+
if (0 <= context->master || 0 == opal_list_get_size(&context->localprocs)) {
516+
context->flag = 1;
517+
} else {
544518
context->flag = (context->nextcid == context->nextlocal_cid);
545519
if ( context->participate && !context->flag) {
546520
opal_pointer_array_set_item(&ompi_mpi_communicators, context->nextlocal_cid, NULL);
547521

522+
/* FIXME */
548523
context->flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators,
549524
context->nextcid, context->comm);
550525
}
551-
} else {
552-
context->flag = 1;
553526
}
554527

555528
++context->iter;
@@ -577,39 +550,37 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
577550
}
578551

579552
if (1 == context->rflag) {
580-
if( !context->participate ) {
581-
/* we need to provide something sane here
582-
* but we cannot use `nextcid` as we may have it
583-
* in-use, go ahead with next locally-available CID
584-
*/
585-
context->nextlocal_cid = mca_pml.pml_max_contextid;
586-
for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
587-
bool flag;
588-
flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
589-
context->comm);
590-
if (true == flag) {
591-
context->nextlocal_cid = i;
592-
break;
553+
ompi_request_t ** reqs = NULL;
554+
int j = 0;
555+
if (0 <= context->master) {
556+
reqs = (ompi_request_t **)alloca(sizeof(ompi_request_t *));
557+
if (context->participate) {
558+
MCA_PML_CALL(irecv(&context->nextlocal_cid, 1, MPI_INT, context->master, OMPI_COMM_LOCAL_TAG,
559+
context->comm, reqs));
560+
j++;
561+
}
562+
} else {
563+
if (context->children > 0) {
564+
participant_t *p;
565+
reqs = (ompi_request_t **)alloca(opal_list_get_size(&context->localprocs) * sizeof(ompi_request_t *));
566+
OPAL_LIST_FOREACH(p, &context->localprocs, participant_t) {
567+
if (ompi_comm_rank(context->comm) != p->rank) {
568+
MCA_PML_CALL(isend(&context->nextlocal_cid, 1, MPI_INT, p->rank, OMPI_COMM_LOCAL_TAG,
569+
MCA_PML_BASE_SEND_STANDARD, context->comm, reqs + j));
570+
j++;
571+
}
593572
}
594573
}
595-
context->nextcid = context->nextlocal_cid;
596574
}
597-
598-
/* set the according values to the newcomm */
599-
context->newcomm->c_contextid = context->nextcid;
600-
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);
601-
602-
/* unlock the cid generator */
603-
ompi_comm_cid_lowest_id = INT64_MAX;
604-
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
605-
606-
/* done! */
607-
return OMPI_SUCCESS;
575+
return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_setcid, reqs, j);
608576
}
609577

610-
if (context->participate && (1 == context->flag)) {
578+
if (context->flag) {
611579
/* we could use this cid, but other don't agree */
612-
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL);
580+
/* FIXME */
581+
if (context->participate) {
582+
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, NULL);
583+
}
613584
context->start = context->nextcid + 1; /* that's where we can start the next round */
614585
}
615586

@@ -621,6 +592,44 @@ static int ompi_comm_nextcid_check_flag (ompi_comm_request_t *request)
621592
return ompi_comm_allreduce_getnextcid (request);
622593
}
623594

595+
static int ompi_comm_nextcid_setcid (ompi_comm_request_t *request)
596+
{
597+
ompi_comm_cid_context_t *context = (ompi_comm_cid_context_t *) request->context;
598+
599+
if (OPAL_THREAD_TRYLOCK(&ompi_cid_lock)) {
600+
return ompi_comm_request_schedule_append (request, ompi_comm_nextcid_setcid, NULL, 0);
601+
}
602+
603+
if( !context->participate ) {
604+
/* we need to provide something sane here
605+
* but we cannot use `nextcid` as we may have it
606+
* in-use, go ahead with next locally-available CID
607+
*/
608+
context->nextlocal_cid = mca_pml.pml_max_contextid;
609+
for (unsigned int i = context->start ; i < mca_pml.pml_max_contextid ; ++i) {
610+
bool flag;
611+
flag = opal_pointer_array_test_and_set_item (&ompi_mpi_communicators, i,
612+
context->comm);
613+
if (true == flag) {
614+
context->nextlocal_cid = i;
615+
break;
616+
}
617+
}
618+
context->nextcid = context->nextlocal_cid;
619+
} else {
620+
/* set the according values to the newcomm */
621+
context->newcomm->c_contextid = context->nextcid;
622+
opal_pointer_array_set_item (&ompi_mpi_communicators, context->nextcid, context->newcomm);
623+
}
624+
625+
/* unlock the cid generator */
626+
ompi_comm_cid_lowest_id = INT64_MAX;
627+
OPAL_THREAD_UNLOCK(&ompi_cid_lock);
628+
629+
/* done! */
630+
return OMPI_SUCCESS;
631+
}
632+
624633
/**************************************************************************/
625634
/**************************************************************************/
626635
/**************************************************************************/

ompi/communicator/comm_init.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
#include "opal/util/bit_ops.h"
3737
#include "opal/util/info_subscriber.h"
3838
#include "opal/mca/pmix/pmix.h"
39+
#include "orte/runtime/orte_wait.h"
3940
#include "ompi/constants.h"
4041
#include "ompi/mca/pml/pml.h"
4142
#include "ompi/mca/coll/base/base.h"
@@ -390,6 +391,11 @@ static void ompi_comm_construct(ompi_communicator_t* comm)
390391
OBJ_CONSTRUCT(&comm->c_lock, opal_mutex_t);
391392
}
392393

394+
static void cid_cbfunc(int status, int cid, void *cbdata) {
395+
bool *active = (bool *)cbdata;
396+
*active = false;
397+
}
398+
393399
static void ompi_comm_destruct(ompi_communicator_t* comm)
394400
{
395401
/* Note that the attributes were already released on this
@@ -454,8 +460,20 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
454460
if ( MPI_UNDEFINED != (int)comm->c_contextid &&
455461
NULL != opal_pointer_array_get_item(&ompi_mpi_communicators,
456462
comm->c_contextid)) {
463+
opal_list_t procs;
464+
opal_namelist_t proc;
465+
bool active;
457466
opal_pointer_array_set_item ( &ompi_mpi_communicators,
458467
comm->c_contextid, NULL);
468+
OBJ_CONSTRUCT(&procs, opal_list_t);
469+
OBJ_CONSTRUCT(&proc, opal_namelist_t);
470+
proc.name = OPAL_PROC_MY_NAME;
471+
opal_list_append(&procs, &proc.super);
472+
opal_pmix.cid_nb(&procs, 0, comm->c_contextid, cid_cbfunc, &active);
473+
ORTE_WAIT_FOR_COMPLETION(active);
474+
opal_list_remove_item(&procs, &proc.super);
475+
OBJ_DESTRUCT(&procs);
476+
OBJ_DESTRUCT(&proc);
459477
}
460478

461479
/* reset the ompi_comm_f_to_c_table entry */

0 commit comments

Comments
 (0)