Skip to content

Commit 3fa6c63

Browse files
author
rhc54
committed
Merge pull request #871 from rhc54/topic/dpm
More dynamic op cleanups
2 parents c404e98 + e6add86 commit 3fa6c63

File tree

9 files changed

+306
-175
lines changed

9 files changed

+306
-175
lines changed

ompi/dpm/dpm.c

Lines changed: 122 additions & 144 deletions
Large diffs are not rendered by default.

ompi/mpi/c/comm_join.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ int MPI_Comm_join(int fd, MPI_Comm *intercomm)
100100
send_first = true;
101101
}
102102

103+
/* ensure the port name is NULL terminated */
104+
memset(port_name, 0, MPI_MAX_PORT_NAME);
105+
103106
/* Assumption: socket_send should not block, even if the socket
104107
is not configured to be non-blocking, because the message length are
105108
so short. */

opal/mca/pmix/pmix1xx/pmix1_client.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ int pmix1_get(const opal_process_name_t *proc,
301301
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
302302
"%s PMIx_client get on proc %s key %s",
303303
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
304-
OPAL_NAME_PRINT(*proc), key);
304+
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
305305

306306
/* prep default response */
307307
*val = NULL;
@@ -371,7 +371,7 @@ int pmix1_getnb(const opal_process_name_t *proc, const char *key,
371371
opal_output_verbose(1, opal_pmix_base_framework.framework_output,
372372
"%s PMIx_client get_nb on proc %s key %s",
373373
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
374-
OPAL_NAME_PRINT(*proc), key);
374+
(NULL == proc) ? "NULL" : OPAL_NAME_PRINT(*proc), key);
375375

376376
/* create the caddy */
377377
op = OBJ_NEW(pmix1_opcaddy_t);
@@ -501,7 +501,7 @@ int pmix1_lookup(opal_list_t *data, opal_list_t *info)
501501
++n;
502502
}
503503
} else {
504-
pdata = NULL;
504+
pinfo = NULL;
505505
ninfo = 0;
506506
}
507507

orte/orted/pmix/pmix_server.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,15 @@ static void pmix_server_dmdx_resp(int status, orte_process_name_t* sender,
569569
}
570570
}
571571

572+
/* Constructor for orte_pmix_server_op_caddy_t.
 *
 * Puts every pointer member of a freshly created caddy into a known
 * NULL state. The completion callback is included because the handlers
 * that consume a caddy compare cbfunc against NULL before invoking it,
 * so it must never hold an indeterminate value.
 */
static void opcon(orte_pmix_server_op_caddy_t *p)
{
    p->procs = NULL;
    p->info = NULL;
    p->cbfunc = NULL;
    p->cbdata = NULL;
}
/* no destructor is registered for this class */
OBJ_CLASS_INSTANCE(orte_pmix_server_op_caddy_t,
                   opal_object_t,
                   opcon, NULL);
572581

