Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.

Fix the grpcomm operations at scale. #952

Merged
merged 1 commit into from
Feb 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion orte/mca/grpcomm/base/grpcomm_base_frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -122,6 +122,7 @@ static void ccon(orte_grpcomm_coll_t *p)
OBJ_CONSTRUCT(&p->distance_mask_recv, opal_bitmap_t);
p->dmns = NULL;
p->ndmns = 0;
p->nexpected = 0;
p->nreported = 0;
p->cbfunc = NULL;
p->cbdata = NULL;
Expand Down
50 changes: 47 additions & 3 deletions orte/mca/grpcomm/base/grpcomm_base_stubs.c
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ static void allgather_stub(int fd, short args, void *cbdata)
ret = opal_hash_table_set_value_ptr(&orte_grpcomm_base.sig_table, (void *)cd->sig->signature, cd->sig->sz * sizeof(orte_process_name_t), (void *)&cd->sig->seq_num);
if (OPAL_SUCCESS != ret) {
OPAL_OUTPUT((orte_grpcomm_base_framework.framework_output,
"%s rpcomm:base:allgather can't not add new signature to hash table",
"%s rpcomm:base:allgather cannot add new signature to hash table",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
ORTE_ERROR_LOG(ret);
OBJ_RELEASE(cd);
Expand Down Expand Up @@ -208,6 +208,9 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
{
orte_grpcomm_coll_t *coll;
int rc;
orte_namelist_t *nm;
opal_list_t children;
size_t n;

/* search the existing tracker list to see if this already exists */
OPAL_LIST_FOREACH(coll, &orte_grpcomm_base.ongoing, orte_grpcomm_coll_t) {
Expand Down Expand Up @@ -254,6 +257,30 @@ orte_grpcomm_coll_t* orte_grpcomm_base_get_tracker(orte_grpcomm_signature_t *sig
ORTE_ERROR_LOG(rc);
return NULL;
}
/* cycle thru the array of daemons and compare them to our
* children in the routing tree, counting the ones that match
* so we know how many daemons we should receive contributions from */
OBJ_CONSTRUCT(&children, opal_list_t);
orte_routed.get_routing_list(&children);
while (NULL != (nm = (orte_namelist_t*)opal_list_remove_first(&children))) {
for (n=0; n < coll->ndmns; n++) {
if (nm->name.vpid == coll->dmns[n]) {
coll->nexpected++;
break;
}
}
OBJ_RELEASE(nm);
}
OPAL_LIST_DESTRUCT(&children);
/* see if I am in the array of participants - note that I may
* be in the rollup tree even though I'm not participating
* in the collective itself */
for (n=0; n < coll->ndmns; n++) {
if (coll->dmns[n] == ORTE_PROC_MY_NAME->vpid) {
coll->nexpected++;
break;
}
}
return coll;
}

Expand Down Expand Up @@ -292,6 +319,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
/* all daemons hosting this jobid are participating */
if (NULL == (jdata = orte_get_job_data_object(sig->signature[0].jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
if (NULL == jdata->map) {
Expand Down Expand Up @@ -321,7 +351,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
/* should never happen */
ORTE_ERROR_LOG(ORTE_ERROR);
free(dns);
return ORTE_ERROR;
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
OPAL_OUTPUT_VERBOSE((5, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:base:create_dmns adding daemon %s to array",
Expand All @@ -338,6 +371,9 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (NULL == (jdata = orte_get_job_data_object(sig->signature[n].jobid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
opal_output_verbose(5, orte_grpcomm_base_framework.framework_output,
Expand All @@ -347,12 +383,17 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (NULL == (proc = (orte_proc_t*)opal_pointer_array_get_item(jdata->procs, sig->signature[n].vpid))) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
if (NULL == proc->node || NULL == proc->node->daemon) {
ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
OPAL_LIST_DESTRUCT(&ds);
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
vpid = proc->node->daemon->name.vpid;
Expand All @@ -372,7 +413,10 @@ static int create_dmns(orte_grpcomm_signature_t *sig,
if (0 == opal_list_get_size(&ds)) {
ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
OPAL_LIST_DESTRUCT(&ds);
return ORTE_ERR_BAD_PARAM;
ORTE_FORCED_TERMINATE(ORTE_ERR_NOT_FOUND);
*ndmns = 0;
*dmns = NULL;
return ORTE_ERR_NOT_FOUND;
}
dns = (orte_vpid_t*)malloc(opal_list_get_size(&ds) * sizeof(orte_vpid_t));
nds = 0;
Expand Down
Empty file.
108 changes: 57 additions & 51 deletions orte/mca/grpcomm/direct/grpcomm_direct.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2013 Los Alamos National Security, LLC. All
* rights reserved.
* Copyright (c) 2014-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2014-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -124,7 +124,7 @@ static int xcast(orte_vpid_t *vpids,
static int allgather(orte_grpcomm_coll_t *coll,
opal_buffer_t *buf)
{
int rc, ret;
int rc;
opal_buffer_t *relay;

OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
Expand All @@ -143,35 +143,16 @@ static int allgather(orte_grpcomm_coll_t *coll,
return rc;
}

/* if we are the HNP and nobody else is participating,
* then just execute the xcast */
if (ORTE_PROC_IS_HNP && 1 == coll->ndmns) {
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(relay, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(relay);
return rc;
}
/* pass along the payload */
opal_dss.copy_payload(relay, buf);
orte_grpcomm.xcast(coll->sig, ORTE_RML_TAG_COLL_RELEASE, relay);
OBJ_RELEASE(relay);
return ORTE_SUCCESS;
}

/* pass along the payload */
opal_dss.copy_payload(relay, buf);

/* otherwise, we need to send this to the HNP for
* processing */
/* send this to ourselves for processing */
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct:allgather sending to HNP",
"%s grpcomm:direct:allgather sending to ourself",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));

/* send the info to the HNP for tracking */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, relay,
/* send the info to ourselves for tracking */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_NAME, relay,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
return rc;
Expand Down Expand Up @@ -212,35 +193,60 @@ static void allgather_recv(int status, orte_process_name_t* sender,
opal_dss.copy_payload(&coll->bucket, buffer);

OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather recv ndmns %d nrep %d",
"%s grpcomm:direct allgather recv nexpected %d nrep %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
(int)coll->ndmns, (int)coll->nreported));

/* if all participating daemons have reported */
if (coll->ndmns == coll->nreported) {
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
(int)coll->nexpected, (int)coll->nreported));

/* see if everyone has reported */
if (coll->nreported == coll->nexpected) {
if (ORTE_PROC_IS_HNP) {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather HNP reports complete",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
/* the allgather is complete - send the xcast */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* pack the status - success since the allgather completed. This
* would be an error if we timeout instead */
ret = ORTE_SUCCESS;
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &ret, 1, OPAL_INT))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the release via xcast */
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
} else {
OPAL_OUTPUT_VERBOSE((1, orte_grpcomm_base_framework.framework_output,
"%s grpcomm:direct allgather rollup complete - sending to %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(ORTE_PROC_MY_PARENT)));
/* relay the bucket upward */
reply = OBJ_NEW(opal_buffer_t);
/* pack the signature */
if (OPAL_SUCCESS != (rc = opal_dss.pack(reply, &sig, 1, ORTE_SIGNATURE))) {
ORTE_ERROR_LOG(rc);
OBJ_RELEASE(reply);
OBJ_RELEASE(sig);
return;
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);
/* send the info to our parent */
rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_PARENT, reply,
ORTE_RML_TAG_ALLGATHER_DIRECT,
orte_rml_send_callback, NULL);
}
/* transfer the collected bucket */
opal_dss.copy_payload(reply, &coll->bucket);

/* send the release via xcast */
(void)orte_grpcomm.xcast(sig, ORTE_RML_TAG_COLL_RELEASE, reply);
OBJ_RELEASE(reply);
}
OBJ_RELEASE(sig);
}
Expand Down
4 changes: 2 additions & 2 deletions orte/mca/grpcomm/direct/grpcomm_direct_component.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2011-2015 Los Alamos National Security, LLC. All rights
* Copyright (c) 2011-2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014 Intel, Inc. All rights reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -55,7 +55,7 @@ static int direct_register(void)
/* make the priority adjustable so users can select
* direct for use by apps without affecting daemons
*/
my_priority = 1;
my_priority = 85;
(void) mca_base_component_var_register(c, "priority",
"Priority of the grpcomm direct component",
MCA_BASE_VAR_TYPE_INT, NULL, 0, 0,
Expand Down
2 changes: 2 additions & 0 deletions orte/mca/grpcomm/grpcomm.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ typedef struct {
size_t ndmns;
/** my index in the dmns array */
unsigned long my_rank;
/* number of buckets expected */
size_t nexpected;
/* number reported in */
size_t nreported;
/* distance masks for receive */
Expand Down
Empty file.
8 changes: 6 additions & 2 deletions orte/mca/odls/base/odls_base_default_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Copyright (c) 2011-2015 Los Alamos National Security, LLC.
* All rights reserved.
* Copyright (c) 2011-2013 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2015 Intel, Inc. All rights reserved.
* Copyright (c) 2013-2016 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
Expand Down Expand Up @@ -240,6 +240,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
orte_app_context_t *app;
bool found;
orte_node_t *node;
bool newmap = false;

OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s odls:constructing child list",
Expand Down Expand Up @@ -389,6 +390,7 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
/* ensure the map object is present */
if (NULL == jdata->map) {
jdata->map = OBJ_NEW(orte_job_map_t);
newmap = true;
}

/* if we have a file map, then we need to load it */
Expand Down Expand Up @@ -446,7 +448,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *data,
if (!found) {
OBJ_RETAIN(dmn->node);
opal_pointer_array_add(jdata->map->nodes, dmn->node);
jdata->map->num_nodes++;
if (newmap) {
jdata->map->num_nodes++;
}
}

/* see if it belongs to us */
Expand Down