diff --git a/orte/mca/grpcomm/base/grpcomm_base_frame.c b/orte/mca/grpcomm/base/grpcomm_base_frame.c index c6362c7162..242e4410f0 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_frame.c +++ b/orte/mca/grpcomm/base/grpcomm_base_frame.c @@ -12,7 +12,7 @@ * All rights reserved. * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2014 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * Copyright (c) 2015 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -122,6 +122,7 @@ static void ccon(orte_grpcomm_coll_t *p) OBJ_CONSTRUCT(&p->distance_mask_recv, opal_bitmap_t); p->dmns = NULL; p->ndmns = 0; + p->nexpected = 0; p->nreported = 0; p->cbfunc = NULL; p->cbdata = NULL; diff --git a/orte/mca/grpcomm/base/grpcomm_base_stubs.c b/orte/mca/grpcomm/base/grpcomm_base_stubs.c index 46f5b07364..7a25805568 100644 --- a/orte/mca/grpcomm/base/grpcomm_base_stubs.c +++ b/orte/mca/grpcomm/base/grpcomm_base_stubs.c @@ -157,7 +157,7 @@ static void allgather_stub(int fd, short args, void *cbdata) ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num); if (OPAL_SUCCESS != ret) { OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output, - "%s rpcomm:base:allgather can't not add new signature to hash table", + "%s rpcomm:base:allgather cannot add new signature to hash table", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); ORTE_ERROR_LOG(ret); OBJ_RELEASE(cd); @@ -208,6 +208,9 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig { orte_grpcomm_coll_t *coll; int rc; + orte_namelist_t *nm; + opal_list_t children; + size_t n; /* search the existing tracker list to see if this already exists */ OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) { @@ -254,6 +257,30 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig ORTE_ERROR_LOG(rc); return NULL; } + /* cycle thru the array of daemons and compare them to our + * children in the routing tree, counting the ones that match + * so we know how many daemons we should receive contributions from */ + OBJ_CONSTRUCT(&children, opal_list_t); + orte_routed.get_routing_list(&children); + while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) { + for (n=0; n < coll->ndmns; n++) { + if (nm->name.vpid == coll->dmns[n]) { + coll->nexpected++; + break; + } + } + OBJ_RELEASE(nm); + } + OPAL_LIST_DESTRUCT(&children); + /* see if I am in the array of participants - note that I may + * be in the rollup tree even though I'm not participating + * in the collective itself */ + for (n=0; n < coll->ndmns; n++) { + if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) { + coll->nexpected++; + break; + } + } return coll; } @@ -292,6 +319,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig, /* all daemons hosting this jobid are participating */ if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } if (NULL == jdata->map) { @@ -321,7 +351,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig, /* should never happen */ ORTE_ERROR_LOG(ORTE_ERROR); free(dns); - return ORTE_ERROR; + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; + return ORTE_ERR_NOT_FOUND; } OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output, "%s grpcomm:base:create_dmns adding daemon %s to array", @@ -338,6 +371,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig, if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); OPAL_LIST_DESTRUCT(&ds); + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } opal_output_verbose(5, orte_grpcomm_base_framework.framework_output, @@ -347,12 +383,17 @@ static int create_dmns(orte_grpcomm_signature_t *sig, if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); OPAL_LIST_DESTRUCT(&ds); + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } if (NULL == proc->node || NULL == proc->node->daemon) { ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND); OPAL_LIST_DESTRUCT(&ds); ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; return ORTE_ERR_NOT_FOUND; } vpid = proc->node->daemon->name.vpid; @@ -372,7 +413,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig, if (0 == opal_list_get_size(&ds)) { ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM); OPAL_LIST_DESTRUCT(&ds); - return ORTE_ERR_BAD_PARAM; + ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND); + *ndmns = 0; + *dmns = NULL; + return ORTE_ERR_NOT_FOUND; } dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t)); nds = 0; diff --git a/orte/mca/grpcomm/brks/.opal_ignore b/orte/mca/grpcomm/brks/.opal_ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/orte/mca/grpcomm/direct/grpcomm_direct.c b/orte/mca/grpcomm/direct/grpcomm_direct.c index efc4671230..4fc737865c 100644 --- a/orte/mca/grpcomm/direct/grpcomm_direct.c +++ b/orte/mca/grpcomm/direct/grpcomm_direct.c @@ -5,7 +5,7 @@ * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All * rights reserved. - * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids, static int allgather(orte_grpcomm_coll_t *coll, opal_buffer_t *buf) { - int rc, ret; + int rc; opal_buffer_t *relay; OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, @@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll, return rc; } - /* if we are the HNP and nobody else is participating, - * then just execute the xcast */ - if (ORTE_PROC_IS_HNP && 1 == coll->ndmns) { - /* pack the status - success since the allgather completed. This - * would be an error if we timeout instead */ - ret = ORTE_SUCCESS; - if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(relay); - return rc; - } - /* pass along the payload */ - opal_dss.copy_payload(relay, buf); - orte_grpcomm.xcast(coll->sig, ORTE_RML_TAG_COLL_RELEASE, relay); - OBJ_RELEASE(relay); - return ORTE_SUCCESS; - } - /* pass along the payload */ opal_dss.copy_payload(relay, buf); - /* otherwise, we need to send this to the HNP for - * processing */ + /* send this to ourselves for processing */ OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct:allgather sending to HNP", + "%s grpcomm:direct:allgather sending to ourself", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); - /* send the info to the HNP for tracking */ - rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, relay, + /* send the info to ourselves for tracking */ + rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay, ORTE_RML_TAG_ALLGATHER_DIRECT, orte_rml_send_callback, NULL); return rc; @@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender, opal_dss.copy_payload(&coll->bucket, buffer); OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, - "%s grpcomm:direct allgather recv ndmns %d nrep %d", + "%s grpcomm:direct allgather recv nexpected %d nrep %d", ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), - (int)coll->ndmns, (int)coll->nreported)); - - /* if all participating daemons have reported */ - if (coll->ndmns == coll->nreported) { - reply = OBJ_NEW(opal_buffer_t); - /* pack the signature */ - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) { - ORTE_ERROR_LOG(rc); - OBJ_RELEASE(reply); - OBJ_RELEASE(sig); - return; - } - /* pack the status - success since the allgather completed. This - * would be an error if we timeout instead */ - ret = ORTE_SUCCESS; - if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { - ORTE_ERROR_LOG(rc); + (int)coll->nexpected, (int)coll->nreported)); + + /* see if everyone has reported */ + if (coll->nreported == coll->nexpected) { + if (ORTE_PROC_IS_HNP) { + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, + "%s grpcomm:direct allgather HNP reports complete", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME))); + /* the allgather is complete - send the xcast */ + reply = OBJ_NEW(opal_buffer_t); + /* pack the signature */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(sig); + return; + } + /* pack the status - success since the allgather completed. This + * would be an error if we timeout instead */ + ret = ORTE_SUCCESS; + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(sig); + return; + } + /* transfer the collected bucket */ + opal_dss.copy_payload(reply, &coll->bucket); + /* send the release via xcast */ + (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply); OBJ_RELEASE(reply); - OBJ_RELEASE(sig); - return; + } else { + OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output, + "%s grpcomm:direct allgather rollup complete - sending to %s", + ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), + ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT))); + /* relay the bucket upward */ + reply = OBJ_NEW(opal_buffer_t); + /* pack the signature */ + if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) { + ORTE_ERROR_LOG(rc); + OBJ_RELEASE(reply); + OBJ_RELEASE(sig); + return; + } + /* transfer the collected bucket */ + opal_dss.copy_payload(reply, &coll->bucket); + /* send the info to our parent */ + rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply, + ORTE_RML_TAG_ALLGATHER_DIRECT, + orte_rml_send_callback, NULL); } - /* transfer the collected bucket */ - opal_dss.copy_payload(reply, &coll->bucket); - - /* send the release via xcast */ - (void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply); - OBJ_RELEASE(reply); } OBJ_RELEASE(sig); } diff --git a/orte/mca/grpcomm/direct/grpcomm_direct_component.c b/orte/mca/grpcomm/direct/grpcomm_direct_component.c index ac4b6e693f..3c6cad000d 100644 --- a/orte/mca/grpcomm/direct/grpcomm_direct_component.c +++ b/orte/mca/grpcomm/direct/grpcomm_direct_component.c @@ -1,7 +1,7 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2011 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2014 Intel, Inc. All rights reserved. * $COPYRIGHT$ @@ -55,7 +55,7 @@ static int direct_register(void) /* make the priority adjustable so users can select * direct for use by apps without affecting daemons */ - my_priority = 1; + my_priority = 85; (void) mca_base_component_var_register(c, "priority", "Priority of the grpcomm direct component", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, diff --git a/orte/mca/grpcomm/grpcomm.h b/orte/mca/grpcomm/grpcomm.h index f3be029c0e..00ddccacc4 100644 --- a/orte/mca/grpcomm/grpcomm.h +++ b/orte/mca/grpcomm/grpcomm.h @@ -77,6 +77,8 @@ typedef struct { size_t ndmns; /** my index in the dmns array */ unsigned long my_rank; + /* number of buckets expected */ + size_t nexpected; /* number reported in */ size_t nreported; /* distance masks for receive */ diff --git a/orte/mca/grpcomm/rcd/.opal_ignore b/orte/mca/grpcomm/rcd/.opal_ignore new file mode 100644 index 0000000000..e69de29bb2 diff --git a/orte/mca/odls/base/odls_base_default_fns.c b/orte/mca/odls/base/odls_base_default_fns.c index 738bc33d30..6476ff9d15 100644 --- a/orte/mca/odls/base/odls_base_default_fns.c +++ b/orte/mca/odls/base/odls_base_default_fns.c @@ -14,7 +14,7 @@ * Copyright (c) 2011-2015 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2011-2013 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2013-2015 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. * Copyright (c) 2014 Research Organization for Information Science * and Technology (RIST). All rights reserved. * $COPYRIGHT$ @@ -240,6 +240,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, orte_app_context_t *app; bool found; orte_node_t *node; + bool newmap = false; OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output, "%s odls:constructing child list", @@ -389,6 +390,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, /* ensure the map object is present */ if (NULL == jdata->map) { jdata->map = OBJ_NEW(orte_job_map_t); + newmap = true; } /* if we have a file map, then we need to load it */ @@ -446,7 +448,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data, if (!found) { OBJ_RETAIN(dmn->node); opal_pointer_array_add(jdata->map->nodes, dmn->node); - jdata->map->num_nodes++; + if (newmap) { + jdata->map->num_nodes++; + } } /* see if it belongs to us */