573582
static void rqcon(pmix_server_req_t *p)
574583
{

orte/orted/pmix/pmix_server_dyn.c

Lines changed: 149 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@
4646
#include "orte/runtime/orte_globals.h"
4747
#include "orte/mca/rml/rml.h"
4848

49-
#include "pmix_server_internal.h"
49+
#include "orte/orted/pmix/pmix_server.h"
50+
#include "orte/orted/pmix/pmix_server_internal.h"
5051

5152
void pmix_server_launch_resp(int status, orte_process_name_t* sender,
5253
opal_buffer_t *buffer,
@@ -327,6 +328,119 @@ int pmix_server_spawn_fn(opal_process_name_t *requestor,
327328
return OPAL_SUCCESS;
328329
}
329330

331+
static void _cnct(int sd, short args, void *cbdata);
332+
333+
/* Callback for the lookup issued by _cnct when a participating job is
 * not known locally.
 *
 * status - result of the lookup
 * data   - list whose first item is expected to be an opal_pmix_pdata_t
 *          holding a byte object with a packed job object in it
 * cbdata - the op caddy that _cnct passed to the lookup
 *
 * On success the unpacked job is registered with the embedded PMIx
 * server and the connect processing is restarted with a new caddy; the
 * caller's callback is NOT invoked here in that case. On any failure
 * the caller's callback (if any) is invoked with the error code. The
 * caddy received in cbdata is released exactly once on every path.
 */
static void _cnlk(int status, opal_list_t *data, void *cbdata)
{
    orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
    int rc, cnt;
    opal_pmix_pdata_t *pdat;
    orte_job_t *jdata = NULL;
    opal_buffer_t buf;

    /* if we failed to get the required data, then just inform
     * the embedded server that the connect cannot succeed. This
     * must happen whether or not a callback was provided, as we
     * cannot touch the data below */
    if (ORTE_SUCCESS != status) {
        rc = status;
        goto release;
    }
    /* a "successful" lookup that returned nothing is still a failure
     * for us - do not report success back to the caller */
    if (NULL == data || 0 == opal_list_get_size(data)) {
        rc = ORTE_ERR_NOT_FOUND;
        goto release;
    }

    /* register the returned data with the embedded PMIx server */
    pdat = (opal_pmix_pdata_t*)opal_list_get_first(data);
    if (OPAL_BYTE_OBJECT != pdat->value.type) {
        rc = ORTE_ERR_BAD_PARAM;
        goto release;
    }
    /* the data will consist of a packed buffer with the job data in it */
    OBJ_CONSTRUCT(&buf, opal_buffer_t);
    if (OPAL_SUCCESS != (rc = opal_dss.load(&buf, pdat->value.data.bo.bytes,
                                            pdat->value.data.bo.size))) {
        OBJ_DESTRUCT(&buf);
        goto release;
    }
    /* detach the bytes from the byte object now that they have been
     * loaded into the buffer (presumably the buffer owns them from
     * here on, so they must not be freed twice - TODO confirm) */
    pdat->value.data.bo.bytes = NULL;
    pdat->value.data.bo.size = 0;

    cnt = 1;
    rc = opal_dss.unpack(&buf, &jdata, &cnt, ORTE_JOB);
    OBJ_DESTRUCT(&buf);
    if (OPAL_SUCCESS != rc) {
        goto release;
    }

    rc = orte_pmix_server_register_nspace(jdata);
    OBJ_RELEASE(jdata);  // no reason to keep this around
    if (ORTE_SUCCESS != rc) {
        goto release;
    }

    /* restart the cnct processor - it gets its own caddy, so we are
     * done with ours. We must return here: falling into the release
     * path would touch the caddy after it was released, release it a
     * second time, and ack the caller before the connect completed */
    ORTE_PMIX_OPERATION(cd->procs, cd->info, _cnct, cd->cbfunc, cd->cbdata);
    OBJ_RELEASE(cd);
    return;

release:
    if (NULL != cd->cbfunc) {
        cd->cbfunc(rc, cd->cbdata);
    }
    OBJ_RELEASE(cd);
}
383+
384+
static void _cnct(int sd, short args, void *cbdata)
385+
{
386+
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
387+
orte_namelist_t *nm;
388+
char **keys = NULL, *key;
389+
orte_job_t *jdata;
390+
int rc = ORTE_SUCCESS;
391+
392+
/* at some point, we need to add bookeeping to track which
393+
* procs are "connected" so we know who to notify upon
394+
* termination or failure. For now, we have to ensure
395+
* that we have registered all participating nspaces so
396+
* the embedded PMIx server can provide them to the client.
397+
* Otherwise, the client will receive an error as it won't
398+
* be able to resolve any of the required data for the
399+
* missing nspaces */
400+
401+
/* cycle thru the procs */
402+
OPAL_LIST_FOREACH(nm, cd->procs, orte_namelist_t) {
403+
/* see if we have the job object for this job */
404+
if (NULL == (jdata = orte_get_job_data_object(nm->name.jobid))) {
405+
/* we don't know about this job. If our "global" data
406+
* server is just our HNP, then we have no way of finding
407+
* out about it, and all we can do is return an error */
408+
if (orte_pmix_server_globals.server.jobid == ORTE_PROC_MY_HNP->jobid &&
409+
orte_pmix_server_globals.server.vpid == ORTE_PROC_MY_HNP->vpid) {
410+
rc = ORTE_ERR_NOT_SUPPORTED;
411+
goto release;
412+
}
413+
/* ask the global data server for the data - if we get it,
414+
* then we can complete the request */
415+
key = opal_convert_jobid_to_string(nm->name.jobid);
416+
opal_argv_append_nosize(&keys, key);
417+
free(key);
418+
if (ORTE_SUCCESS != (rc = pmix_server_lookup_fn(&nm->name, keys, cd->info, _cnlk, cd))) {
419+
opal_argv_free(keys);
420+
goto release;
421+
}
422+
opal_argv_free(keys);
423+
/* the callback function on this lookup will return us to this
424+
* routine so we can continue the process */
425+
return;
426+
}
427+
/* we know about the job - check to ensure it has been
428+
* registered with the local PMIx server */
429+
if (!orte_get_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, NULL, OPAL_BOOL)) {
430+
/* it hasn't been registered yet, so register it now */
431+
if (ORTE_SUCCESS != (rc = orte_pmix_server_register_nspace(jdata))) {
432+
goto release;
433+
}
434+
}
435+
}
436+
437+
release:
438+
if (NULL != cd->cbfunc) {
439+
cd->cbfunc(rc, cd->cbdata);
440+
}
441+
OBJ_RELEASE(cd);
442+
}
443+
330444
int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
331445
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
332446
{
@@ -335,26 +449,52 @@ int pmix_server_connect_fn(opal_list_t *procs, opal_list_t *info,
335449
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
336450
(int)opal_list_get_size(procs));
337451

338-
/* for now, just ack the call */
339-
if (NULL != cbfunc) {
340-
cbfunc(OPAL_SUCCESS, cbdata);
452+
/* protect ourselves */
453+
if (NULL == procs || 0 == opal_list_get_size(procs)) {
454+
return ORTE_ERR_BAD_PARAM;
341455
}
456+
/* must thread shift this as we will be accessing global data */
457+
ORTE_PMIX_OPERATION(procs, info, _cnct, cbfunc, cbdata);
458+
return ORTE_SUCCESS;
459+
}
342460

343-
return OPAL_SUCCESS;
461+
static void mdxcbfunc(int status,
462+
const char *data, size_t ndata, void *cbdata,
463+
opal_pmix_release_cbfunc_t relcbfunc, void *relcbdata)
464+
{
465+
orte_pmix_server_op_caddy_t *cd = (orte_pmix_server_op_caddy_t*)cbdata;
466+
467+
/* ack the call */
468+
if (NULL != cd->cbfunc) {
469+
cd->cbfunc(status, cd->cbdata);
470+
}
471+
OBJ_RELEASE(cd);
344472
}
345473

346474
int pmix_server_disconnect_fn(opal_list_t *procs, opal_list_t *info,
347475
opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
348476
{
477+
orte_pmix_server_op_caddy_t *cd;
478+
int rc;
479+
349480
opal_output_verbose(2, orte_pmix_server_globals.output,
350481
"%s disconnect called with %d procs",
351482
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
352483
(int)opal_list_get_size(procs));
353484

354-
/* for now, just ack the call */
355-
if (NULL != cbfunc) {
356-
cbfunc(OPAL_SUCCESS, cbdata);
485+
/* at some point, we need to add bookeeping to track which
486+
* procs are "connected" so we know who to notify upon
487+
* termination or failure. For now, just execute a fence
488+
* Note that we do not need to thread-shift here as the
489+
* fence function will do it for us */
490+
cd = OBJ_NEW(orte_pmix_server_op_caddy_t);
491+
cd->cbfunc = cbfunc;
492+
cd->cbdata = cbdata;
493+
494+
if (ORTE_SUCCESS != (rc = pmix_server_fencenb_fn(procs, info, NULL, 0,
495+
mdxcbfunc, cd))) {
496+
OBJ_RELEASE(cd);
357497
}
358498

359-
return OPAL_SUCCESS;
499+
return rc;
360500
}

orte/orted/pmix/pmix_server_internal.h

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,8 @@ OBJ_CLASS_DECLARATION(pmix_server_req_t);
7272
/* Caddy object used to carry an operation's arguments to an event
 * handler (see ORTE_PMIX_OPERATION, which fills it in) */
typedef struct {
7373
opal_object_t super;   /* base class - must be first */
7474
opal_event_t ev;       /* event used to invoke the handler */
75-
orte_job_t *jdata;
76-
orte_process_name_t proc;
77-
int status;
78-
orte_proc_t *object;
75+
opal_list_t *procs;    /* list of procs involved in the operation */
76+
opal_list_t *info;     /* list of info directives for the operation */
7977
opal_pmix_op_cbfunc_t cbfunc;  /* completion callback - may be NULL */
8078
void *cbdata;          /* opaque pointer passed back to cbfunc */
8179
} orte_pmix_server_op_caddy_t;
@@ -115,21 +113,18 @@ do { \
115113
opal_event_active(&(_req->ev), OPAL_EV_WRITE, 1); \
116114
} while(0);
117115

118-
#define ORTE_PMIX_OPERATION(n, r, ob, s, fn, cf, cb) \
119-
do { \
120-
orte_pmix_server_op_caddy_t *_cd; \
121-
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
122-
/* convert the namespace to jobid and create name */ \
123-
orte_util_convert_string_to_jobid(&(_cd->proc.jobid), (n)); \
124-
_cd->proc.vpid = (r); \
125-
_cd->object = (ob); \
126-
_cd->cbfunc = (cf); \
127-
_cd->cbdata = (cb); \
128-
_cd->status = (s); \
129-
opal_event_set(orte_event_base, &(_cd->ev), -1, \
130-
OPAL_EV_WRITE, (fn), _cd); \
131-
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
132-
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
116+
/* Create an orte_pmix_server_op_caddy_t, store the proc list (p), info
 * list (i), completion callback (cf) and its cbdata (cb) in it, and
 * activate an event on orte_event_base at priority ORTE_MSG_PRI with
 * (fn) as the handler and the caddy as the handler's cbdata.
 * NOTE(review): the result of OBJ_NEW is used without a NULL check.
 * NOTE(review): the expansion ends in "while(0);" - the trailing ';'
 * means this macro cannot be used as the sole body of an unbraced
 * if that is followed by an else. */
#define ORTE_PMIX_OPERATION(p, i, fn, cf, cb) \
117+
do { \
118+
orte_pmix_server_op_caddy_t *_cd; \
119+
_cd = OBJ_NEW(orte_pmix_server_op_caddy_t); \
120+
_cd->procs = (p); \
121+
_cd->info = (i); \
122+
_cd->cbfunc = (cf); \
123+
_cd->cbdata = (cb); \
124+
opal_event_set(orte_event_base, &(_cd->ev), -1, \
125+
OPAL_EV_WRITE, (fn), _cd); \
126+
opal_event_set_priority(&(_cd->ev), ORTE_MSG_PRI); \
127+
opal_event_active(&(_cd->ev), OPAL_EV_WRITE, 1); \
133128
} while(0);
134129

135130

orte/orted/pmix/pmix_server_register_fns.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,9 @@ int orte_pmix_server_register_nspace(orte_job_t *jdata)
387387
opal_list_append(pmap, &kv->super);
388388
}
389389

390+
/* mark the job as registered */
391+
orte_set_attribute(&jdata->attributes, ORTE_JOB_NSPACE_REGISTERED, ORTE_ATTR_LOCAL, NULL, OPAL_BOOL);
392+
390393
/* pass it down */
391394
if (OPAL_SUCCESS != opal_pmix.server_register_nspace(jdata->jobid,
392395
jdata->num_local_procs,

orte/util/attr.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,8 @@ const char *orte_attr_key_to_str(orte_attribute_key_t key)
259259
return "JOB-ROOM-NUM";
260260
case ORTE_JOB_LAUNCH_PROXY:
261261
return "JOB-LAUNCH-PROXY";
262+
case ORTE_JOB_NSPACE_REGISTERED:
263+
return "JOB-NSPACE-REGISTERED";
262264

263265
case ORTE_PROC_NOBARRIER:
264266
return "PROC-NOBARRIER";

orte/util/attr.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ typedef uint16_t orte_job_flags_t;
129129
#define ORTE_JOB_NOTIFICATIONS (ORTE_JOB_START_KEY + 38) // string - comma-separated list of desired notifications+methods
130130
#define ORTE_JOB_ROOM_NUM (ORTE_JOB_START_KEY + 39) // int - number of remote request's hotel room
131131
#define ORTE_JOB_LAUNCH_PROXY (ORTE_JOB_START_KEY + 40) // opal_process_name_t - name of spawn requestor
132+
#define ORTE_JOB_NSPACE_REGISTERED (ORTE_JOB_START_KEY + 41) // bool - job has been registered with embedded PMIx server
132133

133134
#define ORTE_JOB_MAX_KEY 300
134135

0 commit comments

Comments
 (0)