diff --git a/opal/mca/pmix/pmix4x/pmix/VERSION b/opal/mca/pmix/pmix4x/pmix/VERSION index 6d4ff14995e..bc99541f154 100644 --- a/opal/mca/pmix/pmix4x/pmix/VERSION +++ b/opal/mca/pmix/pmix4x/pmix/VERSION @@ -23,14 +23,14 @@ release=0 # The only requirement is that it must be entirely printable ASCII # characters and have no white space. -greek= +greek=a1 # If repo_rev is empty, then the repository version number will be # obtained during "make dist" via the "git describe --tags --always" # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=gitfae0ee7d +repo_rev=git0be66492 # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in @@ -44,7 +44,7 @@ tarball_version= # The date when this release was created -date="Dec 28, 2018" +date="Jan 07, 2019" # The shared library version of each of PMIx's public libraries. # These versions are maintained in accordance with the "Library diff --git a/opal/mca/pmix/pmix4x/pmix/contrib/pmix.spec b/opal/mca/pmix/pmix4x/pmix/contrib/pmix.spec index f7a5b6bfbe6..fcf9072131d 100644 --- a/opal/mca/pmix/pmix4x/pmix/contrib/pmix.spec +++ b/opal/mca/pmix/pmix4x/pmix/contrib/pmix.spec @@ -192,7 +192,7 @@ Summary: An extended/exascale implementation of PMI Name: %{?_name:%{_name}}%{!?_name:pmix} -Version: 4.0.0 +Version: 4.0.0a1 Release: 1%{?dist} License: BSD Group: Development/Libraries diff --git a/opal/mca/pmix/pmix4x/pmix/examples/Makefile.am b/opal/mca/pmix/pmix4x/pmix/examples/Makefile.am index c481f883b02..657e271d3f9 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/Makefile.am +++ b/opal/mca/pmix/pmix4x/pmix/examples/Makefile.am @@ -11,13 +11,14 @@ # All rights reserved. # Copyright (c) 2006-2010 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2012-2013 Los Alamos National Security, Inc. All rights reserved. -# Copyright (c) 2013-2018 Intel, Inc. All rights reserved. +# Copyright (c) 2013-2019 Intel, Inc. All rights reserved. # $COPYRIGHT$ # # Additional copyrights may follow # # $HEADER$ # +headers = examples.h AM_CPPFLAGS = -I$(top_builddir)/src -I$(top_builddir)/src/include -I$(top_builddir)/include -I$(top_builddir)/include/pmix @@ -28,64 +29,64 @@ if !WANT_HIDDEN noinst_PROGRAMS += server endif -client_SOURCES = client.c +client_SOURCES = client.c examples.h client_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) client_LDADD = $(top_builddir)/src/libpmix.la -client2_SOURCES = client2.c +client2_SOURCES = client2.c examples.h client2_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) client2_LDADD = $(top_builddir)/src/libpmix.la -debugger_SOURCES = debugger.c +debugger_SOURCES = debugger.c examples.h debugger_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) debugger_LDADD = $(top_builddir)/src/libpmix.la -debuggerd_SOURCES = debuggerd.c +debuggerd_SOURCES = debuggerd.c examples.h debuggerd_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) debuggerd_LDADD = $(top_builddir)/src/libpmix.la -alloc_SOURCES = alloc.c +alloc_SOURCES = alloc.c examples.h alloc_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) alloc_LDADD = $(top_builddir)/src/libpmix.la -jctrl_SOURCES = jctrl.c +jctrl_SOURCES = jctrl.c examples.h jctrl_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) jctrl_LDADD = $(top_builddir)/src/libpmix.la -dmodex_SOURCES = dmodex.c +dmodex_SOURCES = dmodex.c examples.h dmodex_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) dmodex_LDADD = $(top_builddir)/src/libpmix.la -dynamic_SOURCES = dynamic.c +dynamic_SOURCES = dynamic.c examples.h dynamic_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) dynamic_LDADD = $(top_builddir)/src/libpmix.la -fault_SOURCES = fault.c +fault_SOURCES = fault.c examples.h fault_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) fault_LDADD = $(top_builddir)/src/libpmix.la -pub_SOURCES = pub.c +pub_SOURCES = pub.c examples.h pub_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) pub_LDADD = $(top_builddir)/src/libpmix.la -pubi_SOURCES = pubi.c +pubi_SOURCES = pubi.c examples.h pubi_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) pubi_LDADD = $(top_builddir)/src/libpmix.la -tool_SOURCES = tool.c +tool_SOURCES = tool.c examples.h tool_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) tool_LDADD = $(top_builddir)/src/libpmix.la -group_SOURCES = group.c +group_SOURCES = group.c examples.h group_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) group_LDADD = $(top_builddir)/src/libpmix.la -asyncgroup_SOURCES = asyncgroup.c +asyncgroup_SOURCES = asyncgroup.c examples.h asyncgroup_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) asyncgroup_LDADD = $(top_builddir)/src/libpmix.la if !WANT_HIDDEN -server_SOURCES = server.c +server_SOURCES = server.c examples.h server_LDFLAGS = $(PMIX_PKG_CONFIG_LDFLAGS) server_LDADD = $(top_builddir)/src/libpmix.la endif diff --git a/opal/mca/pmix/pmix4x/pmix/examples/alloc.c b/opal/mca/pmix/pmix4x/pmix/examples/alloc.c index f0cdf43a0ea..6984c1e1e96 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/alloc.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/alloc.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * $COPYRIGHT$ * @@ -30,16 +30,7 @@ #include #include - -/* define a structure for collecting returned - * info from an allocation request */ -typedef struct { - volatile bool active; - pmix_info_t *info; - size_t ninfo; -} mydata_t; - -static volatile bool waiting_for_allocation = true; +#include "examples.h" /* this is a callback function for the PMIx_Query and * PMIx_Allocate APIs. The query will callback with a status indicating @@ -59,7 +50,7 @@ static void infocbfunc(pmix_status_t status, pmix_release_cbfunc_t release_fn, void *release_cbdata) { - mydata_t *mq = (mydata_t*)cbdata; + myquery_data_t *mq = (myquery_data_t*)cbdata; size_t n; fprintf(stderr, "Allocation request returned %s", PMIx_Error_string(status)); @@ -75,6 +66,9 @@ static void infocbfunc(pmix_status_t status, PMIX_INFO_XFER(&mq->info[n], &info[n]); } } + /* the status returned here indicates whether the requested + * information was found or not - preserve it */ + mq->lock.status = status; /* let the library release the data and cleanup from * the operation */ @@ -83,7 +77,7 @@ static void infocbfunc(pmix_status_t status, } /* release the block */ - mq->active = false; + DEBUG_WAKEUP_THREAD(&mq->lock); } /* this is an event notification function that we explicitly request @@ -100,12 +94,37 @@ static void release_fn(size_t evhdlr_registration_id, pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata) { + myrel_t *lock; + size_t n; + + /* find the return object */ + lock = NULL; + for (n=0; n < ninfo; n++) { + if (0 == strncmp(info[n].key, PMIX_EVENT_RETURN_OBJECT, PMIX_MAX_KEYLEN)) { + lock = (myrel_t*)info[n].value.data.ptr; + break; + } + } + /* if the object wasn't returned, then that is an error */ + if (NULL == lock) { + fprintf(stderr, "LOCK WASN'T RETURNED IN RELEASE CALLBACK\n"); + /* let the event handler progress */ + if (NULL != cbfunc) { + cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata); + } + return; + } + /* tell the event handler state machine that we are the last step */ if (NULL != cbfunc) { cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); } - /* flag that the allocation is complete so we can exit */ - waiting_for_allocation = false; + /* the status will be PMIX_ERR_ALLOC_COMPLETE since that is the code + * we registered to receive. The result of the allocation request is + * in the info array - for now, just assume success */ + lock->lock.status = PMIX_SUCCESS; + /* release the lock */ + DEBUG_WAKEUP_THREAD(&lock->lock); } /* event handler registration is done asynchronously because it @@ -114,18 +133,20 @@ static void release_fn(size_t evhdlr_registration_id, * the status of the request (success or an error), plus a numerical index * to the registered event. The index is used later on to deregister * an event handler - if we don't explicitly deregister it, then the - * PMIx server will do so when it see us exit */ + * PMIx server will do so when it sees us exit */ static void evhandler_reg_callbk(pmix_status_t status, size_t evhandler_ref, void *cbdata) { - volatile int *active = (volatile int*)cbdata; + mylock_t *lock = (mylock_t*)cbdata; if (PMIX_SUCCESS != status) { fprintf(stderr, "EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n", status, (unsigned long)evhandler_ref); } - *active = status; + lock->status = status; + lock->evhandler_ref = evhandler_ref; + DEBUG_WAKEUP_THREAD(lock); } int main(int argc, char **argv) @@ -138,11 +159,12 @@ int main(int argc, char **argv) uint32_t nprocs; pmix_info_t *info; uint64_t nnodes = 12; - mydata_t mydata; + myquery_data_t mydata; pmix_query_t *query; char *myallocation = "MYALLOCATION"; - volatile int active; - pmix_status_t code = PMIX_NOTIFY_ALLOC_COMPLETE; + mylock_t mylock; + pmix_status_t code; + myrel_t myrel; /* init us */ if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) { @@ -163,63 +185,64 @@ int main(int argc, char **argv) PMIX_VALUE_RELEASE(val); fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs); - /* initialize the return info struct */ - mydata.info = NULL; - mydata.ninfo = 0; - if (0 == myproc.rank) { /* try to get an allocation */ - mydata.active = true; + DEBUG_CONSTRUCT_MYQUERY(&mydata); PMIX_INFO_CREATE(info, 2); PMIX_INFO_LOAD(&info[0], PMIX_ALLOC_NUM_NODES, &nnodes, PMIX_UINT64); PMIX_INFO_LOAD(&info[0], PMIX_ALLOC_ID, myallocation, PMIX_STRING); - if (PMIX_SUCCESS != (rc = PMIx_Allocation_request_nb(PMIX_ALLOC_NEW, info, 2, infocbfunc, NULL))) { + if (PMIX_SUCCESS != (rc = PMIx_Allocation_request_nb(PMIX_ALLOC_NEW, info, 2, infocbfunc, &mydata))) { fprintf(stderr, "Client ns %s rank %d: PMIx_Allocation_request_nb failed: %d\n", myproc.nspace, myproc.rank, rc); goto done; } - while (mydata.active) { - usleep(10); - } + DEBUG_WAIT_THREAD(&mydata.lock); PMIX_INFO_FREE(info, 2); - if (NULL != mydata.info) { - PMIX_INFO_FREE(mydata.info, mydata.ninfo); - } + fprintf(stderr, "Client ns %s rank %d: Allocation returned status: %s\n", + myproc.nspace, myproc.rank, PMIx_Error_string(mydata.lock.status)); + DEBUG_DESTRUCT_MYQUERY(&mydata); + } else if (1 == myproc.rank) { - /* register a handler specifically for when the allocation - * operation completes */ - PMIX_INFO_CREATE(info, 1); + /* demonstrate a notification based approach - register a handler + * specifically for when the allocation operation completes */ + DEBUG_CONSTRUCT_MYREL(&myrel); + PMIX_INFO_CREATE(info, 2); PMIX_INFO_LOAD(&info[0], PMIX_ALLOC_ID, myallocation, PMIX_STRING); - active = -1; - PMIx_Register_event_handler(&code, 1, info, 1, - release_fn, evhandler_reg_callbk, (void*)&active); - while (-1 == active) { - usleep(10); - } - if (0 != active) { - exit(active); - } - PMIX_INFO_FREE(info, 1); + PMIX_INFO_LOAD(&info[1], PMIX_EVENT_RETURN_OBJECT, &myrel, PMIX_POINTER); + DEBUG_CONSTRUCT_LOCK(&mylock); + code = PMIX_NOTIFY_ALLOC_COMPLETE; + PMIx_Register_event_handler(&code, 1, info, 2, + release_fn, evhandler_reg_callbk, (void*)&mylock); + DEBUG_WAIT_THREAD(&mylock); + PMIX_INFO_FREE(info, 2); + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + /* now wait to hear that the request is complete */ - while (waiting_for_allocation) { - usleep(10); - } + DEBUG_WAIT_THREAD(&myrel.lock); + fprintf(stderr, "[%s:%d] Allocation returned status: %s\n", + myproc.nspace, myproc.rank, PMIx_Error_string(myrel.lock.status)); + DEBUG_DESTRUCT_MYREL(&myrel); + } else { - /* I am not the root rank, so let me wait a little while and then - * query the status of the allocation request */ + /* demonstrate a query-based approach - wait a little while and ask to + * see if it was done */ usleep(10); + DEBUG_CONSTRUCT_MYQUERY(&mydata); + PMIX_QUERY_CREATE(query, 1); PMIX_ARGV_APPEND(rc, query[0].keys, PMIX_QUERY_ALLOC_STATUS); PMIX_INFO_CREATE(query[0].qualifiers, 1); PMIX_INFO_LOAD(&query[0].qualifiers[0], PMIX_ALLOC_ID, myallocation, PMIX_STRING); - mydata.active = true; + if (PMIX_SUCCESS != (rc = PMIx_Query_info_nb(query, 1, infocbfunc, (void*)&mydata))) { fprintf(stderr, "PMIx_Query_info failed: %d\n", rc); goto done; } - while (mydata.active) { - usleep(10); - } + DEBUG_WAIT_THREAD(&mydata.lock); PMIX_QUERY_FREE(query, 1); + fprintf(stderr, "[%s:%d] Allocation returned status: %s\n", + myproc.nspace, myproc.rank, PMIx_Error_string(mydata.lock.status)); + DEBUG_DESTRUCT_MYQUERY(&mydata); } done: diff --git a/opal/mca/pmix/pmix4x/pmix/examples/client.c b/opal/mca/pmix/pmix4x/pmix/examples/client.c index 7e38608950c..519ef649d62 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/client.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/client.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * $COPYRIGHT$ * @@ -31,8 +31,8 @@ #include #include +#include "examples.h" -static volatile bool waiting_for_debugger = true; static pmix_proc_t myproc; /* this is the event notification function we pass down below @@ -66,10 +66,36 @@ static void release_fn(size_t evhdlr_registration_id, pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata) { + myrel_t *lock; + size_t n; + + /* find the return object */ + lock = NULL; + for (n=0; n < ninfo; n++) { + if (0 == strncmp(info[n].key, PMIX_EVENT_RETURN_OBJECT, PMIX_MAX_KEYLEN)) { + lock = (myrel_t*)info[n].value.data.ptr; + break; + } + } + /* if the object wasn't returned, then that is an error */ + if (NULL == lock) { + fprintf(stderr, "LOCK WASN'T RETURNED IN RELEASE CALLBACK\n"); + /* let the event handler progress */ + if (NULL != cbfunc) { + cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata); + } + return; + } + + /* tell the event handler state machine that we are the last step */ if (NULL != cbfunc) { cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); } - waiting_for_debugger = false; + /* the status will be PMIX_ERR_DEBUGGER_RELEASE since that is the code + * we registered to receive, so just return success */ + lock->lock.status = PMIX_SUCCESS; + /* release the lock */ + DEBUG_WAKEUP_THREAD(&lock->lock); } /* event handler registration is done asynchronously because it @@ -83,27 +109,34 @@ static void evhandler_reg_callbk(pmix_status_t status, size_t evhandler_ref, void *cbdata) { - volatile int *active = (volatile int*)cbdata; + mylock_t *lock = (mylock_t*)cbdata; if (PMIX_SUCCESS != status) { fprintf(stderr, "Client %s:%d EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n", myproc.nspace, myproc.rank, status, (unsigned long)evhandler_ref); } - *active = status; + lock->status = status; + lock->evhandler_ref = evhandler_ref; + DEBUG_WAKEUP_THREAD(lock); } int main(int argc, char **argv) { - int rc; + pmix_status_t rc; pmix_value_t value; pmix_value_t *val = &value; - char *tmp, *ptr, *p; + char *tmp; pmix_proc_t proc; - uint32_t nprocs, m, n, local_cnt, *localpeers; + uint32_t nprocs, n; pmix_info_t *info; - bool flag, local; - volatile int active; + bool flag; + mylock_t mylock; + myrel_t myrel; pmix_status_t dbg = PMIX_ERR_DEBUGGER_RELEASE; + pid_t pid; + + pid = getpid(); + fprintf(stderr, "Client %lu: Running\n", (unsigned long)pid); /* init us - note that the call to "init" includes the return of * any job-related info provided by the RM. This includes any @@ -114,20 +147,21 @@ int main(int argc, char **argv) fprintf(stderr, "Client ns %s rank %d: PMIx_Init failed: %d\n", myproc.nspace, myproc.rank, rc); exit(0); } - fprintf(stderr, "Client ns %s rank %d: Running\n", myproc.nspace, myproc.rank); + fprintf(stderr, "Client ns %s rank %d pid %lu: Running\n", myproc.nspace, myproc.rank, (unsigned long)pid); /* register our default event handler - again, this isn't strictly * required, but is generally good practice */ - active = -1; + DEBUG_CONSTRUCT_LOCK(&mylock); PMIx_Register_event_handler(NULL, 0, NULL, 0, - notification_fn, evhandler_reg_callbk, (void*)&active); - while (-1 == active) { - sleep(1); - } - if (0 != active) { + notification_fn, evhandler_reg_callbk, (void*)&mylock); + DEBUG_WAIT_THREAD(&mylock); + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + + if (PMIX_SUCCESS != rc) { fprintf(stderr, "[%s:%d] Default handler registration failed\n", myproc.nspace, myproc.rank); - exit(active); + goto done; } /* job-related info is found in our nspace, assigned to the @@ -145,21 +179,24 @@ int main(int argc, char **argv) * debugger */ if (PMIX_SUCCESS == (rc = PMIx_Get(&proc, PMIX_DEBUG_WAIT_FOR_NOTIFY, NULL, 0, &val))) { /* register for debugger release */ - active = -1; - PMIx_Register_event_handler(&dbg, 1, NULL, 0, - release_fn, evhandler_reg_callbk, (void*)&active); + DEBUG_CONSTRUCT_LOCK(&mylock); + PMIX_INFO_CREATE(info, 1); + DEBUG_CONSTRUCT_MYREL(&myrel); + PMIX_INFO_LOAD(&info[0], PMIX_EVENT_RETURN_OBJECT, &myrel, PMIX_POINTER); + PMIx_Register_event_handler(&dbg, 1, info, 1, + release_fn, evhandler_reg_callbk, (void*)&mylock); /* wait for registration to complete */ - while (-1 == active) { - sleep(1); - } - if (0 != active) { + DEBUG_WAIT_THREAD(&mylock); + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + PMIX_INFO_FREE(info, 1); + if (PMIX_SUCCESS != rc) { fprintf(stderr, "[%s:%d] Debug handler registration failed\n", myproc.nspace, myproc.rank); - exit(active); + goto done; } /* wait for debugger release */ - while (waiting_for_debugger) { - sleep(1); - } + DEBUG_WAIT_THREAD(&myrel.lock); + DEBUG_DESTRUCT_MYREL(&myrel); } /* get our universe size */ @@ -196,7 +233,7 @@ int main(int argc, char **argv) value.type = PMIX_UINT64; value.data.uint64 = 1234; if (PMIX_SUCCESS != (rc = PMIx_Put(PMIX_LOCAL, tmp, &value))) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Put local failed: %d\n", myproc.nspace, myproc.rank, rc); + fprintf(stderr, "Client ns %s rank %d: PMIx_Put internal failed: %d\n", myproc.nspace, myproc.rank, rc); goto done; } free(tmp); @@ -207,7 +244,7 @@ int main(int argc, char **argv) value.type = PMIX_STRING; value.data.string = "1234"; if (PMIX_SUCCESS != (rc = PMIx_Put(PMIX_REMOTE, tmp, &value))) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Put remote failed: %d\n", myproc.nspace, myproc.rank, rc); + fprintf(stderr, "Client ns %s rank %d: PMIx_Put internal failed: %d\n", myproc.nspace, myproc.rank, rc); goto done; } free(tmp); @@ -230,96 +267,53 @@ int main(int argc, char **argv) } PMIX_INFO_FREE(info, 1); - /* get the number of local peers */ - if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_LOCAL_SIZE, NULL, 0, &val))) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get PMIX_LOCAL_SIZE failed: %d", myproc.nspace, myproc.rank, rc); - goto done; - } - local_cnt = val->data.uint32; - PMIX_VALUE_RELEASE(val); - - /* create an array for the peers */ - localpeers = (uint32_t*)malloc(local_cnt * sizeof(int)); - - /* get the list of local peers */ - if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, PMIX_LOCAL_PEERS, NULL, 0, &val))) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get PMIX_LOCAL_PEERS failed: %d", myproc.nspace, myproc.rank, rc); - goto done; - } - ptr = strdup(val->data.string); - PMIX_VALUE_RELEASE(val); - - /* populate the peers array */ - p = strtok(ptr, ","); - localpeers[0] = strtoul(p, NULL, 10); - for (n=1; n < local_cnt; n++) { - p = strtok(NULL, ","); - localpeers[n] = strtoul(p, NULL, 10); - } - free(ptr); - /* check the returned data */ for (n=0; n < nprocs; n++) { - if (n == myproc.rank) { - continue; + if (0 > asprintf(&tmp, "%s-%d-local", myproc.nspace, myproc.rank)) { + exit(1); + } + if (PMIX_SUCCESS != (rc = PMIx_Get(&myproc, tmp, NULL, 0, &val))) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s failed: %d\n", myproc.nspace, myproc.rank, tmp, rc); + goto done; + } + if (PMIX_UINT64 != val->type) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong type: %d\n", myproc.nspace, myproc.rank, tmp, val->type); + PMIX_VALUE_RELEASE(val); + free(tmp); + goto done; + } + if (1234 != val->data.uint64) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong value: %d\n", myproc.nspace, myproc.rank, tmp, (int)val->data.uint64); + PMIX_VALUE_RELEASE(val); + free(tmp); + goto done; + } + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned correct\n", myproc.nspace, myproc.rank, tmp); + PMIX_VALUE_RELEASE(val); + free(tmp); + if (0 > asprintf(&tmp, "%s-%d-remote", myproc.nspace, myproc.rank)) { + exit(1); } - proc.rank = n; - local = false; - for (m=0; m < local_cnt; m++) { - if (localpeers[m] == proc.rank) { - local = true; - break; - } + if (PMIX_SUCCESS != (rc = PMIx_Get(&myproc, tmp, NULL, 0, &val))) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s failed: %d\n", myproc.nspace, myproc.rank, tmp, rc); + goto done; } - if (local) { - if (0 > asprintf(&tmp, "%s-%d-local", proc.nspace, proc.rank)) { - exit(1); - } - if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, tmp, NULL, 0, &val))) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s failed: %d\n", myproc.nspace, myproc.rank, tmp, rc); - goto done; - } - if (PMIX_UINT64 != val->type) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong type: %d\n", myproc.nspace, myproc.rank, tmp, val->type); - PMIX_VALUE_RELEASE(val); - free(tmp); - goto done; - } - if (1234 != val->data.uint64) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong value: %d\n", myproc.nspace, myproc.rank, tmp, (int)val->data.uint64); - PMIX_VALUE_RELEASE(val); - free(tmp); - goto done; - } - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned correct\n", myproc.nspace, myproc.rank, tmp); + if (PMIX_STRING != val->type) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong type: %d\n", myproc.nspace, myproc.rank, tmp, val->type); PMIX_VALUE_RELEASE(val); free(tmp); - } else { - if (0 > asprintf(&tmp, "%s-%d-remote", proc.nspace, proc.rank)) { - exit(1); - } - if (PMIX_SUCCESS != (rc = PMIx_Get(&proc, tmp, NULL, 0, &val))) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s failed: %d\n", myproc.nspace, myproc.rank, tmp, rc); - goto done; - } - if (PMIX_STRING != val->type) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong type: %d\n", myproc.nspace, myproc.rank, tmp, val->type); - PMIX_VALUE_RELEASE(val); - free(tmp); - goto done; - } - if (0 != strcmp(val->data.string, "1234")) { - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong value: %s\n", myproc.nspace, myproc.rank, tmp, val->data.string); - PMIX_VALUE_RELEASE(val); - free(tmp); - goto done; - } - fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned correct\n", myproc.nspace, myproc.rank, tmp); + goto done; + } + if (0 != strcmp(val->data.string, "1234")) { + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned wrong value: %s\n", myproc.nspace, myproc.rank, tmp, val->data.string); PMIX_VALUE_RELEASE(val); free(tmp); + goto done; } + fprintf(stderr, "Client ns %s rank %d: PMIx_Get %s returned correct\n", myproc.nspace, myproc.rank, tmp); + PMIX_VALUE_RELEASE(val); + free(tmp); } - free(localpeers); done: /* finalize us */ diff --git a/opal/mca/pmix/pmix4x/pmix/examples/client2.c b/opal/mca/pmix/pmix4x/pmix/examples/client2.c index 9b7f7e6c0fe..4021c15ec7c 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/client2.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/client2.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * $COPYRIGHT$ * @@ -31,6 +31,7 @@ #include #include +#include "examples.h" static pmix_proc_t myproc; @@ -62,25 +63,27 @@ static void evhandler_reg_callbk(pmix_status_t status, size_t evhandler_ref, void *cbdata) { - volatile int *active = (volatile int*)cbdata; + mylock_t *lock = (mylock_t*)cbdata; if (PMIX_SUCCESS != status) { fprintf(stderr, "Client %s:%d EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n", myproc.nspace, myproc.rank, status, (unsigned long)evhandler_ref); } - *active = status; + lock->status = status; + lock->evhandler_ref = evhandler_ref; + DEBUG_WAKEUP_THREAD(lock); } int main(int argc, char **argv) { - int rc; + pmix_status_t rc; pmix_value_t value; pmix_value_t *val, *vptr; pmix_proc_t proc; uint32_t nprocs, n, k; pmix_info_t *info; bool flag; - volatile int active; + mylock_t mylock; pmix_data_array_t da, *dptr; /* init us - note that the call to "init" includes the return of @@ -97,15 +100,16 @@ int main(int argc, char **argv) /* register our default event handler - again, this isn't strictly * required, but is generally good practice */ - active = -1; + DEBUG_CONSTRUCT_LOCK(&mylock); PMIx_Register_event_handler(NULL, 0, NULL, 0, - notification_fn, evhandler_reg_callbk, (void*)&active); - while (-1 == active) { - sleep(1); - } - if (0 != active) { + notification_fn, evhandler_reg_callbk, (void*)&mylock); + DEBUG_WAIT_THREAD(&mylock); + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + + if (PMIX_SUCCESS != rc) { fprintf(stderr, "[%s:%d] Default handler registration failed\n", myproc.nspace, myproc.rank); - exit(active); + goto done; } /* job-related info is found in our nspace, assigned to the diff --git a/opal/mca/pmix/pmix4x/pmix/examples/dmodex.c b/opal/mca/pmix/pmix4x/pmix/examples/dmodex.c index c093d5bc829..76a1ac8ca0c 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/dmodex.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/dmodex.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * $COPYRIGHT$ * @@ -33,24 +33,19 @@ #include #include +#include "examples.h" static uint32_t nprocs; static pmix_proc_t myproc; static uint32_t getcount = 0; -#define WAIT_FOR_COMPLETION(a) \ - do { \ - while ((a)) { \ - usleep(10); \ - } \ - } while (0) - static void opcbfunc(pmix_status_t status, void *cbdata) { - bool *active = (bool*)cbdata; + mylock_t *lock = (mylock_t*)cbdata; fprintf(stderr, "%s:%d completed fence_nb\n", myproc.nspace, myproc.rank); - *active = false; + lock->status = status; + DEBUG_WAKEUP_THREAD(lock); } static void valcbfunc(pmix_status_t status, @@ -98,7 +93,7 @@ int main(int argc, char **argv) char *tmp; pmix_proc_t proc; uint32_t n, num_gets; - bool active; + mylock_t mylock; /* init us */ if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) { @@ -170,9 +165,10 @@ int main(int argc, char **argv) PMIX_PROC_CONSTRUCT(&proc); (void)strncpy(proc.nspace, myproc.nspace, PMIX_MAX_NSLEN); proc.rank = PMIX_RANK_WILDCARD; - active = true; - if (PMIX_SUCCESS != (rc = PMIx_Fence_nb(&proc, 1, NULL, 0, opcbfunc, &active))) { + DEBUG_CONSTRUCT_LOCK(&mylock); + if (PMIX_SUCCESS != (rc = PMIx_Fence_nb(&proc, 1, NULL, 0, opcbfunc, &mylock))) { fprintf(stderr, "Client ns %s rank %d: PMIx_Fence failed: %d\n", myproc.nspace, myproc.rank, rc); + DEBUG_DESTRUCT_LOCK(&mylock); goto done; } @@ -203,7 +199,7 @@ int main(int argc, char **argv) } /* wait for the first fence to finish */ - WAIT_FOR_COMPLETION(active); + DEBUG_WAIT_THREAD(&mylock); /* wait for all my "get" calls to complete */ while (getcount < num_gets) { diff --git a/opal/mca/pmix/pmix4x/pmix/examples/dynamic.c b/opal/mca/pmix/pmix4x/pmix/examples/dynamic.c index 4ffe501f02f..d7c0d3701ff 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/dynamic.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/dynamic.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * Copyright (c) 2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. @@ -35,7 +35,7 @@ #include #include - +#include "examples.h" static pmix_proc_t myproc; diff --git a/opal/mca/pmix/pmix4x/pmix/examples/examples.h b/opal/mca/pmix/pmix4x/pmix/examples/examples.h new file mode 100644 index 00000000000..d2e5ab7696d --- /dev/null +++ b/opal/mca/pmix/pmix4x/pmix/examples/examples.h @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2011 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2005 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2006-2013 Los Alamos National Security, LLC. + * All rights reserved. + * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. + * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + * + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include + +#include + +typedef struct { + pthread_mutex_t mutex; + pthread_cond_t cond; + volatile bool active; + pmix_status_t status; + int count; + size_t evhandler_ref; +} mylock_t; + +#define DEBUG_CONSTRUCT_LOCK(l) \ + do { \ + pthread_mutex_init(&(l)->mutex, NULL); \ + pthread_cond_init(&(l)->cond, NULL); \ + (l)->active = true; \ + (l)->status = PMIX_SUCCESS; \ + (l)->count = 0; \ + (l)->evhandler_ref = 0; \ + } while(0) + +#define DEBUG_DESTRUCT_LOCK(l) \ + do { \ + pthread_mutex_destroy(&(l)->mutex); \ + pthread_cond_destroy(&(l)->cond); \ + } while(0) + +#define DEBUG_WAIT_THREAD(lck) \ + do { \ + pthread_mutex_lock(&(lck)->mutex); \ + while ((lck)->active) { \ + pthread_cond_wait(&(lck)->cond, &(lck)->mutex); \ + } \ + pthread_mutex_unlock(&(lck)->mutex); \ + } while(0) + +#define DEBUG_WAKEUP_THREAD(lck) \ + do { \ + pthread_mutex_lock(&(lck)->mutex); \ + (lck)->active = false; \ + pthread_cond_broadcast(&(lck)->cond); \ + pthread_mutex_unlock(&(lck)->mutex); \ + } while(0) + +/* define a structure for collecting returned + * info from a query */ +typedef struct { + mylock_t lock; + pmix_info_t *info; + size_t ninfo; +} myquery_data_t; + +#define DEBUG_CONSTRUCT_MYQUERY(q) \ + do { \ + DEBUG_CONSTRUCT_LOCK(&((q)->lock)); \ + (q)->info = NULL; \ + (q)->ninfo = 0; \ + } while(0) + +#define DEBUG_DESTRUCT_MYQUERY(q) \ + do { \ + DEBUG_DESTRUCT_LOCK(&((q)->lock)); \ + if (NULL != (q)->info) { \ + PMIX_INFO_FREE((q)->info, (q)->ninfo); \ + } \ + } while(0) + +/* define a structure for releasing when a given + * nspace terminates */ +typedef struct { + mylock_t lock; + char *nspace; + int exit_code; + bool exit_code_given; +} myrel_t; + + +#define DEBUG_CONSTRUCT_MYREL(r) \ + do { \ + DEBUG_CONSTRUCT_LOCK(&((r)->lock)); \ + (r)->nspace = NULL; \ + (r)->exit_code = 0; \ + (r)->exit_code_given = false; \ + } while(0) + +#define DEBUG_DESTRUCT_MYREL(r) \ + do { \ + DEBUG_DESTRUCT_LOCK(&((r)->lock)); \ + if (NULL != (r)->nspace) { \ + free((r)->nspace); \ + } \ + } while(0) diff --git a/opal/mca/pmix/pmix4x/pmix/examples/fault.c b/opal/mca/pmix/pmix4x/pmix/examples/fault.c index a197f589619..abab3886816 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/fault.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/fault.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * $COPYRIGHT$ * @@ -28,11 +28,12 @@ #include #include #include +#include #include +#include "examples.h" static pmix_proc_t myproc; -static bool completed; static void notification_fn(size_t evhdlr_registration_id, pmix_status_t status, @@ -42,22 +43,67 @@ static void notification_fn(size_t evhdlr_registration_id, pmix_event_notification_cbfunc_fn_t cbfunc, void *cbdata) { - fprintf(stderr, "Client %s:%d NOTIFIED with status %d\n", myproc.nspace, myproc.rank, status); - completed = true; + myrel_t *lock; + bool found; + int exit_code; + size_t n; + pmix_proc_t *affected = NULL; + + /* find our return object */ + lock = NULL; + found = false; + for (n=0; n < ninfo; n++) { + if (0 == strncmp(info[n].key, PMIX_EVENT_RETURN_OBJECT, PMIX_MAX_KEYLEN)) { + lock = (myrel_t*)info[n].value.data.ptr; + /* not every RM will provide an exit code, but check if one was given */ + } else if (0 == strncmp(info[n].key, PMIX_EXIT_CODE, PMIX_MAX_KEYLEN)) { + exit_code = info[n].value.data.integer; + found = true; + } else if (0 == strncmp(info[n].key, PMIX_EVENT_AFFECTED_PROC, PMIX_MAX_KEYLEN)) { + affected = info[n].value.data.proc; + } + } + /* if the object wasn't returned, then that is an error */ + if (NULL == lock) { + fprintf(stderr, "LOCK WASN'T RETURNED IN RELEASE CALLBACK\n"); + /* let the event handler progress */ + if (NULL != cbfunc) { + cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata); + } + return; + } + + /* tell the event handler state machine that we are the last step */ + if (NULL != cbfunc) { + cbfunc(PMIX_EVENT_ACTION_COMPLETE, NULL, 0, NULL, NULL, cbdata); + } + fprintf(stderr, "DEBUGGER DAEMON NOTIFIED TERMINATED - AFFECTED %s\n", + (NULL == affected) ? "NULL" : affected->nspace); + + if (found) { + lock->exit_code = exit_code; + lock->exit_code_given = true; + } + DEBUG_WAKEUP_THREAD(&lock->lock); } static void op_callbk(pmix_status_t status, void *cbdata) { + mylock_t *lock = (mylock_t*)cbdata; fprintf(stderr, "Client %s:%d OP CALLBACK CALLED WITH STATUS %d\n", myproc.nspace, myproc.rank, status); + DEBUG_WAKEUP_THREAD(lock); } -static void errhandler_reg_callbk(pmix_status_t status, +static void evhandler_reg_callbk(pmix_status_t status, size_t errhandler_ref, void *cbdata) { + mylock_t *lock = (mylock_t*)cbdata; + fprintf(stderr, "Client %s:%d ERRHANDLER REGISTRATION CALLBACK CALLED WITH STATUS %d, ref=%lu\n", myproc.nspace, myproc.rank, status, (unsigned long)errhandler_ref); + DEBUG_WAKEUP_THREAD(lock); } int main(int argc, char **argv) @@ -67,6 +113,10 @@ int main(int argc, char **argv) pmix_value_t *val = &value; pmix_proc_t proc; uint32_t nprocs; + pmix_info_t *info; + mylock_t mylock; + myrel_t myrel; + pmix_status_t code[2] = {PMIX_ERR_PROC_ABORTED, PMIX_ERR_JOB_TERMINATED}; /* init us */ if (PMIX_SUCCESS != (rc = PMIx_Init(&myproc, NULL, 0))) { @@ -87,11 +137,27 @@ int main(int argc, char **argv) nprocs = val->data.uint32; PMIX_VALUE_RELEASE(val); fprintf(stderr, "Client %s:%d universe size %d\n", myproc.nspace, myproc.rank, nprocs); - completed = false; - /* register our errhandler */ - PMIx_Register_event_handler(NULL, 0, NULL, 0, - notification_fn, errhandler_reg_callbk, NULL); + /* register another handler specifically for when the target + * job completes */ + DEBUG_CONSTRUCT_MYREL(&myrel); + PMIX_INFO_CREATE(info, 2); + PMIX_INFO_LOAD(&info[0], PMIX_EVENT_RETURN_OBJECT, &myrel, PMIX_POINTER); + /* only call me back when one of us terminates */ + PMIX_INFO_LOAD(&info[1], PMIX_NSPACE, myproc.nspace, PMIX_STRING); + + DEBUG_CONSTRUCT_LOCK(&mylock); + PMIx_Register_event_handler(code, 2, info, 2, + notification_fn, evhandler_reg_callbk, (void*)&mylock); + DEBUG_WAIT_THREAD(&mylock); + if (PMIX_SUCCESS != mylock.status) { + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + PMIX_INFO_FREE(info, 2); + goto done; + } + DEBUG_DESTRUCT_LOCK(&mylock); + PMIX_INFO_FREE(info, 2); /* call fence to sync */ PMIX_PROC_CONSTRUCT(&proc); @@ -109,17 +175,16 @@ int main(int argc, char **argv) exit(1); } /* everyone simply waits */ - while (!completed) { - struct timespec ts; - ts.tv_sec = 0; - ts.tv_nsec = 100000; - nanosleep(&ts, NULL); - } + DEBUG_WAIT_THREAD(&myrel.lock); + DEBUG_DESTRUCT_MYREL(&myrel); done: /* finalize us */ fprintf(stderr, "Client ns %s rank %d: Finalizing\n", myproc.nspace, myproc.rank); - PMIx_Deregister_event_handler(1, op_callbk, NULL); + DEBUG_CONSTRUCT_LOCK(&mylock); + PMIx_Deregister_event_handler(1, op_callbk, &mylock); + DEBUG_WAIT_THREAD(&mylock); + DEBUG_DESTRUCT_LOCK(&mylock); if (PMIX_SUCCESS != (rc = PMIx_Finalize(NULL, 0))) { fprintf(stderr, "Client ns %s rank %d:PMIx_Finalize failed: %d\n", myproc.nspace, myproc.rank, rc); diff --git a/opal/mca/pmix/pmix4x/pmix/examples/jctrl.c b/opal/mca/pmix/pmix4x/pmix/examples/jctrl.c index 5c1c1d1f73d..ee090bd4db4 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/jctrl.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/jctrl.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2017 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * $COPYRIGHT$ * @@ -32,6 +32,7 @@ #include #include +#include "examples.h" static pmix_proc_t myproc; @@ -63,13 +64,15 @@ static void evhandler_reg_callbk(pmix_status_t status, size_t evhandler_ref, void *cbdata) { - volatile int *active = (volatile int*)cbdata; + mylock_t *lock = (mylock_t*)cbdata; if (PMIX_SUCCESS != status) { fprintf(stderr, "Client %s:%d EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n", myproc.nspace, myproc.rank, status, (unsigned long)evhandler_ref); } - *active = status; + lock->status = status; + lock->evhandler_ref = evhandler_ref; + DEBUG_WAKEUP_THREAD(lock); } static void infocbfunc(pmix_status_t status, @@ -78,26 +81,27 @@ static void infocbfunc(pmix_status_t status, pmix_release_cbfunc_t release_fn, void *release_cbdata) { - volatile int *active = (volatile int*)cbdata; + mylock_t *lock = (mylock_t*)cbdata; /* release the caller */ if (NULL != release_fn) { release_fn(release_cbdata); } - *active = status; + lock->status = status; + DEBUG_WAKEUP_THREAD(lock); } int main(int argc, char **argv) { - int rc; + pmix_status_t rc; pmix_value_t value; pmix_value_t *val = &value; pmix_proc_t proc; uint32_t nprocs, n; pmix_info_t *info, *iptr; bool flag; - volatile int active; + mylock_t mylock; pmix_data_array_t *dptr; /* init us - note that the call to "init" includes the return of @@ -111,15 +115,16 @@ int main(int argc, char **argv) /* register our default event handler - again, this isn't strictly * required, but is generally good practice */ - active = -1; + DEBUG_CONSTRUCT_LOCK(&mylock); PMIx_Register_event_handler(NULL, 0, NULL, 0, - notification_fn, evhandler_reg_callbk, (void*)&active); - while (-1 == active) { - sleep(1); - } - if (0 != active) { + notification_fn, evhandler_reg_callbk, (void*)&mylock); + /* wait for registration to complete */ + DEBUG_WAIT_THREAD(&mylock); + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + if (PMIX_SUCCESS != rc) { fprintf(stderr, "[%s:%d] Default handler registration failed\n", myproc.nspace, myproc.rank); - exit(active); + goto done; } /* job-related info is found in our nspace, assigned to the @@ -159,18 +164,19 @@ int main(int argc, char **argv) /* since this is informational and not a requested operation, the target parameter * doesn't mean anything and can be ignored */ - active = -1; - if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(NULL, 0, info, 2, infocbfunc, (void*)&active))) { + DEBUG_CONSTRUCT_LOCK(&mylock); + if (PMIX_SUCCESS != (rc = PMIx_Job_control_nb(NULL, 0, info, 2, infocbfunc, (void*)&mylock))) { fprintf(stderr, "Client ns %s rank %d: PMIx_Job_control_nb failed: %d\n", myproc.nspace, myproc.rank, rc); + DEBUG_DESTRUCT_LOCK(&mylock); goto done; } - while (-1 == active) { - sleep(1); - } + DEBUG_WAIT_THREAD(&mylock); PMIX_INFO_FREE(info, 2); - if (0 != active) { + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + if (PMIX_SUCCESS != rc) { fprintf(stderr, "Client ns %s rank %d: PMIx_Job_control_nb failed: %d\n", myproc.nspace, myproc.rank, rc); - exit(active); + goto done; } /* now request that this process be monitored using heartbeats */ @@ -185,20 +191,21 @@ int main(int argc, char **argv) PMIX_INFO_LOAD(&info[2], PMIX_MONITOR_HEARTBEAT_DROPS, &n, PMIX_UINT32); /* make the request */ - active = -1; + DEBUG_CONSTRUCT_LOCK(&mylock); if (PMIX_SUCCESS != (rc = PMIx_Process_monitor_nb(iptr, PMIX_MONITOR_HEARTBEAT_ALERT, - info, 3, infocbfunc, (void*)&active))) { + info, 3, infocbfunc, (void*)&mylock))) { fprintf(stderr, "Client ns %s rank %d: PMIx_Process_monitor_nb failed: %d\n", myproc.nspace, myproc.rank, rc); + DEBUG_DESTRUCT_LOCK(&mylock); goto done; } - while (-1 == active) { - sleep(1); - } + DEBUG_WAIT_THREAD(&mylock); PMIX_INFO_FREE(iptr, 1); PMIX_INFO_FREE(info, 3); - if (0 != active) { + rc = mylock.status; + DEBUG_DESTRUCT_LOCK(&mylock); + if (PMIX_SUCCESS != rc) { fprintf(stderr, "Client ns %s rank %d: PMIx_Process_monitor_nb failed: %d\n", myproc.nspace, myproc.rank, rc); - exit(active); + goto done; } /* send a heartbeat */ diff --git a/opal/mca/pmix/pmix4x/pmix/examples/tool.c b/opal/mca/pmix/pmix4x/pmix/examples/tool.c index 6818e49c5a3..b8a2f247ec4 100644 --- a/opal/mca/pmix/pmix4x/pmix/examples/tool.c +++ b/opal/mca/pmix/pmix4x/pmix/examples/tool.c @@ -13,7 +13,7 @@ * All rights reserved. * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved. - * Copyright (c) 2013-2016 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. All rights reserved. * $COPYRIGHT$ * @@ -29,6 +29,7 @@ #include #include +#include "examples.h" static void cbfunc(pmix_status_t status, pmix_info_t *info, size_t ninfo, @@ -36,15 +37,16 @@ static void cbfunc(pmix_status_t status, pmix_release_cbfunc_t release_fn, void *release_cbdata) { - volatile bool *active = (volatile bool*)cbdata; + myquery_data_t *mydata = (myquery_data_t*)cbdata; /* do something with the returned info - it will be * released in the release_fn */ + fprintf(stderr, "Query returned %s\n", PMIx_Error_string(status)); if (NULL != release_fn) { release_fn(release_cbdata); } - *active = false; + DEBUG_WAKEUP_THREAD(&mydata->lock); } int main(int argc, char **argv) @@ -53,13 +55,23 @@ int main(int argc, char **argv) pmix_proc_t myproc; pmix_query_t *query; size_t nq; - volatile bool active; + myquery_data_t mydata; + pmix_info_t info; + + if (argc != 2) { + fprintf(stderr, "Must provide server URI as argument\n"); + exit(1); + } + + PMIX_INFO_LOAD(&info, PMIX_SERVER_URI, argv[1], PMIX_STRING); + fprintf(stderr, "Connecting to %s\n", argv[1]); /* init us */ - if (PMIX_SUCCESS != (rc = PMIx_tool_init(&myproc, NULL, 0))) { + if (PMIX_SUCCESS != (rc = PMIx_tool_init(&myproc, &info, 1))) { fprintf(stderr, "PMIx_tool_init failed: %d\n", rc); exit(rc); } + fprintf(stderr, "Connected\n"); /* query something */ nq = 2; @@ -70,14 +82,12 @@ int main(int argc, char **argv) query[1].keys = (char**)malloc(2 * sizeof(char*)); query[1].keys[0] = strdup("spastic"); query[1].keys[1] = NULL; - active = true; - if (PMIX_SUCCESS != (rc = PMIx_Query_info_nb(query, nq, cbfunc, (void*)&active))) { + DEBUG_CONSTRUCT_MYQUERY(&mydata); + if (PMIX_SUCCESS != (rc = PMIx_Query_info_nb(query, nq, cbfunc, (void*)&mydata))) { fprintf(stderr, "Client ns %s rank %d: PMIx_Query_info failed: %d\n", myproc.nspace, myproc.rank, rc); goto done; } - while(active) { - usleep(10); - } + DEBUG_WAIT_THREAD(&mydata.lock); done: /* finalize us */ diff --git a/opal/mca/pmix/pmix4x/pmix/include/pmix_common.h.in b/opal/mca/pmix/pmix4x/pmix/include/pmix_common.h.in index 26d8a341649..51ce9d39759 100644 --- a/opal/mca/pmix/pmix4x/pmix/include/pmix_common.h.in +++ b/opal/mca/pmix/pmix4x/pmix/include/pmix_common.h.in @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2013-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2016-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. @@ -109,6 +109,9 @@ typedef uint32_t pmix_rank_t; #define PMIX_RANK_LOCAL_PEERS UINT32_MAX-4 // all peers (i.e., all procs within the same nspace) on local node /* define an invalid value */ #define PMIX_RANK_INVALID UINT32_MAX-3 +/* define a boundary for valid ranks */ +#define PMIX_RANK_VALID UINT32_MAX-50 + /**** PMIX ENVIRONMENTAL PARAMETERS ****/ /* There are a few environmental parameters used by PMIx for @@ -445,6 +448,49 @@ typedef uint32_t pmix_rank_t; #define PMIX_QUERY_PSET_NAMES "pmix.qry.psets" // (char*) return a comma-delimited list of the names of the // psets defined in the specified range (defaults to session) +/* information retrieval attributes */ +#define PMIX_SESSION_INFO "pmix.ssn.info" // (bool) Return information about the specified session. If information + // about a session other than the one containing the requesting + // process is desired, then the attribute array must contain a + // PMIX_SESSION_ID attribute identifying the desired target. +#define PMIX_JOB_INFO "pmix.job.info" // (bool) Return information about the specified job or namespace. If + // information about a job or namespace other than the one containing + // the requesting process is desired, then the attribute array must + // contain a PMIX_JOBID or PMIX_NSPACE attribute identifying the + // desired target. Similarly, if information is requested about a + // job or namespace in a session other than the one containing the + // requesting process, then an attribute identifying the target + // session must be provided. +#define PMIX_APP_INFO "pmix.app.info" // (bool) Return information about the specified application. If information + // about an application other than the one containing the requesting + // process is desired, then the attribute array must contain a + // PMIX_APPNUM attribute identifying the desired target. Similarly, + // if information is requested about an application in a job or session + // other than the one containing the requesting process, then attributes + // identifying the target job and/or session must be provided. +#define PMIX_NODE_INFO "pmix.node.info" // (bool) Return information about the specified node. If information about a + // node other than the one containing the requesting process is desired, + // then the attribute array must contain either the PMIX_NODEID or + // PMIX_HOSTNAME attribute identifying the desired target. + +/* information storage attributes */ +#define PMIX_SESSION_INFO_ARRAY "pmix.ssn.arr" // (pmix_data_array_t) Provide an array of pmix_info_t containing + // session-level information. The PMIX_SESSION_ID attribute is required + // to be included in the array. +#define PMIX_JOB_INFO_ARRAY "pmix.job.arr" // (pmix_data_array_t) Provide an array of pmix_info_t containing job-level + // information. Information is registered one job (aka namespace) at a time + // via the PMIx_server_register_nspace API. Thus, there is no requirement that + // the array contain either the PMIX_NSPACE or PMIX_JOBID attributes, though + // either or both of them may be included. +#define PMIX_APP_INFO_ARRAY "pmix.app.arr" // (pmix_data_array_t) Provide an array of pmix_info_t containing app-level + // information. The PMIX_NSPACE or PMIX_JOBID attributes of the job containing + // the appplication, plus its PMIX_APPNUM attribute, are required to be + // included in the array. +#define PMIX_NODE_INFO_ARRAY "pmix.node.arr" // (pmix_data_array_t) Provide an array of pmix_info_t containing node-level + // information. At a minimum, either the PMIX_NODEID or PMIX_HOSTNAME + // attribute is required to be included in the array, though both may be + // included. + /* log attributes */ #define PMIX_LOG_SOURCE "pmix.log.source" // (pmix_proc_t*) ID of source of the log request #define PMIX_LOG_STDERR "pmix.log.stderr" // (char*) log string to stderr @@ -1047,16 +1093,18 @@ typedef struct pmix_byte_object { } \ } while(0) -#define PMIX_BYTE_OBJECT_FREE(m, n) \ - do { \ - size_t _n; \ - for (_n=0; _n < n; _n++) { \ - if (NULL != (m)[_n].bytes) { \ - free((m)[_n].bytes); \ - } \ - } \ - free((m)); \ - (m) = NULL; \ +#define PMIX_BYTE_OBJECT_FREE(m, n) \ + do { \ + size_t _n; \ + if (NULL != (m)) { \ + for (_n=0; _n < n; _n++) { \ + if (NULL != (m)[_n].bytes) { \ + free((m)[_n].bytes); \ + } \ + } \ + free((m)); \ + (m) = NULL; \ + } \ } while(0) #define PMIX_BYTE_OBJECT_LOAD(b, d, s) \ diff --git a/opal/mca/pmix/pmix4x/pmix/src/common/pmix_control.c b/opal/mca/pmix/pmix4x/pmix/src/common/pmix_control.c index 615db82630b..1c2f74308a0 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/common/pmix_control.c +++ b/opal/mca/pmix/pmix4x/pmix/src/common/pmix_control.c @@ -4,6 +4,8 @@ * Copyright (c) 2016 Mellanox Technologies, Inc. * All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. + * Copyright (c) 2019 Research Organization for Information Science + * and Technology (RIST). All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -105,6 +107,8 @@ static void query_cbfunc(struct pmix_peer_t *peer, /* release the caller */ if (NULL != cd->cbfunc) { cd->cbfunc(results->status, results->info, results->ninfo, cd->cbdata, relcbfunc, results); + } else { + PMIX_RELEASE(results); } PMIX_RELEASE(cd); } diff --git a/opal/mca/pmix/pmix4x/pmix/src/common/pmix_query.c b/opal/mca/pmix/pmix4x/pmix/src/common/pmix_query.c index b16c4f1de40..ce8269c1aee 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/common/pmix_query.c +++ b/opal/mca/pmix/pmix4x/pmix/src/common/pmix_query.c @@ -140,7 +140,7 @@ PMIX_EXPORT pmix_status_t PMIx_Query_info_nb(pmix_query_t queries[], size_t nque pmix_buffer_t *msg; pmix_status_t rc; pmix_cb_t cb; - size_t n, m, p; + size_t n, p; pmix_list_t results; pmix_kval_t *kv, *kvnxt; pmix_proc_t proc; @@ -172,21 +172,19 @@ PMIX_EXPORT pmix_status_t PMIx_Query_info_nb(pmix_query_t queries[], size_t nque memset(proc.nspace, 0, PMIX_MAX_NSLEN+1); proc.rank = PMIX_RANK_INVALID; for (n=0; n < nqueries; n++) { - for (m=0; m < queries[n].nqual; m++) { - if (NULL != queries[n].qualifiers) { - for (p=0; p < queries[n].nqual; p++) { - if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_QUERY_REFRESH_CACHE)) { - PMIX_LIST_DESTRUCT(&results); - goto query; - } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_PROCID)) { - PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.proc->nspace); - proc.rank = queries[n].qualifiers[p].value.data.proc->rank; - } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_NSPACE)) { - PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.string); - } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_RANK)) { - proc.rank = queries[n].qualifiers[p].value.data.rank; - } + for (p=0; p < queries[n].nqual; p++) { + if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_QUERY_REFRESH_CACHE)) { + if (PMIX_INFO_TRUE(&queries[n].qualifiers[p])) { + PMIX_LIST_DESTRUCT(&results); + goto query; } + } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_PROCID)) { + PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.proc->nspace); + proc.rank = queries[n].qualifiers[p].value.data.proc->rank; + } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_NSPACE)) { + PMIX_LOAD_NSPACE(proc.nspace, queries[n].qualifiers[p].value.data.string); + } else if (PMIX_CHECK_KEY(&queries[n].qualifiers[p], PMIX_RANK)) { + proc.rank = queries[n].qualifiers[p].value.data.rank; } } /* we get here if a refresh isn't required - first try a local @@ -255,6 +253,7 @@ PMIX_EXPORT pmix_status_t PMIx_Query_info_nb(pmix_query_t queries[], size_t nque /* regardless of the result of the query, we return * PMIX_SUCCESS here to indicate that the operation * was accepted for processing */ + PMIX_RELEASE_THREAD(&pmix_global_lock); return PMIX_SUCCESS; @@ -270,10 +269,10 @@ PMIX_EXPORT pmix_status_t PMIx_Query_info_nb(pmix_query_t queries[], size_t nque } pmix_output_verbose(2, pmix_globals.debug_output, "pmix:query handed to RM"); - pmix_host_server.query(&pmix_globals.myid, - queries, nqueries, - cbfunc, cbdata); - return PMIX_SUCCESS; + rc = pmix_host_server.query(&pmix_globals.myid, + queries, nqueries, + cbfunc, cbdata); + return rc; } /* if we aren't connected, don't attempt to send */ diff --git a/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_notification.c b/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_notification.c index 76ca93b505e..988b94d1fa2 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_notification.c +++ b/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_notification.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2019 Intel, Inc. All rights reserved. * Copyright (c) 2017-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2017 IBM Corporation. All rights reserved. @@ -134,11 +134,11 @@ static pmix_status_t notify_event_cache(pmix_notify_caddy_t *cd) } /* check the age */ if (0 == j) { - etime = cd->ts; + etime = pk->ts; idx = j; } else { - if (difftime(cd->ts, etime) < 0) { - etime = cd->ts; + if (difftime(pk->ts, etime) < 0) { + etime = pk->ts; idx = j; } } @@ -165,8 +165,9 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, pmix_cmd_t cmd = PMIX_NOTIFY_CMD; pmix_cb_t *cb; pmix_event_chain_t *chain; - size_t n; + size_t n, nleft; pmix_notify_caddy_t *cd; + pmix_namespace_t *nptr, *tmp; pmix_output_verbose(2, pmix_client_globals.event_output, "[%s:%d] client: notifying server %s:%d of status %s for range %s", @@ -252,6 +253,31 @@ static pmix_status_t notify_server_of_event(pmix_status_t status, cd->ntargets = chain->ntargets; PMIX_PROC_CREATE(cd->targets, cd->ntargets); memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t)); + /* compute the number of targets that need to be notified */ + nleft = 0; + for (n=0; n < cd->ntargets; n++) { + /* if this is a single proc, then increment by one */ + if (PMIX_RANK_VALID >= cd->targets[n].rank) { + ++nleft; + } else { + /* look up the nspace for this proc */ + nptr = NULL; + PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) { + if (PMIX_CHECK_NSPACE(tmp->nspace, cd->targets[n].nspace)) { + nptr = tmp; + break; + } + } + /* if we don't yet know it, then nothing to do */ + if (NULL == nptr) { + nleft = SIZE_MAX; + break; + } + /* might notify all local members */ + nleft += nptr->nlocalprocs; + } + } + cd->nleft = nleft; } if (NULL != chain->affected) { cd->naffected = chain->naffected; @@ -810,13 +836,16 @@ static void _notify_client_event(int sd, short args, void *cbdata) pmix_regevents_info_t *reginfoptr; pmix_peer_events_info_t *pr; pmix_event_chain_t *chain; - size_t n; + size_t n, nleft; bool matched, holdcd; pmix_buffer_t *bfr; pmix_cmd_t cmd = PMIX_NOTIFY_CMD; pmix_status_t rc; pmix_list_t trk; pmix_namelist_t *nm; + pmix_namespace_t *nptr, *tmp; + pmix_range_trkr_t rngtrk; + pmix_proc_t proc; /* need to acquire the object from its originating thread */ PMIX_ACQUIRE_OBJECT(cd); @@ -871,6 +900,31 @@ static void _notify_client_event(int sd, short args, void *cbdata) cd->ntargets = chain->ntargets; PMIX_PROC_CREATE(cd->targets, cd->ntargets); memcpy(cd->targets, chain->targets, cd->ntargets * sizeof(pmix_proc_t)); + /* compute the number of targets that need to be notified */ + nleft = 0; + for (n=0; n < cd->ntargets; n++) { + /* if this is a single proc, then increment by one */ + if (PMIX_RANK_VALID >= cd->targets[n].rank) { + ++nleft; + } else { + /* look up the nspace for this proc */ + nptr = NULL; + PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) { + if (PMIX_CHECK_NSPACE(tmp->nspace, cd->targets[n].nspace)) { + nptr = tmp; + break; + } + } + /* if we don't yet know it, then nothing to do */ + if (NULL == nptr) { + nleft = SIZE_MAX; + break; + } + /* might notify all local members */ + nleft += nptr->nlocalprocs; + } + } + cd->nleft = nleft; } if (NULL != chain->affected) { cd->naffected = chain->naffected; @@ -944,9 +998,12 @@ static void _notify_client_event(int sd, short args, void *cbdata) memcpy(grp->members, cd->targets, cd->ntargets * sizeof(pmix_proc_t)); pmix_list_append(&pmix_server_globals.groups, &grp->super); } + holdcd = false; if (PMIX_RANGE_PROC_LOCAL != cd->range) { PMIX_CONSTRUCT(&trk, pmix_list_t); + rngtrk.procs = NULL; + rngtrk.nprocs = 0; /* cycle across our registered events and send the message to * any client who registered for it */ PMIX_LIST_FOREACH(reginfoptr, &pmix_server_globals.events, pmix_regevents_info_t) { @@ -970,12 +1027,25 @@ static void _notify_client_event(int sd, short args, void *cbdata) if (matched) { continue; } + /* check the range */ + rngtrk.range = cd->range; + PMIX_LOAD_PROCID(&proc, pr->peer->info->pname.nspace, pr->peer->info->pname.rank); + if (!pmix_notify_check_range(&rngtrk, &proc)) { + continue; + } /* if we were given specific targets, check if this is one */ if (NULL != cd->targets) { matched = false; for (n=0; n < cd->ntargets; n++) { if (PMIX_CHECK_PROCID(&pr->peer->info->pname, &cd->targets[n])) { matched = true; + /* track the number of targets we have left to notify */ + --cd->nleft; + /* if this is the last one, then evict this event + * from the cache */ + if (0 == cd->nleft) { + pmix_hotel_checkout(&pmix_globals.notifications, cd->room); + } break; } } diff --git a/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_registration.c b/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_registration.c index b43b5b3c437..85acadda222 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_registration.c +++ b/opal/mca/pmix/pmix4x/pmix/src/event/pmix_event_registration.c @@ -388,15 +388,11 @@ static void check_cached_events(pmix_rshift_caddy_t *cd) if (!found) { continue; } - /* if we were given specific targets, check if we are one */ + /* if we were given specific targets, check if we are one */ if (NULL != ncd->targets) { matched = false; for (n=0; n < ncd->ntargets; n++) { - if (0 != strncmp(pmix_globals.myid.nspace, ncd->targets[n].nspace, PMIX_MAX_NSLEN)) { - continue; - } - if (PMIX_RANK_WILDCARD == ncd->targets[n].rank || - pmix_globals.myid.rank == ncd->targets[n].rank) { + if (PMIX_CHECK_PROCID(&pmix_globals.myid, &ncd->targets[n])) { matched = true; break; } @@ -446,6 +442,12 @@ static void check_cached_events(pmix_rshift_caddy_t *cd) } } } + /* check this event out of the cache since we + * are processing it */ + pmix_hotel_checkout(&pmix_globals.notifications, ncd->room); + /* release the storage */ + PMIX_RELEASE(ncd); + /* we don't want this chain to propagate, so indicate it * should only be run as a single-shot */ chain->endchain = true; diff --git a/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.c b/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.c index e0c6c8d34eb..8959ba56845 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.c +++ b/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.c @@ -1,9 +1,9 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2019 Intel, Inc. All rights reserved. * Copyright (c) 2014-2017 Research Organization for Information Science - * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. - * and Technology (RIST). All rights reserved. + * Copyright (c) 2014-2019 Intel, Inc. All rights reserved. + * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Artem Y. Polyakov . * All rights reserved. * Copyright (c) 2016 IBM Corporation. All rights reserved. @@ -360,32 +360,38 @@ void pmix_execute_epilog(pmix_epilog_t *epi) pmix_cleanup_dir_t *cd, *cdnext; struct stat statbuf; int rc; + char **tmp; + size_t n; /* start with any specified files */ PMIX_LIST_FOREACH_SAFE(cf, cfnext, &epi->cleanup_files, pmix_cleanup_file_t) { /* check the effective uid/gid of the file and ensure it * matches that of the peer - we do this to provide at least * some minimum level of protection */ - rc = stat(cf->path, &statbuf); - if (0 != rc) { - pmix_output_verbose(10, pmix_globals.debug_output, - "File %s failed to stat: %d", cf->path, rc); - continue; - } - if (statbuf.st_uid != epi->uid || - statbuf.st_gid != epi->gid) { - pmix_output_verbose(10, pmix_globals.debug_output, - "File %s uid/gid doesn't match: uid %lu(%lu) gid %lu(%lu)", - cf->path, - (unsigned long)statbuf.st_uid, (unsigned long)epi->uid, - (unsigned long)statbuf.st_gid, (unsigned long)epi->gid); - continue; - } - rc = unlink(cf->path); - if (0 != rc) { - pmix_output_verbose(10, pmix_globals.debug_output, - "File %s failed to unlink: %d", cf->path, rc); + tmp = pmix_argv_split(cf->path, ','); + for (n=0; NULL != tmp[n]; n++) { + rc = stat(tmp[n], &statbuf); + if (0 != rc) { + pmix_output_verbose(10, pmix_globals.debug_output, + "File %s failed to stat: %d", tmp[n], rc); + continue; + } + if (statbuf.st_uid != epi->uid || + statbuf.st_gid != epi->gid) { + pmix_output_verbose(10, pmix_globals.debug_output, + "File %s uid/gid doesn't match: uid %lu(%lu) gid %lu(%lu)", + cf->path, + (unsigned long)statbuf.st_uid, (unsigned long)epi->uid, + (unsigned long)statbuf.st_gid, (unsigned long)epi->gid); + continue; + } + rc = unlink(tmp[n]); + if (0 != rc) { + pmix_output_verbose(10, pmix_globals.debug_output, + "File %s failed to unlink: %d", tmp[n], rc); + } } + pmix_argv_free(tmp); pmix_list_remove_item(&epi->cleanup_files, &cf->super); PMIX_RELEASE(cf); } @@ -395,27 +401,31 @@ void pmix_execute_epilog(pmix_epilog_t *epi) /* check the effective uid/gid of the file and ensure it * matches that of the peer - we do this to provide at least * some minimum level of protection */ - rc = stat(cd->path, &statbuf); - if (0 != rc) { - pmix_output_verbose(10, pmix_globals.debug_output, - "Directory %s failed to stat: %d", cd->path, rc); - continue; - } - if (statbuf.st_uid != epi->uid || - statbuf.st_gid != epi->gid) { - pmix_output_verbose(10, pmix_globals.debug_output, - "Directory %s uid/gid doesn't match: uid %lu(%lu) gid %lu(%lu)", - cd->path, - (unsigned long)statbuf.st_uid, (unsigned long)epi->uid, - (unsigned long)statbuf.st_gid, (unsigned long)epi->gid); - continue; - } - if ((statbuf.st_mode & S_IRWXU) == S_IRWXU) { - dirpath_destroy(cd->path, cd, epi); - } else { - pmix_output_verbose(10, pmix_globals.debug_output, - "Directory %s lacks permissions", cd->path); + tmp = pmix_argv_split(cd->path, ','); + for (n=0; NULL != tmp[n]; n++) { + rc = stat(tmp[n], &statbuf); + if (0 != rc) { + pmix_output_verbose(10, pmix_globals.debug_output, + "Directory %s failed to stat: %d", tmp[n], rc); + continue; + } + if (statbuf.st_uid != epi->uid || + statbuf.st_gid != epi->gid) { + pmix_output_verbose(10, pmix_globals.debug_output, + "Directory %s uid/gid doesn't match: uid %lu(%lu) gid %lu(%lu)", + cd->path, + (unsigned long)statbuf.st_uid, (unsigned long)epi->uid, + (unsigned long)statbuf.st_gid, (unsigned long)epi->gid); + continue; + } + if ((statbuf.st_mode & S_IRWXU) == S_IRWXU) { + dirpath_destroy(tmp[n], cd, epi); + } else { + pmix_output_verbose(10, pmix_globals.debug_output, + "Directory %s lacks permissions", tmp[n]); + } } + pmix_argv_free(tmp); pmix_list_remove_item(&epi->cleanup_dirs, &cd->super); PMIX_RELEASE(cd); } diff --git a/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.h b/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.h index 522b813d547..c49c3f5d7d7 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.h +++ b/opal/mca/pmix/pmix4x/pmix/src/include/pmix_globals.h @@ -420,6 +420,7 @@ typedef struct { */ pmix_proc_t *targets; size_t ntargets; + size_t nleft; // number of targets left to be notified /* When generating a notification, the originator can * specify the range of procs affected by this event. * For example, when creating a JOB_TERMINATED event, diff --git a/opal/mca/pmix/pmix4x/pmix/src/mca/common/dstore/dstore_base.c b/opal/mca/pmix/pmix4x/pmix/src/mca/common/dstore/dstore_base.c index de10b9df0d3..c0fc676e6c9 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/mca/common/dstore/dstore_base.c +++ b/opal/mca/pmix/pmix4x/pmix/src/mca/common/dstore/dstore_base.c @@ -3,7 +3,7 @@ * Copyright (c) 2016-2018 IBM Corporation. All rights reserved. * Copyright (c) 2016-2018 Mellanox Technologies, Inc. * All rights reserved. - * Copyright (c) 2018 Research Organization for Information Science + * Copyright (c) 2018-2019 Research Organization for Information Science * and Technology (RIST). All rights reserved. * * $COPYRIGHT$ @@ -1782,6 +1782,7 @@ PMIX_EXPORT void pmix_common_dstor_finalize(pmix_common_dstore_ctx_t *ds_ctx) } free(ds_ctx->ds_name); free(ds_ctx->base_path); + free(ds_ctx); } static pmix_status_t _dstore_store_nolock(pmix_common_dstore_ctx_t *ds_ctx, diff --git a/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c b/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c index 62e9ad57075..0b465340bee 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c +++ b/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/base/ptl_base_sendrecv.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2019 Intel, Inc. All rights reserved. * Copyright (c) 2014 Artem Y. Polyakov . * All rights reserved. * Copyright (c) 2015-2017 Research Organization for Information Science @@ -62,6 +62,12 @@ static void _timeout(int sd, short args, void *cbdata) PMIX_RELEASE(trk); } +static void lcfn(pmix_status_t status, void *cbdata) +{ + pmix_peer_t *peer = (pmix_peer_t*)cbdata; + PMIX_RELEASE(peer); +} + void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err) { pmix_server_trkr_t *trk, *tnxt; @@ -71,6 +77,8 @@ void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err) pmix_buffer_t buf; pmix_ptl_hdr_t hdr; struct timeval tv = {1200, 0}; + pmix_proc_t proc; + pmix_status_t rc; /* stop all events */ if (peer->recv_ev_active) { @@ -197,14 +205,29 @@ void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err) * we are just seeing their connection go away * when they terminate - so do not generate * an event. If not, then we do */ - PMIX_REPORT_EVENT(err, peer, PMIX_RANGE_NAMESPACE, _notify_complete); + PMIX_REPORT_EVENT(err, peer, PMIX_RANGE_PROC_LOCAL, _notify_complete); } /* now decrease the refcount - might actually free the object */ PMIX_RELEASE(peer->info); + + /* be sure to let the host know that the tool or client + * is gone - otherwise, it won't know to cleanup the + * resources it allocated to it */ + if (NULL != pmix_host_server.client_finalized && !peer->finalized) { + pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN); + proc.rank = peer->info->pname.rank; + /* now tell the host server */ + rc = pmix_host_server.client_finalized(&proc, peer->info->server_object, + lcfn, peer); + if (PMIX_SUCCESS == rc) { + /* we will release the peer when the server calls us back */ + peer->finalized = true; + return; + } + } /* mark the peer as "gone" since a release doesn't guarantee * that the peer object doesn't persist */ peer->finalized = true; - /* Release peer info */ PMIX_RELEASE(peer); } else { @@ -234,7 +257,7 @@ void pmix_ptl_base_lost_connection(pmix_peer_t *peer, pmix_status_t err) PMIX_DESTRUCT(&buf); /* if I called finalize, then don't generate an event */ if (!pmix_globals.mypeer->finalized) { - PMIX_REPORT_EVENT(err, pmix_client_globals.myserver, PMIX_RANGE_LOCAL, _notify_complete); + PMIX_REPORT_EVENT(err, pmix_client_globals.myserver, PMIX_RANGE_PROC_LOCAL, _notify_complete); } } } diff --git a/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/tcp/ptl_tcp.c b/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/tcp/ptl_tcp.c index f40a3c44075..76a4486ba8f 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/tcp/ptl_tcp.c +++ b/opal/mca/pmix/pmix4x/pmix/src/mca/ptl/tcp/ptl_tcp.c @@ -13,7 +13,7 @@ * Copyright (c) 2011-2014 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2011-2013 Los Alamos National Security, LLC. All rights * reserved. - * Copyright (c) 2013-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2013-2019 Intel, Inc. All rights reserved. * Copyright (c) 2018 IBM Corporation. All rights reserved. * $COPYRIGHT$ * @@ -460,6 +460,55 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, return PMIX_ERR_UNREACH; } + /* if they asked for system-level first or only, we start there */ + if (system_level || system_level_only) { + if (0 > asprintf(&filename, "%s/pmix.sys.%s", mca_ptl_tcp_component.system_tmpdir, myhost)) { + if (NULL != iptr) { + PMIX_INFO_FREE(iptr, niptr); + } + return PMIX_ERR_NOMEM; + } + pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, + "ptl:tcp:tool looking for system server at %s", + filename); + /* try to read the file */ + rc = parse_uri_file(filename, &suri, &nspace, &rank); + free(filename); + if (PMIX_SUCCESS == rc) { + pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, + "ptl:tcp:tool attempt connect to system server at %s", suri); + /* go ahead and try to connect */ + if (PMIX_SUCCESS == try_connect(suri, &sd, iptr, niptr)) { + /* don't free nspace - we will use it below */ + if (NULL != iptr) { + PMIX_INFO_FREE(iptr, niptr); + } + /* save the URI for storage */ + urikv = PMIX_NEW(pmix_kval_t); + urikv->key = strdup(PMIX_SERVER_URI); + PMIX_VALUE_CREATE(urikv->value, 1); + PMIX_VALUE_LOAD(urikv->value, suri, PMIX_STRING); + goto complete; + } + free(nspace); + } + } + + /* we get here if they either didn't ask for a system-level connection, + * or they asked for it and it didn't succeed. If they _only_ wanted + * a system-level connection, then we are done */ + if (system_level_only) { + pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, + "ptl:tcp: connecting to system failed"); + if (NULL != suri) { + free(suri); + } + if (NULL != iptr) { + PMIX_INFO_FREE(iptr, niptr); + } + return PMIX_ERR_UNREACH; + } + /* if they gave us a pid, then look for it */ if (0 != pid) { if (NULL != server_nspace) { @@ -541,55 +590,6 @@ static pmix_status_t connect_to_peer(struct pmix_peer_t *peer, return PMIX_ERR_UNREACH; } - /* if they asked for system-level, we start there */ - if (system_level || system_level_only) { - if (0 > asprintf(&filename, "%s/pmix.sys.%s", mca_ptl_tcp_component.system_tmpdir, myhost)) { - if (NULL != iptr) { - PMIX_INFO_FREE(iptr, niptr); - } - return PMIX_ERR_NOMEM; - } - pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, - "ptl:tcp:tool looking for system server at %s", - filename); - /* try to read the file */ - rc = parse_uri_file(filename, &suri, &nspace, &rank); - free(filename); - if (PMIX_SUCCESS == rc) { - pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, - "ptl:tcp:tool attempt connect to system server at %s", suri); - /* go ahead and try to connect */ - if (PMIX_SUCCESS == try_connect(suri, &sd, iptr, niptr)) { - /* don't free nspace - we will use it below */ - if (NULL != iptr) { - PMIX_INFO_FREE(iptr, niptr); - } - /* save the URI for storage */ - urikv = PMIX_NEW(pmix_kval_t); - urikv->key = strdup(PMIX_SERVER_URI); - PMIX_VALUE_CREATE(urikv->value, 1); - PMIX_VALUE_LOAD(urikv->value, suri, PMIX_STRING); - goto complete; - } - free(nspace); - } - } - - /* we get here if they either didn't ask for a system-level connection, - * or they asked for it and it didn't succeed. If they _only_ wanted - * a system-level connection, then we are done */ - if (system_level_only) { - pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, - "ptl:tcp: connecting to system failed"); - if (NULL != suri) { - free(suri); - } - if (NULL != iptr) { - PMIX_INFO_FREE(iptr, niptr); - } - return PMIX_ERR_UNREACH; - } - /* they didn't give us a pid, so we will search to see what session-level * tools are available to this user. We will take the first connection * that succeeds - this is based on the likelihood that there is only @@ -1253,9 +1253,13 @@ static pmix_status_t recv_connect_ack(int sd, uint8_t myflag) tv.tv_sec = mca_ptl_tcp_component.handshake_wait_time; tv.tv_usec = 0; if (0 != setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))) { - pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, - "pmix: recv_connect_ack could not setsockopt SO_RCVTIMEO"); - return PMIX_ERR_UNREACH; + if (ENOPROTOOPT == errno || EOPNOTSUPP == errno) { + sockopt = false; + } else { + pmix_output_verbose(2, pmix_ptl_base_framework.framework_output, + "pmix: recv_connect_ack could not setsockopt SO_RCVTIMEO"); + return PMIX_ERR_UNREACH; + } } } diff --git a/opal/mca/pmix/pmix4x/pmix/src/runtime/pmix_params.c b/opal/mca/pmix/pmix4x/pmix/src/runtime/pmix_params.c index fe72bfbd73e..8d49e8bdaad 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/runtime/pmix_params.c +++ b/opal/mca/pmix/pmix4x/pmix/src/runtime/pmix_params.c @@ -21,7 +21,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2015-2018 Mellanox Technologies, Inc. * All rights reserved. - * Copyright (c) 2016-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2016-2019 Intel, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -258,6 +258,14 @@ pmix_status_t pmix_register_params(void) PMIX_INFO_LVL_1, PMIX_MCA_BASE_VAR_SCOPE_ALL, &pmix_globals.event_eviction_time); + /* max number of IOF messages to cache */ + pmix_server_globals.max_iof_cache = 1024 * 1024; + (void) pmix_mca_base_var_register ("pmix", "pmix", "max", "iof_cache", + "Maximum number of IOF messages to cache", + PMIX_MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + PMIX_INFO_LVL_1, PMIX_MCA_BASE_VAR_SCOPE_ALL, + &pmix_server_globals.max_iof_cache); + return PMIX_SUCCESS; } diff --git a/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server.c b/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server.c index 402ca55206b..bf3b95f01c7 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server.c +++ b/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2019 Intel, Inc. All rights reserved. * Copyright (c) 2014-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Artem Y. Polyakov . @@ -87,18 +87,8 @@ static char *gds_mode = NULL; static pid_t mypid; // local functions for connection support -static void iof_eviction_cbfunc(struct pmix_hotel_t *hotel, - int room_num, - void *occupant) -{ - pmix_setup_caddy_t *cache = (pmix_setup_caddy_t*)occupant; - PMIX_RELEASE(cache); -} - pmix_status_t pmix_server_initialize(void) { - pmix_status_t rc; - /* setup the server-specific globals */ PMIX_CONSTRUCT(&pmix_server_globals.clients, pmix_pointer_array_t); pmix_pointer_array_init(&pmix_server_globals.clients, 1, INT_MAX, 1); @@ -109,15 +99,7 @@ pmix_status_t pmix_server_initialize(void) PMIX_CONSTRUCT(&pmix_server_globals.local_reqs, pmix_list_t); PMIX_CONSTRUCT(&pmix_server_globals.nspaces, pmix_list_t); PMIX_CONSTRUCT(&pmix_server_globals.groups, pmix_list_t); - PMIX_CONSTRUCT(&pmix_server_globals.iof, pmix_hotel_t); - rc = pmix_hotel_init(&pmix_server_globals.iof, PMIX_IOF_HOTEL_SIZE, - pmix_globals.evbase, PMIX_IOF_MAX_STAY, - iof_eviction_cbfunc); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE_THREAD(&pmix_global_lock); - return rc; - } + PMIX_CONSTRUCT(&pmix_server_globals.iof, pmix_list_t); pmix_output_verbose(2, pmix_server_globals.base_output, "pmix:server init called"); @@ -445,7 +427,6 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void) int i; pmix_peer_t *peer; pmix_namespace_t *ns; - pmix_setup_caddy_t *cd; PMIX_ACQUIRE_THREAD(&pmix_global_lock); if (pmix_globals.init_cntr <= 0) { @@ -473,14 +454,6 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void) pmix_ptl_base_stop_listening(); - /* cleanout any IOF */ - for (i=0; i < PMIX_IOF_HOTEL_SIZE; i++) { - pmix_hotel_checkout_and_return_occupant(&pmix_server_globals.iof, i, (void**)&cd); - if (NULL != cd) { - PMIX_RELEASE(cd); - } - } - PMIX_DESTRUCT(&pmix_server_globals.iof); for (i=0; i < pmix_server_globals.clients.size; i++) { if (NULL != (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, i))) { /* ensure that we do the specified cleanup - if this is an @@ -504,6 +477,7 @@ PMIX_EXPORT pmix_status_t PMIx_server_finalize(void) } PMIX_LIST_DESTRUCT(&pmix_server_globals.nspaces); PMIX_LIST_DESTRUCT(&pmix_server_globals.groups); + PMIX_LIST_DESTRUCT(&pmix_server_globals.iof); pmix_hwloc_cleanup(); @@ -733,6 +707,11 @@ void pmix_server_purge_events(pmix_peer_t *peer, } } } + + if (NULL != peer) { + /* ensure we honor any peer-level epilog requests */ + pmix_execute_epilog(&peer->epilog); + } } static void _deregister_nspace(int sd, short args, void *cbdata) @@ -759,7 +738,10 @@ static void _deregister_nspace(int sd, short args, void *cbdata) /* release this nspace */ PMIX_LIST_FOREACH(tmp, &pmix_server_globals.nspaces, pmix_namespace_t) { - if (0 == strcmp(tmp->nspace, cd->proc.nspace)) { + if (PMIX_CHECK_NSPACE(tmp->nspace, cd->proc.nspace)) { + /* perform any nspace-level epilog */ + pmix_execute_epilog(&tmp->epilog); + /* remove and release it */ pmix_list_remove_item(&pmix_server_globals.nspaces, &tmp->super); PMIX_RELEASE(tmp); break; @@ -1172,6 +1154,8 @@ static void _deregister_client(int sd, short args, void *cbdata) pmix_pnet.child_finalized(&cd->proc); pmix_psensor.stop(peer, NULL); } + /* honor any registered epilogs */ + pmix_execute_epilog(&peer->epilog); /* ensure we close the socket to this peer so we don't * generate "connection lost" events should it be * subsequently "killed" by the host */ @@ -1712,7 +1696,7 @@ static void _iofdeliver(int sd, short args, void *cbdata) pmix_buffer_t *msg; bool found = false; bool cached = false; - int ignore; + pmix_iof_cache_t *iof; pmix_output_verbose(2, pmix_server_globals.iof_output, "PMIX:SERVER delivering IOF from %s on channel %0x", @@ -1726,8 +1710,7 @@ static void _iofdeliver(int sd, short args, void *cbdata) continue; } /* see if the source matches the request */ - if (0 != strncmp(cd->procs->nspace, req->pname.nspace, PMIX_MAX_NSLEN) || - (PMIX_RANK_WILDCARD != req->pname.rank && cd->procs->rank != req->pname.rank)) { + if (!PMIX_CHECK_PROCID(cd->procs, &req->pname)) { continue; } /* never forward back to the source! This can happen if the source @@ -1736,8 +1719,7 @@ static void _iofdeliver(int sd, short args, void *cbdata) if (NULL == req->peer->info || req->peer->finalized) { continue; } - if (0 == strncmp(cd->procs->nspace, req->peer->info->pname.nspace, PMIX_MAX_NSLEN) && - cd->procs->rank == req->peer->info->pname.rank) { + if (PMIX_CHECK_PROCID(cd->procs, &req->peer->info->pname)) { continue; } found = true; @@ -1778,15 +1760,21 @@ static void _iofdeliver(int sd, short args, void *cbdata) /* if nobody has registered for this yet, then cache it */ if (!found) { - /* add this output to our hotel so it is cached until someone + pmix_output_verbose(2, pmix_server_globals.iof_output, + "PMIx:SERVER caching IOF"); + if (pmix_server_globals.max_iof_cache == pmix_list_get_size(&pmix_server_globals.iof)) { + /* remove the oldest cached message */ + iof = (pmix_iof_cache_t*)pmix_list_remove_first(&pmix_server_globals.iof); + PMIX_RELEASE(iof); + } + /* add this output to our cache so it is cached until someone * registers to receive it */ - if (PMIX_SUCCESS != (rc = pmix_hotel_checkin(&pmix_server_globals.iof, cd, &ignore))) { - /* we can't cache it for some reason */ - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(cd); - return; - } - cached = true; + iof = PMIX_NEW(pmix_iof_cache_t); + memcpy(&iof->source, cd->procs, sizeof(pmix_proc_t)); + iof->channel = cd->channels; + iof->bo = cd->bo; + cd->bo = NULL; // protect the data + pmix_list_append(&pmix_server_globals.iof, &iof->super); } @@ -2827,6 +2815,64 @@ static void notifyerror_cbfunc (pmix_status_t status, void *cbdata) PMIX_RELEASE(cd); } +static void alloc_cbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata; + pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata; + pmix_buffer_t *reply; + pmix_status_t rc; + + pmix_output_verbose(2, pmix_server_globals.base_output, + "pmix:alloc callback with status %d", status); + + reply = PMIX_NEW(pmix_buffer_t); + if (NULL == reply) { + PMIX_ERROR_LOG(PMIX_ERR_NOMEM); + PMIX_RELEASE(cd); + return; + } + PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + /* pack the returned data */ + PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + if (0 < ninfo) { + PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + } + } + + complete: + // send reply + PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply); + if (PMIX_SUCCESS != rc) { + PMIX_RELEASE(reply); + } + + // cleanup + if (NULL != qcd->queries) { + PMIX_QUERY_FREE(qcd->queries, qcd->nqueries); + } + if (NULL != qcd->info) { + PMIX_INFO_FREE(qcd->info, qcd->ninfo); + } + PMIX_RELEASE(qcd); + PMIX_RELEASE(cd); + if (NULL != release_fn) { + release_fn(release_cbdata); + } +} static void query_cbfunc(pmix_status_t status, pmix_info_t *info, size_t ninfo, @@ -2866,6 +2912,126 @@ static void query_cbfunc(pmix_status_t status, } } + /* cache the data for any future requests */ + + complete: + // send reply + PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply); + if (PMIX_SUCCESS != rc) { + PMIX_RELEASE(reply); + } + + // cleanup + if (NULL != qcd->queries) { + PMIX_QUERY_FREE(qcd->queries, qcd->nqueries); + } + if (NULL != qcd->info) { + PMIX_INFO_FREE(qcd->info, qcd->ninfo); + } + PMIX_RELEASE(qcd); + PMIX_RELEASE(cd); + if (NULL != release_fn) { + release_fn(release_cbdata); + } +} + +static void jctrl_cbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata; + pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata; + pmix_buffer_t *reply; + pmix_status_t rc; + + pmix_output_verbose(2, pmix_server_globals.base_output, + "pmix:jctrl callback with status %d", status); + + reply = PMIX_NEW(pmix_buffer_t); + if (NULL == reply) { + PMIX_ERROR_LOG(PMIX_ERR_NOMEM); + PMIX_RELEASE(cd); + return; + } + PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + /* pack the returned data */ + PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + if (0 < ninfo) { + PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + } + } + + complete: + // send reply + PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply); + if (PMIX_SUCCESS != rc) { + PMIX_RELEASE(reply); + } + + // cleanup + if (NULL != qcd->queries) { + PMIX_QUERY_FREE(qcd->queries, qcd->nqueries); + } + if (NULL != qcd->info) { + PMIX_INFO_FREE(qcd->info, qcd->ninfo); + } + PMIX_RELEASE(qcd); + PMIX_RELEASE(cd); + if (NULL != release_fn) { + release_fn(release_cbdata); + } +} + +static void monitor_cbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + pmix_query_caddy_t *qcd = (pmix_query_caddy_t*)cbdata; + pmix_server_caddy_t *cd = (pmix_server_caddy_t*)qcd->cbdata; + pmix_buffer_t *reply; + pmix_status_t rc; + + pmix_output_verbose(2, pmix_server_globals.base_output, + "pmix:monitor callback with status %d", status); + + reply = PMIX_NEW(pmix_buffer_t); + if (NULL == reply) { + PMIX_ERROR_LOG(PMIX_ERR_NOMEM); + PMIX_RELEASE(cd); + return; + } + PMIX_BFROPS_PACK(rc, cd->peer, reply, &status, 1, PMIX_STATUS); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + /* pack the returned data */ + PMIX_BFROPS_PACK(rc, cd->peer, reply, &ninfo, 1, PMIX_SIZE); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + goto complete; + } + if (0 < ninfo) { + PMIX_BFROPS_PACK(rc, cd->peer, reply, info, ninfo, PMIX_INFO); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + } + } + complete: // send reply PMIX_SERVER_QUEUE_REPLY(rc, cd->peer, cd->hdr.tag, reply); @@ -3314,7 +3480,7 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag, if (PMIX_ALLOC_CMD == cmd) { PMIX_GDS_CADDY(cd, peer, tag); - if (PMIX_SUCCESS != (rc = pmix_server_alloc(peer, buf, query_cbfunc, cd))) { + if (PMIX_SUCCESS != (rc = pmix_server_alloc(peer, buf, alloc_cbfunc, cd))) { PMIX_RELEASE(cd); } return rc; @@ -3322,7 +3488,7 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag, if (PMIX_JOB_CONTROL_CMD == cmd) { PMIX_GDS_CADDY(cd, peer, tag); - if (PMIX_SUCCESS != (rc = pmix_server_job_ctrl(peer, buf, query_cbfunc, cd))) { + if (PMIX_SUCCESS != (rc = pmix_server_job_ctrl(peer, buf, jctrl_cbfunc, cd))) { PMIX_RELEASE(cd); } return rc; @@ -3330,7 +3496,7 @@ static pmix_status_t server_switchyard(pmix_peer_t *peer, uint32_t tag, if (PMIX_MONITOR_CMD == cmd) { PMIX_GDS_CADDY(cd, peer, tag); - if (PMIX_SUCCESS != (rc = pmix_server_monitor(peer, buf, query_cbfunc, cd))) { + if (PMIX_SUCCESS != (rc = pmix_server_monitor(peer, buf, monitor_cbfunc, cd))) { PMIX_RELEASE(cd); } return rc; diff --git a/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.c index dc1359cc975..dbd727cbb02 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2019 Intel, Inc. All rights reserved. * Copyright (c) 2014-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014-2015 Artem Y. Polyakov . @@ -1121,10 +1121,9 @@ static void spcbfunc(pmix_status_t status, { pmix_setup_caddy_t *cd = (pmix_setup_caddy_t*)cbdata; pmix_iof_req_t *req; - pmix_setup_caddy_t *occupant; - int i; pmix_buffer_t *msg; pmix_status_t rc; + pmix_iof_cache_t *iof, *ionext; /* if it was successful, and there are IOF requests, then * register them now */ @@ -1142,60 +1141,60 @@ static void spcbfunc(pmix_status_t status, req->channels = cd->channels; pmix_list_append(&pmix_globals.iof_requests, &req->super); /* process any cached IO */ - for (i=0; i < PMIX_IOF_HOTEL_SIZE; i++) { - pmix_hotel_knock(&pmix_server_globals.iof, PMIX_IOF_HOTEL_SIZE-i-1, (void**)&occupant); - if (NULL != occupant) { - if (!(occupant->channels & req->channels)) { - continue; - } - /* if the source matches the request, then forward this along */ - if (0 != strncmp(occupant->procs->nspace, req->pname.nspace, PMIX_MAX_NSLEN) || - (PMIX_RANK_WILDCARD != req->pname.rank && occupant->procs->rank != req->pname.rank)) { - continue; - } - /* never forward back to the source! This can happen if the source - * is a launcher */ - if (0 == strncmp(occupant->procs->nspace, req->peer->info->pname.nspace, PMIX_MAX_NSLEN) && - occupant->procs->rank == req->peer->info->pname.rank) { - continue; - } - /* setup the msg */ - if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) { - PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE); - rc = PMIX_ERR_OUT_OF_RESOURCE; - break; - } - /* provide the source */ - PMIX_BFROPS_PACK(rc, req->peer, msg, occupant->procs, 1, PMIX_PROC); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - break; - } - /* provide the channel */ - PMIX_BFROPS_PACK(rc, req->peer, msg, &occupant->channels, 1, PMIX_IOF_CHANNEL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - break; - } - /* pack the data */ - PMIX_BFROPS_PACK(rc, req->peer, msg, occupant->bo, 1, PMIX_BYTE_OBJECT); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - break; - } - /* send it to the requestor */ - PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - } - /* remove it from the hotel since it has now been forwarded */ - pmix_hotel_checkout(&pmix_server_globals.iof, PMIX_IOF_HOTEL_SIZE-i-1); - PMIX_RELEASE(occupant); + PMIX_LIST_FOREACH_SAFE(iof, ionext, &pmix_server_globals.iof, pmix_iof_cache_t) { + /* if the channels don't match, then ignore it */ + if (!(iof->channel & req->channels)) { + continue; + } + /* if the source does not match the request, then ignore it */ + if (!PMIX_CHECK_PROCID(&iof->source, &req->pname)) { + continue; + } + /* never forward back to the source! This can happen if the source + * is a launcher */ + if (PMIX_CHECK_PROCID(&iof->source, &req->peer->info->pname)) { + continue; } + pmix_output_verbose(2, pmix_server_globals.iof_output, + "PMIX:SERVER:SPAWN delivering cached IOF from %s:%d to %s:%d", + iof->source.nspace, iof->source.rank, + req->pname.nspace, req->pname.rank); + /* setup the msg */ + if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) { + PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE); + rc = PMIX_ERR_OUT_OF_RESOURCE; + break; + } + /* provide the source */ + PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->source, 1, PMIX_PROC); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + break; + } + /* provide the channel */ + PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->channel, 1, PMIX_IOF_CHANNEL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + break; + } + /* pack the data */ + PMIX_BFROPS_PACK(rc, req->peer, msg, iof->bo, 1, PMIX_BYTE_OBJECT); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + break; + } + /* send it to the requestor */ + PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + } + /* remove it from the list since it has now been forwarded */ + pmix_list_remove_item(&pmix_server_globals.iof, &iof->super); + PMIX_RELEASE(iof); } } @@ -1294,7 +1293,8 @@ pmix_status_t pmix_server_spawn(pmix_peer_t *peer, } } } - /* we will construct any required iof request tracker upon completion of the spawn */ + /* we will construct any required iof request tracker upon completion of the spawn + * as we need the nspace of the spawned application! */ } /* add the directive to the end */ if (PMIX_PROC_IS_TOOL(peer)) { @@ -1648,6 +1648,8 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, pmix_cmd_t cmd = PMIX_NOTIFY_CMD; pmix_proc_t *affected = NULL; size_t naffected = 0; + pmix_range_trkr_t rngtrk; + pmix_proc_t proc; pmix_output_verbose(2, pmix_server_globals.event_output, "recvd register events for peer %s:%d", @@ -1882,6 +1884,8 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, } /* check if any matching notifications have been cached */ + rngtrk.procs = NULL; + rngtrk.nprocs = 0; for (i=0; i < pmix_globals.max_events; i++) { pmix_hotel_knock(&pmix_globals.notifications, i, (void**)&cd); if (NULL == cd) { @@ -1904,23 +1908,33 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, if (!found) { continue; } - /* if we were given specific targets, check if this is one */ + /* check the range */ + rngtrk.range = cd->range; + PMIX_LOAD_PROCID(&proc, peer->info->pname.nspace, peer->info->pname.rank); + if (!pmix_notify_check_range(&rngtrk, &proc)) { + continue; + } + /* if we were given specific targets, check if this is one */ + found = false; if (NULL != cd->targets) { matched = false; for (n=0; n < cd->ntargets; n++) { - if (0 != strncmp(peer->info->pname.nspace, cd->targets[n].nspace, PMIX_MAX_NSLEN)) { - continue; - } /* if the source of the event is the same peer just registered, then ignore it * as the event notification system will have already locally * processed it */ - if (0 == strncmp(peer->info->pname.nspace, cd->source.nspace, PMIX_MAX_NSLEN) && - peer->info->pname.rank == cd->source.rank) { + if (PMIX_CHECK_PROCID(&cd->source, &peer->info->pname)) { continue; } - if (PMIX_RANK_WILDCARD == cd->targets[n].rank || - peer->info->pname.rank == cd->targets[n].rank) { + if (PMIX_CHECK_PROCID(&peer->info->pname, &cd->targets[n])) { matched = true; + /* track the number of targets we have left to notify */ + --cd->nleft; + /* if this is the last one, then evict this event + * from the cache */ + if (0 == cd->nleft) { + pmix_hotel_checkout(&pmix_globals.notifications, cd->room); + found = true; // mark that we should release cd + } break; } } @@ -1929,6 +1943,7 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, continue; } } + /* if they specified affected proc(s) they wanted to know about, check */ if (!pmix_notify_check_affected(cd->affected, cd->naffected, affected, naffected)) { @@ -1974,7 +1989,11 @@ pmix_status_t pmix_server_register_events(pmix_peer_t *peer, if (PMIX_SUCCESS != ret) { PMIX_RELEASE(relay); } + if (found) { + PMIX_RELEASE(cd); + } } + if (NULL != codes) { free(codes); } @@ -2196,14 +2215,14 @@ pmix_status_t pmix_server_query(pmix_peer_t *peer, pmix_status_t rc; pmix_query_caddy_t *cd; pmix_proc_t proc; + pmix_cb_t cb; + size_t n, p; + pmix_list_t results; + pmix_kval_t *kv, *kvnxt; pmix_output_verbose(2, pmix_server_globals.base_output, "recvd query from client"); - if (NULL == pmix_host_server.query) { - return PMIX_ERR_NOT_SUPPORTED; - } - cd = PMIX_NEW(pmix_query_caddy_t); if (NULL == cd) { return PMIX_ERR_NOMEM; @@ -2214,36 +2233,132 @@ pmix_status_t pmix_server_query(pmix_peer_t *peer, PMIX_BFROPS_UNPACK(rc, peer, buf, &cd->nqueries, &cnt, PMIX_SIZE); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); - goto exit; + PMIX_RELEASE(cd); + return rc; } /* unpack the queries */ if (0 < cd->nqueries) { PMIX_QUERY_CREATE(cd->queries, cd->nqueries); if (NULL == cd->queries) { rc = PMIX_ERR_NOMEM; - goto exit; + PMIX_RELEASE(cd); + return rc; } cnt = cd->nqueries; PMIX_BFROPS_UNPACK(rc, peer, buf, cd->queries, &cnt, PMIX_QUERY); if (PMIX_SUCCESS != rc) { PMIX_ERROR_LOG(rc); - goto exit; + PMIX_RELEASE(cd); + return rc; } } + /** check each query/key to see if we already have the info + * before passing the request up to the host */ + /* check the directives to see if they want us to refresh + * the local cached results - if we wanted to optimize this + * more, we would check each query and allow those that don't + * want to be refreshed to be executed locally, and those that + * did would be sent to the host. However, for now we simply + * */ + memset(proc.nspace, 0, PMIX_MAX_NSLEN+1); + proc.rank = PMIX_RANK_INVALID; + PMIX_CONSTRUCT(&results, pmix_list_t); + + for (n=0; n < cd->nqueries; n++) { + for (p=0; p < cd->queries[n].nqual; p++) { + if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_QUERY_REFRESH_CACHE)) { + if (PMIX_INFO_TRUE(&cd->queries[n].qualifiers[p])) { + PMIX_LIST_DESTRUCT(&results); + goto query; + } + } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_PROCID)) { + PMIX_LOAD_NSPACE(proc.nspace, cd->queries[n].qualifiers[p].value.data.proc->nspace); + proc.rank = cd->queries[n].qualifiers[p].value.data.proc->rank; + } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_NSPACE)) { + PMIX_LOAD_NSPACE(proc.nspace, cd->queries[n].qualifiers[p].value.data.string); + } else if (PMIX_CHECK_KEY(&cd->queries[n].qualifiers[p], PMIX_RANK)) { + proc.rank = cd->queries[n].qualifiers[p].value.data.rank; + } + } + /* we get here if a refresh isn't required - first try a local + * "get" on the data to see if we already have it */ + PMIX_CONSTRUCT(&cb, pmix_cb_t); + cb.copy = false; + /* set the proc */ + if (PMIX_RANK_INVALID == proc.rank && + 0 == strlen(proc.nspace)) { + /* use our id */ + cb.proc = &pmix_globals.myid; + } else { + if (0 == strlen(proc.nspace)) { + /* use our nspace */ + PMIX_LOAD_NSPACE(cb.proc->nspace, pmix_globals.myid.nspace); + } + if (PMIX_RANK_INVALID == proc.rank) { + /* user the wildcard rank */ + proc.rank = PMIX_RANK_WILDCARD; + } + cb.proc = &proc; + } + for (p=0; NULL != cd->queries[n].keys[p]; p++) { + cb.key = cd->queries[n].keys[p]; + PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb); + if (PMIX_SUCCESS != rc) { + /* needs to be passed to the host */ + PMIX_LIST_DESTRUCT(&results); + PMIX_DESTRUCT(&cb); + goto query; + } + /* need to retain this result */ + PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &cb.kvs, pmix_kval_t) { + pmix_list_remove_item(&cb.kvs, &kv->super); + pmix_list_append(&results, &kv->super); + } + PMIX_DESTRUCT(&cb); + } + } + + /* if we get here, then all queries were completely locally + * resolved, so construct the results for return */ + rc = PMIX_ERR_NOT_FOUND; + if (0 < (cd->ninfo = pmix_list_get_size(&results))) { + PMIX_INFO_CREATE(cd->info, cd->ninfo); + n = 0; + PMIX_LIST_FOREACH_SAFE(kv, kvnxt, &results, pmix_kval_t) { + PMIX_LOAD_KEY(cd->info[n].key, kv->key); + rc = pmix_value_xfer(&cd->info[n].value, kv->value); + if (PMIX_SUCCESS != rc) { + PMIX_INFO_FREE(cd->info, cd->ninfo); + cd->info = NULL; + cd->ninfo = 0; + break; + } + ++n; + } + } + /* done with the list of results */ + PMIX_LIST_DESTRUCT(&results); + /* we can just call the cbfunc here as we are already + * in an event - let our internal cbfunc do a threadshift + * if necessary */ + cbfunc(PMIX_SUCCESS, cd->info, cd->ninfo, cd, NULL, NULL); + return PMIX_SUCCESS; + + query: + if (NULL == pmix_host_server.query) { + PMIX_RELEASE(cd); + return PMIX_ERR_NOT_SUPPORTED; + } + /* setup the requesting peer name */ - pmix_strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN); - proc.rank = peer->info->pname.rank; + PMIX_LOAD_PROCID(&proc, peer->info->pname.nspace, peer->info->pname.rank); /* ask the host for the info */ if (PMIX_SUCCESS != (rc = pmix_host_server.query(&proc, cd->queries, cd->nqueries, cbfunc, cd))) { - goto exit; + PMIX_RELEASE(cd); } - return PMIX_SUCCESS; - - exit: - PMIX_RELEASE(cd); return rc; } @@ -2952,9 +3067,8 @@ pmix_status_t pmix_server_iofreg(pmix_peer_t *peer, pmix_iof_req_t *req; bool notify, match; size_t n; - int i; - pmix_setup_caddy_t *occupant; pmix_buffer_t *msg; + pmix_iof_cache_t *iof, *ionext; pmix_output_verbose(2, pmix_server_globals.iof_output, "recvd IOF PULL request from client"); @@ -3051,54 +3165,60 @@ pmix_status_t pmix_server_iofreg(pmix_peer_t *peer, pmix_list_append(&pmix_globals.iof_requests, &req->super); } /* process any cached IO */ - for (i=0; i < PMIX_IOF_HOTEL_SIZE; i++) { - pmix_hotel_knock(&pmix_server_globals.iof, PMIX_IOF_HOTEL_SIZE-i-1, (void**)&occupant); - if (NULL != occupant) { - if (!(occupant->channels & req->channels)) { - continue; - } - /* if the source matches the request, then forward this along */ - if (0 != strncmp(occupant->procs->nspace, req->pname.nspace, PMIX_MAX_NSLEN) || - (PMIX_RANK_WILDCARD != req->pname.rank && occupant->procs->rank != req->pname.rank)) { - continue; - } - /* setup the msg */ - if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) { - PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE); - rc = PMIX_ERR_OUT_OF_RESOURCE; - break; - } - /* provide the source */ - PMIX_BFROPS_PACK(rc, req->peer, msg, occupant->procs, 1, PMIX_PROC); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - break; - } - /* provide the channel */ - PMIX_BFROPS_PACK(rc, req->peer, msg, &occupant->channels, 1, PMIX_IOF_CHANNEL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - break; - } - /* pack the data */ - PMIX_BFROPS_PACK(rc, req->peer, msg, occupant->bo, 1, PMIX_BYTE_OBJECT); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - break; - } - /* send it to the requestor */ - PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(msg); - } - /* remove it from the hotel since it has now been forwarded */ - pmix_hotel_checkout(&pmix_server_globals.iof, PMIX_IOF_HOTEL_SIZE-i-1); - PMIX_RELEASE(occupant); + PMIX_LIST_FOREACH_SAFE(iof, ionext, &pmix_server_globals.iof, pmix_iof_cache_t) { + /* if the channels don't match, then ignore it */ + if (!(iof->channel & req->channels)) { + continue; + } + /* if the source does not match the request, then ignore it */ + if (!PMIX_CHECK_PROCID(&iof->source, &req->pname)) { + continue; + } + /* never forward back to the source! This can happen if the source + * is a launcher */ + if (PMIX_CHECK_PROCID(&iof->source, &req->peer->info->pname)) { + continue; + } + pmix_output_verbose(2, pmix_server_globals.iof_output, + "PMIX:SERVER:IOFREQ delivering cached IOF from %s:%d to %s:%d", + iof->source.nspace, iof->source.rank, + req->peer->info->pname.nspace, req->peer->info->pname.rank); + /* setup the msg */ + if (NULL == (msg = PMIX_NEW(pmix_buffer_t))) { + PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE); + rc = PMIX_ERR_OUT_OF_RESOURCE; + break; } + /* provide the source */ + PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->source, 1, PMIX_PROC); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + break; + } + /* provide the channel */ + PMIX_BFROPS_PACK(rc, req->peer, msg, &iof->channel, 1, PMIX_IOF_CHANNEL); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + break; + } + /* pack the data */ + PMIX_BFROPS_PACK(rc, req->peer, msg, iof->bo, 1, PMIX_BYTE_OBJECT); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + break; + } + /* send it to the requestor */ + PMIX_PTL_SEND_ONEWAY(rc, req->peer, msg, PMIX_PTL_TAG_IOF); + if (PMIX_SUCCESS != rc) { + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(msg); + } + /* remove it from the list since it has now been forwarded */ + pmix_list_remove_item(&pmix_server_globals.iof, &iof->super); + PMIX_RELEASE(iof); } } if (notify) { @@ -4043,6 +4163,7 @@ static void ncon(pmix_notify_caddy_t *p) p->range = PMIX_RANGE_UNDEF; p->targets = NULL; p->ntargets = 0; + p->nleft = SIZE_MAX; p->affected = NULL; p->naffected = 0; p->nondefault = false; @@ -4192,3 +4313,15 @@ PMIX_CLASS_INSTANCE(pmix_group_t, PMIX_CLASS_INSTANCE(pmix_group_caddy_t, pmix_list_item_t, NULL, NULL); + +static void iocon(pmix_iof_cache_t *p) +{ + p->bo = NULL; +} +static void iodes(pmix_iof_cache_t *p) +{ + PMIX_BYTE_OBJECT_FREE(p->bo, 1); // macro protects against NULL +} +PMIX_CLASS_INSTANCE(pmix_iof_cache_t, + pmix_list_item_t, + iocon, iodes); diff --git a/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.h b/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.h index cfa41d69389..6c42fdf0054 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.h +++ b/opal/mca/pmix/pmix4x/pmix/src/server/pmix_server_ops.h @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2015-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2015-2019 Intel, Inc. All rights reserved. * Copyright (c) 2015 Artem Y. Polyakov . * All rights reserved. * Copyright (c) 2015 Mellanox Technologies, Inc. @@ -14,6 +14,11 @@ #ifndef PMIX_SERVER_OPS_H #define PMIX_SERVER_OPS_H +#include +#ifdef HAVE_SYS_TYPES_H +#include +#endif + #include #include "src/include/types.h" #include @@ -58,6 +63,11 @@ typedef struct { pmix_iof_channel_t channels; pmix_byte_object_t *bo; size_t nbo; + /* timestamp receipt of the notification so we + * can evict the oldest one if we get overwhelmed */ + time_t ts; + /* what room of the hotel they are in */ + int room; pmix_op_cbfunc_t opcbfunc; pmix_dmodex_response_fn_t cbfunc; pmix_setup_application_cbfunc_t setupcbfunc; @@ -147,6 +157,14 @@ typedef struct { } pmix_group_caddy_t; PMIX_CLASS_DECLARATION(pmix_group_caddy_t); +typedef struct { + pmix_list_item_t super; + pmix_proc_t source; + pmix_iof_channel_t channel; + pmix_byte_object_t *bo; +} pmix_iof_cache_t; +PMIX_CLASS_DECLARATION(pmix_iof_cache_t); + typedef struct { pmix_list_t nspaces; // list of pmix_nspace_t for the nspaces we know about pmix_pointer_array_t clients; // array of pmix_peer_t local clients @@ -156,7 +174,8 @@ typedef struct { pmix_list_t gdata; // cache of data given to me for passing to all clients pmix_list_t events; // list of pmix_regevents_info_t registered events pmix_list_t groups; // list of pmix_group_t group memberships - pmix_hotel_t iof; // IO to be forwarded to clients + pmix_list_t iof; // IO to be forwarded to clients + size_t max_iof_cache; // max number of IOF messages to cache bool tool_connections_allowed; char *tmpdir; // temporary directory for this server char *system_tmpdir; // system tmpdir diff --git a/opal/mca/pmix/pmix4x/pmix/src/tool/pmix_tool.c b/opal/mca/pmix/pmix4x/pmix/src/tool/pmix_tool.c index 6c6c99eb873..1186626a322 100644 --- a/opal/mca/pmix/pmix4x/pmix/src/tool/pmix_tool.c +++ b/opal/mca/pmix/pmix4x/pmix/src/tool/pmix_tool.c @@ -1,6 +1,6 @@ /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2014-2018 Intel, Inc. All rights reserved. + * Copyright (c) 2014-2019 Intel, Inc. All rights reserved. * Copyright (c) 2014-2016 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2014 Artem Y. Polyakov . @@ -1098,7 +1098,6 @@ PMIX_EXPORT pmix_status_t PMIx_tool_finalize(void) struct timeval tv = {5, 0}; int n; pmix_peer_t *peer; - pmix_setup_caddy_t *cd; PMIX_ACQUIRE_THREAD(&pmix_global_lock); if (1 != pmix_globals.init_cntr) { @@ -1183,14 +1182,6 @@ PMIX_EXPORT pmix_status_t PMIx_tool_finalize(void) if (PMIX_PROC_IS_LAUNCHER(pmix_globals.mypeer)) { pmix_ptl_base_stop_listening(); - /* cleanout any IOF */ - for (n=0; n < PMIX_IOF_HOTEL_SIZE; n++) { - pmix_hotel_checkout_and_return_occupant(&pmix_server_globals.iof, n, (void**)&cd); - if (NULL != cd) { - PMIX_RELEASE(cd); - } - } - PMIX_DESTRUCT(&pmix_server_globals.iof); for (n=0; n < pmix_server_globals.clients.size; n++) { if (NULL != (peer = (pmix_peer_t*)pmix_pointer_array_get_item(&pmix_server_globals.clients, n))) { PMIX_RELEASE(peer); @@ -1204,6 +1195,7 @@ PMIX_EXPORT pmix_status_t PMIx_tool_finalize(void) PMIX_LIST_DESTRUCT(&pmix_server_globals.gdata); PMIX_LIST_DESTRUCT(&pmix_server_globals.events); PMIX_LIST_DESTRUCT(&pmix_server_globals.nspaces); + PMIX_LIST_DESTRUCT(&pmix_server_globals.iof); } /* shutdown services */ diff --git a/opal/mca/pmix/pmix4x/pmix/test/simple/simptest.c b/opal/mca/pmix/pmix4x/pmix/test/simple/simptest.c index d21ef6f8572..b1781f9a6cb 100644 --- a/opal/mca/pmix/pmix4x/pmix/test/simple/simptest.c +++ b/opal/mca/pmix/pmix4x/pmix/test/simple/simptest.c @@ -926,6 +926,7 @@ static void lkcbfn(int sd, short args, void *cbdata) lk->cbfunc(PMIX_SUCCESS, lk->pd, lk->n, lk->cbdata); PMIX_PDATA_FREE(lk->pd, lk->n); + free(lk); } static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys, @@ -937,7 +938,7 @@ static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys, size_t i, n; pmix_pdata_t *pd = NULL; pmix_status_t ret = PMIX_ERR_NOT_FOUND; - lkobj_t lk; + lkobj_t *lk; pmix_output(0, "SERVER: LOOKUP"); @@ -971,11 +972,12 @@ static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys, } PMIX_LIST_DESTRUCT(&results); if (PMIX_SUCCESS == ret) { - lk.pd = pd; - lk.n = n; - lk.cbfunc = cbfunc; - lk.cbdata = cbdata; - PMIX_THREADSHIFT(&lk, lkcbfn); + lk = (lkobj_t*)malloc(sizeof(lkobj_t)); + lk->pd = pd; + lk->n = n; + lk->cbfunc = cbfunc; + lk->cbdata = cbdata; + PMIX_THREADSHIFT(lk, lkcbfn); } return ret;