diff --git a/opal/mca/btl/vader/btl_vader.h b/opal/mca/btl/vader/btl_vader.h index 5290a7faa78..f0e8ef678f5 100644 --- a/opal/mca/btl/vader/btl_vader.h +++ b/opal/mca/btl/vader/btl_vader.h @@ -12,7 +12,7 @@ * All rights reserved. * Copyright (c) 2006-2007 Voltaire. All rights reserved. * Copyright (c) 2009-2010 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2010-2015 Los Alamos National Security, LLC. All rights + * Copyright (c) 2010-2017 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2015 Mellanox Technologies. All rights reserved. * @@ -136,6 +136,8 @@ struct mca_btl_vader_component_t { opal_list_t pending_endpoints; /**< list of endpoints with pending fragments */ opal_list_t pending_fragments; /**< fragments pending remote completion */ + char *backing_directory; /**< directory in which the shared memory backing files are placed (storage of the backing_directory MCA variable) */ + /* knem stuff */ #if OPAL_BTL_VADER_HAVE_KNEM unsigned int knem_dma_min; /**< minimum size to enable DMA for knem transfers (0 disables) */ diff --git a/opal/mca/btl/vader/btl_vader_component.c b/opal/mca/btl/vader/btl_vader_component.c index 38cc5fb987a..28d183c007b 100644 --- a/opal/mca/btl/vader/btl_vader_component.c +++ b/opal/mca/btl/vader/btl_vader_component.c @@ -12,7 +12,7 @@ * All rights reserved. * Copyright (c) 2006-2007 Voltaire. All rights reserved. * Copyright (c) 2009-2010 Cisco Systems, Inc. All rights reserved. - * Copyright (c) 2010-2015 Los Alamos National Security, LLC. + * Copyright (c) 2010-2017 Los Alamos National Security, LLC. * All rights reserved. * Copyright (c) 2011 NVIDIA Corporation. All rights reserved. * Copyright (c) 2014-2015 Intel, Inc. All rights reserved. 
@@ -211,6 +211,19 @@ static int mca_btl_vader_component_register (void) OPAL_INFO_LVL_3, MCA_BASE_VAR_SCOPE_GROUP, &mca_btl_vader_component.single_copy_mechanism); OBJ_RELEASE(new_enum); + if (0 == access ("/dev/shm", W_OK)) { /* default to /dev/shm when this process may write to it; otherwise fall back to the process session directory */ + mca_btl_vader_component.backing_directory = "/dev/shm"; + } else { + mca_btl_vader_component.backing_directory = opal_process_info.proc_session_dir; + } + (void) mca_base_component_var_register (&mca_btl_vader_component.super.btl_version, "backing_directory", + "Directory to place backing files for shared memory communication. " + "This directory should be on a local filesystem such as /tmp or " + "/dev/shm (default: (linux) /dev/shm, (others) session directory)", + MCA_BASE_VAR_TYPE_STRING, NULL, 0, 0, OPAL_INFO_LVL_3, + MCA_BASE_VAR_SCOPE_READONLY, &mca_btl_vader_component.backing_directory); + + #if OPAL_BTL_VADER_HAVE_KNEM /* Currently disabling DMA mode by default; it's not clear that this is useful in all applications and architectures. */ mca_btl_vader_component.knem_dma_min = 0; @@ -491,13 +504,17 @@ static mca_btl_base_module_t **mca_btl_vader_component_init (int *num_btls, if (MCA_BTL_VADER_XPMEM != mca_btl_vader_component.single_copy_mechanism) { char *sm_file; - rc = asprintf(&sm_file, "%s" OPAL_PATH_SEP "vader_segment.%s.%d", opal_process_info.proc_session_dir, + rc = asprintf(&sm_file, "%s" OPAL_PATH_SEP "vader_segment.%s.%d", mca_btl_vader_component.backing_directory, /* NOTE(review): the name is built from nodename and local rank only; in a directory shared between jobs (such as /dev/shm) two jobs on one node would presumably produce the same path - confirm whether a job/user identifier is needed */ opal_process_info.nodename, MCA_BTL_VADER_LOCAL_RANK); if (0 > rc) { free (btls); return NULL; } + if (NULL != opal_pmix.register_cleanup) { /* when the PMIx layer offers the hook, register the backing file for cleanup; the return code is not checked */ + opal_pmix.register_cleanup (sm_file); + } + rc = opal_shmem_segment_create (&component->seg_ds, sm_file, component->segment_size); free (sm_file); if (OPAL_SUCCESS != rc) { diff --git a/opal/mca/pmix/base/base.h b/opal/mca/pmix/base/base.h index d1eeb68e109..e533e026720 100644 --- a/opal/mca/pmix/base/base.h +++ b/opal/mca/pmix/base/base.h @@ -65,6 +65,7 @@ typedef struct { opal_mutex_t mutex; opal_pmix_condition_t cond; 
volatile bool active; + int status; } opal_pmix_lock_t; diff --git a/opal/mca/pmix/pmix.h b/opal/mca/pmix/pmix.h index 53e04571ab5..c266855e179 100644 --- a/opal/mca/pmix/pmix.h +++ b/opal/mca/pmix/pmix.h @@ -867,6 +867,9 @@ typedef int (*opal_pmix_base_process_monitor_fn_t)(opal_list_t *monitor, opal_list_t *directives, opal_pmix_info_cbfunc_t cbfunc, void *cbdata); +/* register a filesystem path for cleanup - the vader BTL passes its shared memory backing file. NOTE(review): presumably the path is removed when the registering process terminates - confirm against the active component */ +typedef int (*opal_pmix_base_register_cleanup_fn_t)(char *path); + /* * the standard public API data structure */ @@ -901,6 +904,7 @@ typedef struct { opal_pmix_base_alloc_fn_t allocate; opal_pmix_base_job_control_fn_t job_control; opal_pmix_base_process_monitor_fn_t monitor; + opal_pmix_base_register_cleanup_fn_t register_cleanup; /* may be NULL - the vader BTL tests for NULL before calling it */ /* server APIs */ opal_pmix_base_module_server_init_fn_t server_init; opal_pmix_base_module_server_finalize_fn_t server_finalize; diff --git a/opal/mca/pmix/pmix3x/pmix/VERSION b/opal/mca/pmix/pmix3x/pmix/VERSION index 3b0f60b307a..93b4afb0c98 100644 --- a/opal/mca/pmix/pmix3x/pmix/VERSION +++ b/opal/mca/pmix/pmix3x/pmix/VERSION @@ -30,7 +30,7 @@ greek= # command, or with the date (if "git describe" fails) in the form of # "date". -repo_rev=gitf56d30e +repo_rev=git5c0b64b # If tarball_version is not empty, it is used as the version string in # the tarball filename, regardless of all other versions listed in @@ -44,7 +44,7 @@ tarball_version= # The date when this release was created -date="Nov 11, 2017" +date="Dec 11, 2017" # The shared library version of each of PMIx's public libraries. 
# These versions are maintained in accordance with the "Library diff --git a/opal/mca/pmix/pmix3x/pmix/include/pmix_common.h.in b/opal/mca/pmix/pmix3x/pmix/include/pmix_common.h.in index 897c5f43a3e..bb15579141a 100644 --- a/opal/mca/pmix/pmix3x/pmix/include/pmix_common.h.in +++ b/opal/mca/pmix/pmix3x/pmix/include/pmix_common.h.in @@ -462,6 +462,16 @@ typedef uint32_t pmix_rank_t; #define PMIX_JOB_CTRL_PROVISION_IMAGE "pmix.jctrl.pvnimg" // (char*) name of the image that is to be provisioned #define PMIX_JOB_CTRL_PREEMPTIBLE "pmix.jctrl.preempt" // (bool) job can be pre-empted #define PMIX_JOB_CTRL_TERMINATE "pmix.jctrl.term" // (bool) politely terminate the specified procs +#define PMIX_REGISTER_CLEANUP "pmix.reg.cleanup" // (char*) comma-delimited list of files/directories to + // be removed upon process termination +#define PMIX_CLEANUP_RECURSIVE "pmix.clnup.recurse" // (bool) recursively cleanup all subdirectories under the + // specified one(s) +#define PMIX_CLEANUP_EMPTY "pmix.clnup.empty" // (bool) only remove empty subdirectories +#define PMIX_CLEANUP_IGNORE "pmix.clnup.ignore" // (char*) comma-delimited list of filenames that are not + // to be removed +#define PMIX_CLEANUP_LEAVE_TOPDIR "pmix.clnup.lvtop" // (bool) when recursively cleaning subdirs, do not remove + // the top-level directory (the one given in the + // cleanup request) /* monitoring attributes */ #define PMIX_MONITOR_ID "pmix.monitor.id" // (char*) provide a string identifier for this request diff --git a/opal/mca/pmix/pmix3x/pmix/src/atomics/sys/powerpc/atomic.h b/opal/mca/pmix/pmix3x/pmix/src/atomics/sys/powerpc/atomic.h index 9682b9e62af..4e39a43ee33 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/atomics/sys/powerpc/atomic.h +++ b/opal/mca/pmix/pmix3x/pmix/src/atomics/sys/powerpc/atomic.h @@ -84,7 +84,7 @@ void pmix_atomic_rmb(void) static inline void pmix_atomic_wmb(void) { - PMIXRMB(); + PMIXWMB(); } static inline @@ -110,7 +110,7 @@ void pmix_atomic_isync(void) #pragma mc_func 
pmix_atomic_rmb { "7c2004ac" } /* lwsync */ #pragma reg_killed_by pmix_atomic_rmb /* none */ -#pragma mc_func pmix_atomic_wmb { "7c0006ac" } /* eieio */ +#pragma mc_func pmix_atomic_wmb { "7c2004ac" } /* lwsync */ #pragma reg_killed_by pmix_atomic_wmb /* none */ #endif diff --git a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c index 15d56e6268b..6626fced37e 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c +++ b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.c @@ -36,11 +36,22 @@ #endif #include #include PMIX_EVENT_HEADER +#if HAVE_SYS_STAT_H +#include +#endif /* HAVE_SYS_STAT_H */ +#ifdef HAVE_DIRENT_H +#include +#endif /* HAVE_DIRENT_H */ + +#include #include "src/mca/bfrops/bfrops_types.h" #include "src/class/pmix_hash_table.h" #include "src/class/pmix_list.h" #include "src/threads/threads.h" +#include "src/util/argv.h" +#include "src/util/error.h" +#include "src/util/os_path.h" PMIX_EXPORT pmix_lock_t pmix_global_lock = { .mutex = PMIX_MUTEX_STATIC_INIT, @@ -124,9 +135,15 @@ static void pcon(pmix_peer_t *p) PMIX_CONSTRUCT(&p->send_queue, pmix_list_t); p->send_msg = NULL; p->recv_msg = NULL; + PMIX_CONSTRUCT(&p->epilogs, pmix_list_t); } + +static void cleanup(pmix_info_caddy_t *epi); /* runs one registered cleanup epilog; called from pdes() below */ + static void pdes(pmix_peer_t *p) { + pmix_info_caddy_t *epi; + if (0 <= p->sd) { CLOSE_THE_SOCKET(p->sd); } @@ -148,6 +165,16 @@ static void pdes(pmix_peer_t *p) if (NULL != p->recv_msg) { PMIX_RELEASE(p->recv_msg); } + PMIX_LIST_FOREACH(epi, &p->epilogs, pmix_info_caddy_t) { + /* execute the epilog step - the key of the first info entry identifies the request, and only PMIX_REGISTER_CLEANUP is acted upon here (assumes every queued caddy holds at least one info entry - TODO confirm at the enqueue site) */ + if (0 == strncmp(epi->info[0].key, PMIX_REGISTER_CLEANUP, PMIX_MAX_KEYLEN)) { + cleanup(epi); + } + /* free the epilog contents as the info_caddy_t + * destructor does not do so */ + PMIX_INFO_FREE(epi->info, epi->ninfo); + } + PMIX_LIST_DESTRUCT(&p->epilogs); } PMIX_EXPORT PMIX_CLASS_INSTANCE(pmix_peer_t, pmix_object_t, @@ -252,3 +279,211 @@ static void qdes(pmix_query_caddy_t *p) PMIX_EXPORT 
PMIX_CLASS_INSTANCE(pmix_query_caddy_t, pmix_object_t, qcon, qdes); +/* forward declarations of the file-local helpers used by cleanup() */ +static void dirpath_destroy(const char *path, + bool recursive, bool empty, + bool leave_top, char **ignores); +static bool dirpath_is_empty(const char *path ); +static int dirpath_access(const char *path, const mode_t in_mode ); +/* Execute one PMIX_REGISTER_CLEANUP epilog: info[0] carries the comma-delimited target list and any remaining entries are PMIX_CLEANUP_* modifiers. Nothing is returned; filesystem failures are not reported. */ +static void cleanup(pmix_info_caddy_t *epi) +{ + char **targets = NULL, **ignores = NULL; + bool recurse = false, empty = false, leave_top = false; + size_t n; + + /* the targets for cleanup are in the first info struct */ + targets = pmix_argv_split(epi->info[0].value.data.string, ','); + /* cycle over any modifiers */ + for (n=1; n < epi->ninfo; n++) { + if (0 == strncmp(epi->info[n].key, PMIX_CLEANUP_RECURSIVE, PMIX_MAX_KEYLEN)) { + recurse = PMIX_INFO_TRUE(&epi->info[n]); + } else if (0 == strncmp(epi->info[n].key, PMIX_CLEANUP_EMPTY, PMIX_MAX_KEYLEN)) { + empty = PMIX_INFO_TRUE(&epi->info[n]); + } else if (0 == strncmp(epi->info[n].key, PMIX_CLEANUP_IGNORE, PMIX_MAX_KEYLEN)) { + if (PMIX_STRING != epi->info[n].value.type) { + PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); + continue; + } + ignores = pmix_argv_split(epi->info[n].value.data.string, ','); + } else if (0 == strncmp(epi->info[n].key, PMIX_CLEANUP_LEAVE_TOPDIR, PMIX_MAX_KEYLEN)) { + leave_top = PMIX_INFO_TRUE(&epi->info[n]); + } + } + /* cleanup the targets - guard against the split not having produced an array */ + for (n=0; NULL != targets && NULL != targets[n]; n++) { + dirpath_destroy(targets[n], recurse, empty, leave_top, ignores); + } + if (NULL != targets) { pmix_argv_free(targets); } + if (NULL != ignores) { + pmix_argv_free(ignores); + } +} +/* Remove the entries found in directory 'path'. Plain files are unlinked; subdirectories are entered only when 'recursive' is set; entries whose names begin with one of the 'ignores' patterns (a trailing '*' is dropped before comparing) are left alone. The directory itself is removed only when 'empty' is set, it ended up empty, and 'leave_top' does not protect it. Failures are silently skipped. */ +static void dirpath_destroy(const char *path, + bool recursive, bool empty, + bool leave_top, char **ignores) +{ + int rc; + bool is_dir = false, ignore; + DIR *dp; + struct dirent *ep; + char *filenm; + struct stat buf; + size_t n, len; + + if (NULL == path) { /* protect against error */ + return; + } + + /* + * Make sure we have access to the base directory + */ + if (PMIX_SUCCESS != (rc = dirpath_access(path, 0))) { + goto cleanup; + } + + /* Open up the 
directory */ + dp = opendir(path); + if (NULL == dp) { + return; + } + + while (NULL != (ep = readdir(dp))) { + /* skip: + * - . and .. + */ + if ((0 == strcmp(ep->d_name, ".")) || + (0 == strcmp(ep->d_name, ".."))) { + continue; + } + + /* Check to see if it is a directory */ + is_dir = false; + + /* Create a pathname. This is not always needed, but it makes + * for cleaner code just to create it here. Note that we are + * allocating memory here, so we need to free it later on. + */ + filenm = pmix_os_path(false, path, ep->d_name, NULL); + + rc = stat(filenm, &buf); + if (0 > rc) { + /* Handle a race condition. filenm might have been deleted by + * another process running on the same node. That typically occurs + * when one task is removing the job_session_dir and another task + * is still removing its proc_session_dir. + */ + free(filenm); + continue; + } + if (S_ISDIR(buf.st_mode)) { + is_dir = true; + } + + /* + * If not recursively descending, then if we find a directory then skip + * it since we were not told to remove it. + */ + if (is_dir && !recursive) { + /* continue removing files */ + free(filenm); + continue; + } + + /* Will the caller allow us to remove this file/directory? 
*/ + if (NULL != ignores) { + ignore = false; + for (n=0; NULL != ignores[n]; n++) { + /* a trailing '*' marks a wildcard: exclude it from the compared length (the length check keeps an empty pattern from indexing before the string) */ + len = strlen(ignores[n]); + if (0 < len && '*' == ignores[n][len-1]) { + --len; + } + if (0 == strncmp(ep->d_name, ignores[n], len)) { + /* ignore this one */ + ignore = true; + break; + } + } + /* + * Caller does not wish to remove this file/directory, + * continue with the rest of the entries + */ + if (ignore) { + free(filenm); + continue; + } + } + /* Directories are recursively destroyed - leave_top protects only the top-level target, so it is not passed down */ + if (is_dir && recursive) { + dirpath_destroy(filenm, recursive, empty, false, ignores); + free(filenm); + } else { + /* Files are removed right here */ + unlink(filenm); + free(filenm); + } + } + + /* Done with this directory */ + closedir(dp); + + cleanup: + + /* + * If the directory is empty, then remove it - unless the caller asked to keep the top-level directory + */ + if (!leave_top && empty && dirpath_is_empty(path)) { + rmdir(path); + } +} +/* Return true when 'path' is NULL or names a directory holding nothing but "." and ".."; return false when it holds anything else or cannot be opened. */ +static bool dirpath_is_empty(const char *path ) +{ + DIR *dp; + struct dirent *ep; + + if (NULL != path) { /* protect against error */ + dp = opendir(path); + if (NULL != dp) { + while ((ep = readdir(dp))) { + if ((0 != strcmp(ep->d_name, ".")) && + (0 != strcmp(ep->d_name, ".."))) { + closedir(dp); + return false; + } + } + closedir(dp); + return true; + } + return false; + } + + return true; +} +/* Compare the st_mode bits of 'path' against 'in_mode' (S_IRWXU when in_mode is 0): PMIX_SUCCESS when every requested bit is set, PMIX_ERROR when some are missing, PMIX_ERR_NOT_FOUND when stat() fails. */ +static int dirpath_access(const char *path, const mode_t in_mode ) +{ + struct stat buf; + mode_t loc_mode = S_IRWXU; /* looking for full rights */ + + /* + * If there was no mode specified, use the default mode + */ + if (0 != in_mode) { + loc_mode = in_mode; + } + + if (0 == stat(path, &buf)) { /* exists - check access */ + if ((buf.st_mode & loc_mode) == loc_mode) { /* okay, I can work here */ + return(PMIX_SUCCESS); + } else { + /* Don't have access rights to the existing path */ + return(PMIX_ERROR); + } + } else { + /* We could not find the path */ + return( PMIX_ERR_NOT_FOUND ); + } +} diff --git a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h 
b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h index 34f12a5dfeb..7536c58018d 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h +++ b/opal/mca/pmix/pmix3x/pmix/src/include/pmix_globals.h @@ -156,6 +156,17 @@ typedef struct pmix_rank_info_t { } pmix_rank_info_t; PMIX_CLASS_DECLARATION(pmix_rank_info_t); + +/* define a very simple caddy for dealing with pmix_info_t + * objects when transferring portions of arrays */ +typedef struct { + pmix_list_item_t super; + pmix_info_t *info; /* array of info structs carried by this caddy */ + size_t ninfo; /* number of entries in the info array */ +} pmix_info_caddy_t; +PMIX_CLASS_DECLARATION(pmix_info_caddy_t); + + /* object for tracking peers - each peer can have multiple * connections. This can occur if the initial app executes * a fork/exec, and the child initiates its own connection @@ -177,6 +188,8 @@ typedef struct pmix_peer_t { pmix_list_t send_queue; /**< list of messages to send */ pmix_ptl_send_t *send_msg; /**< current send in progress */ pmix_ptl_recv_t *recv_msg; /**< current recv in progress */ + pmix_list_t epilogs; /**< list of pmix_info_caddy_t to be performed upon + termination of this peer */ } pmix_peer_t; PMIX_CLASS_DECLARATION(pmix_peer_t); @@ -305,14 +318,6 @@ typedef struct { } pmix_cb_t; PMIX_CLASS_DECLARATION(pmix_cb_t); -/* define a very simple caddy for dealing with pmix_info_t - * objects when transferring portions of arrays */ -typedef struct { - pmix_list_item_t super; - pmix_info_t *info; -} pmix_info_caddy_t; -PMIX_CLASS_DECLARATION(pmix_info_caddy_t); - #define PMIX_THREADSHIFT(r, c) \ do { \ pmix_event_assign(&((r)->ev), pmix_globals.evbase, \ diff --git a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/Makefile.am b/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/Makefile.am deleted file mode 100644 index ac62d8a9aad..00000000000 --- a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/Makefile.am +++ /dev/null @@ -1,56 +0,0 @@ -# -*- makefile -*- -# -# Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana -# University Research and Technology -# Corporation. 
All rights reserved. -# Copyright (c) 2004-2005 The University of Tennessee and The University -# of Tennessee Research Foundation. All rights -# reserved. -# Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, -# University of Stuttgart. All rights reserved. -# Copyright (c) 2004-2005 The Regents of the University of California. -# All rights reserved. -# Copyright (c) 2012 Los Alamos National Security, Inc. All rights reserved. -# Copyright (c) 2013-2017 Intel, Inc. All rights reserved. -# Copyright (c) 2017 Research Organization for Information Science -# and Technology (RIST). All rights reserved. -# Copyright (c) 2017 Mellanox Technologies, Inc. -# All rights reserved. -# $COPYRIGHT$ -# -# Additional copyrights may follow -# -# $HEADER$ -# - -headers = \ - gds_dstore.h - -sources = \ - gds_dstore.c \ - gds_dstore_component.c - -# Make the output library in this directory, and name it either -# mca__.la (for DSO builds) or libmca__.la -# (for static builds). - -if MCA_BUILD_pmix_gds_ds12_DSO -lib = -lib_sources = -component = mca_gds_ds12.la -component_sources = $(headers) $(sources) -else -lib = libmca_gds_ds12.la -lib_sources = $(headers) $(sources) -component = -component_sources = -endif - -mcacomponentdir = $(pmixlibdir) -mcacomponent_LTLIBRARIES = $(component) -mca_gds_ds12_la_SOURCES = $(component_sources) -mca_gds_ds12_la_LDFLAGS = -module -avoid-version - -noinst_LTLIBRARIES = $(lib) -libmca_gds_ds12_la_SOURCES = $(lib_sources) -libmca_gds_ds12_la_LDFLAGS = -module -avoid-version diff --git a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore.c b/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore.c deleted file mode 100644 index cdebc3fe8a6..00000000000 --- a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore.c +++ /dev/null @@ -1,3092 +0,0 @@ -/* - * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2016 IBM Corporation. All rights reserved. - * Copyright (c) 2016-2017 Mellanox Technologies, Inc. 
- * All rights reserved. - * - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#include - -#include -#include -#include -#include -#include -#include -#ifdef HAVE_UNISTD_H -#include -#endif -#ifdef HAVE_SYS_TYPES_H -#include -#endif -#ifdef HAVE_SYS_STAT_H -#include -#endif -#ifdef HAVE_FCNTL_H -#include -#endif -#include - -#include - -#include "src/include/pmix_globals.h" -#include "src/class/pmix_list.h" -#include "src/client/pmix_client_ops.h" -#include "src/server/pmix_server_ops.h" -#include "src/util/argv.h" -#include "src/util/compress.h" -#include "src/util/error.h" -#include "src/util/output.h" -#include "src/util/pmix_environ.h" -#include "src/util/hash.h" -#include "src/mca/preg/preg.h" - -#include "src/mca/gds/base/base.h" -#include "gds_dstore.h" -#include "src/mca/pshmem/base/base.h" - -#define ESH_REGION_EXTENSION "EXTENSION_SLOT" -#define ESH_REGION_INVALIDATED "INVALIDATED" -#define ESH_ENV_INITIAL_SEG_SIZE "INITIAL_SEG_SIZE" -#define ESH_ENV_NS_META_SEG_SIZE "NS_META_SEG_SIZE" -#define ESH_ENV_NS_DATA_SEG_SIZE "NS_DATA_SEG_SIZE" -#define ESH_ENV_LINEAR "SM_USE_LINEAR_SEARCH" - -#define ESH_MIN_KEY_LEN (sizeof(ESH_REGION_INVALIDATED)) - -#define ESH_KV_SIZE(addr) \ -__extension__ ({ \ - size_t sz; \ - memcpy(&sz, addr, sizeof(size_t)); \ - sz; \ -}) - -#define ESH_KNAME_PTR(addr) \ -__extension__ ({ \ - char *name_ptr = (char *)addr + sizeof(size_t); \ - name_ptr; \ -}) - -#define ESH_KNAME_LEN(key) \ -__extension__ ({ \ - size_t kname_len = strlen(key) + 1; \ - size_t len = (kname_len < ESH_MIN_KEY_LEN) ? 
\ - ESH_MIN_KEY_LEN : kname_len; \ - len; \ -}) - -#define ESH_DATA_PTR(addr) \ -__extension__ ({ \ - size_t kname_len = ESH_KNAME_LEN(ESH_KNAME_PTR(addr)); \ - uint8_t *data_ptr = addr + sizeof(size_t) + kname_len; \ - data_ptr; \ -}) - -#define ESH_DATA_SIZE(addr, data_ptr) \ -__extension__ ({ \ - size_t sz = ESH_KV_SIZE(addr); \ - size_t data_size = sz - (data_ptr - addr); \ - data_size; \ -}) - -#define ESH_KEY_SIZE(key, size) \ -__extension__ ({ \ - size_t len = sizeof(size_t) + ESH_KNAME_LEN(key) + size;\ - len; \ -}) - -/* in ext slot new offset will be stored in case if - * new data were added for the same process during - * next commit - */ -#define EXT_SLOT_SIZE() \ - (ESH_KEY_SIZE(ESH_REGION_EXTENSION, sizeof(size_t))) - - -#define ESH_PUT_KEY(addr, key, buffer, size) \ -__extension__ ({ \ - size_t sz = ESH_KEY_SIZE(key, size); \ - memcpy(addr, &sz, sizeof(size_t)); \ - memset(addr + sizeof(size_t), 0, ESH_KNAME_LEN(key)); \ - strncpy((char *)addr + sizeof(size_t), \ - key, ESH_KNAME_LEN(key)); \ - memcpy(addr + sizeof(size_t) + ESH_KNAME_LEN(key), \ - buffer, size); \ -}) - -#ifdef ESH_PTHREAD_LOCK -#define _ESH_LOCK(rwlock, func) \ -__extension__ ({ \ - pmix_status_t ret = PMIX_SUCCESS; \ - int rc; \ - rc = pthread_rwlock_##func(rwlock); \ - if (0 != rc) { \ - switch (errno) { \ - case EINVAL: \ - ret = PMIX_ERR_INIT; \ - break; \ - case EPERM: \ - ret = PMIX_ERR_NO_PERMISSIONS; \ - break; \ - } \ - } \ - if (ret) { \ - pmix_output(0, "%s %d:%s lock failed: %s", \ - __FILE__, __LINE__, __func__, strerror(errno)); \ - } \ - ret; \ -}) - -#define _ESH_WRLOCK(rwlock) _ESH_LOCK(rwlock, wrlock) -#define _ESH_RDLOCK(rwlock) _ESH_LOCK(rwlock, rdlock) -#define _ESH_UNLOCK(rwlock) _ESH_LOCK(rwlock, unlock) -#endif - -#ifdef ESH_FCNTL_LOCK -#define _ESH_LOCK(lockfd, operation) \ -__extension__ ({ \ - pmix_status_t ret = PMIX_SUCCESS; \ - int i; \ - struct flock fl = {0}; \ - fl.l_type = operation; \ - fl.l_whence = SEEK_SET; \ - for(i = 0; i < 10; i++) { \ - if( 
0 > fcntl(lockfd, F_SETLKW, &fl) ) { \ - switch( errno ){ \ - case EINTR: \ - continue; \ - case ENOENT: \ - case EINVAL: \ - ret = PMIX_ERR_NOT_FOUND; \ - break; \ - case EBADF: \ - ret = PMIX_ERR_BAD_PARAM; \ - break; \ - case EDEADLK: \ - case EFAULT: \ - case ENOLCK: \ - ret = PMIX_ERR_RESOURCE_BUSY; \ - break; \ - default: \ - ret = PMIX_ERROR; \ - break; \ - } \ - } \ - break; \ - } \ - if (ret) { \ - pmix_output(0, "%s %d:%s lock failed: %s", \ - __FILE__, __LINE__, __func__, strerror(errno)); \ - } \ - ret; \ -}) - -#define _ESH_WRLOCK(lock) _ESH_LOCK(lock, F_WRLCK) -#define _ESH_RDLOCK(lock) _ESH_LOCK(lock, F_RDLCK) -#define _ESH_UNLOCK(lock) _ESH_LOCK(lock, F_UNLCK) -#endif - -#define ESH_INIT_SESSION_TBL_SIZE 2 -#define ESH_INIT_NS_MAP_TBL_SIZE 2 - -static int _store_data_for_rank(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_buffer_t *buf); -static seg_desc_t *_create_new_segment(segment_type type, const ns_map_data_t *ns_map, uint32_t id); -static seg_desc_t *_attach_new_segment(segment_type type, const ns_map_data_t *ns_map, uint32_t id); -static int _update_ns_elem(ns_track_elem_t *ns_elem, ns_seg_info_t *info); -static int _put_ns_info_to_initial_segment(const ns_map_data_t *ns_map, pmix_pshmem_seg_t *metaseg, pmix_pshmem_seg_t *dataseg); -static ns_seg_info_t *_get_ns_info_from_initial_segment(const ns_map_data_t *ns_map); -static ns_track_elem_t *_get_track_elem_for_namespace(ns_map_data_t *ns_map); -static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc); -static uint8_t *_get_data_region_by_offset(seg_desc_t *segdesc, size_t offset); -static void _update_initial_segment_info(const ns_map_data_t *ns_map); -static void _set_constants_from_env(void); -static void _delete_sm_desc(seg_desc_t *desc); -static int _pmix_getpagesize(void); -static inline ssize_t _get_univ_size(const char *nspace); - -static inline ns_map_data_t * _esh_session_map_search_server(const char *nspace); -static inline ns_map_data_t * 
_esh_session_map_search_client(const char *nspace); -static inline ns_map_data_t * _esh_session_map(const char *nspace, size_t tbl_idx); -static inline void _esh_session_map_clean(ns_map_t *m); -static inline int _esh_jobuid_tbl_search(uid_t jobuid, size_t *tbl_idx); -static inline int _esh_session_tbl_add(size_t *tbl_idx); -static inline int _esh_session_init(size_t idx, ns_map_data_t *m, size_t jobuid, int setjobuid); -static inline void _esh_session_release(session_t *s); -static inline void _esh_ns_track_cleanup(void); -static inline void _esh_sessions_cleanup(void); -static inline void _esh_ns_map_cleanup(void); -static inline int _esh_dir_del(const char *dirname); -static inline void _client_compat_save(pmix_peer_t *peer); -static inline pmix_peer_t * _client_peer(void); - -static inline int _my_client(const char *nspace, pmix_rank_t rank); - -static pmix_status_t dstore_init(pmix_info_t info[], size_t ninfo); - -static void dstore_finalize(void); - -static pmix_status_t dstore_setup_fork(const pmix_proc_t *peer, char ***env); - -static pmix_status_t dstore_cache_job_info(struct pmix_nspace_t *ns, - pmix_info_t info[], size_t ninfo); - -static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr, - pmix_buffer_t *reply); - -static pmix_status_t dstore_store_job_info(const char *nspace, - pmix_buffer_t *job_data); - -static pmix_status_t _dstore_store(const char *nspace, - pmix_rank_t rank, - pmix_kval_t *kv); - -static pmix_status_t dstore_store(const pmix_proc_t *proc, - pmix_scope_t scope, - pmix_kval_t *kv); - -static pmix_status_t _dstore_fetch(const char *nspace, - pmix_rank_t rank, - const char *key, pmix_value_t **kvs); - -static pmix_status_t dstore_fetch(const pmix_proc_t *proc, - pmix_scope_t scope, bool copy, - const char *key, - pmix_info_t info[], size_t ninfo, - pmix_list_t *kvs); - -static pmix_status_t dstore_add_nspace(const char *nspace, - pmix_info_t info[], - size_t ninfo); - -static pmix_status_t dstore_del_nspace(const char* 
nspace); - -static pmix_status_t dstore_assign_module(pmix_info_t *info, size_t ninfo, - int *priority); - -static pmix_status_t dstore_store_modex(struct pmix_nspace_t *nspace, - pmix_list_t *cbs, - pmix_byte_object_t *bo); - -pmix_gds_base_module_t pmix_ds12_module = { - .name = "ds12", - .init = dstore_init, - .finalize = dstore_finalize, - .assign_module = dstore_assign_module, - .cache_job_info = dstore_cache_job_info, - .register_job_info = dstore_register_job_info, - .store_job_info = dstore_store_job_info, - .store = dstore_store, - .store_modex = dstore_store_modex, - .fetch = dstore_fetch, - .setup_fork = dstore_setup_fork, - .add_nspace = dstore_add_nspace, - .del_nspace = dstore_del_nspace, -}; - -static char *_base_path = NULL; -static size_t _initial_segment_size = 0; -static size_t _max_ns_num; -static size_t _meta_segment_size = 0; -static size_t _max_meta_elems; -static size_t _data_segment_size = 0; -static size_t _lock_segment_size = 0; -static uid_t _jobuid; -static char _setjobuid = 0; -static pmix_peer_t *_clients_peer = NULL; - -static pmix_value_array_t *_session_array = NULL; -static pmix_value_array_t *_ns_map_array = NULL; -static pmix_value_array_t *_ns_track_array = NULL; - -ns_map_data_t * (*_esh_session_map_search)(const char *nspace) = NULL; -int (*_esh_lock_init)(size_t idx) = NULL; - -#define _ESH_SESSION_path(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].nspace_path) -#define _ESH_SESSION_lockfile(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfile) -#define _ESH_SESSION_setjobuid(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].setjobuid) -#define _ESH_SESSION_jobuid(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].jobuid) -#define _ESH_SESSION_sm_seg_first(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].sm_seg_first) -#define _ESH_SESSION_sm_seg_last(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, 
session_t)[tbl_idx].sm_seg_last) -#define _ESH_SESSION_ns_info(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].ns_info) - -#ifdef ESH_PTHREAD_LOCK -#define _ESH_SESSION_pthread_rwlock(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock) -#define _ESH_SESSION_pthread_seg(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].rwlock_seg) -#define _ESH_SESSION_lock(tbl_idx) _ESH_SESSION_pthread_rwlock(tbl_idx) -#endif - -#ifdef ESH_FCNTL_LOCK -#define _ESH_SESSION_lockfd(tbl_idx) (PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t)[tbl_idx].lockfd) -#define _ESH_SESSION_lock(tbl_idx) _ESH_SESSION_lockfd(tbl_idx) -#endif - -/* If _direct_mode is set, it means that we use linear search - * along the array of rank meta info objects inside a meta segment - * to find the requested rank. Otherwise, we do a fast lookup - * based on rank and directly compute offset. - * This mode is called direct because it's effectively used in - * sparse communication patterns when direct modex is usually used. 
- */ -static int _direct_mode = 0; - -static void ncon(ns_track_elem_t *p) { - memset(&p->ns_map, 0, sizeof(p->ns_map)); - p->meta_seg = NULL; - p->data_seg = NULL; - p->num_meta_seg = 0; - p->num_data_seg = 0; - p->in_use = true; -} - -static void ndes(ns_track_elem_t *p) { - _delete_sm_desc(p->meta_seg); - _delete_sm_desc(p->data_seg); - memset(&p->ns_map, 0, sizeof(p->ns_map)); - p->in_use = false; -} - -PMIX_CLASS_INSTANCE(ns_track_elem_t, - pmix_value_array_t, - ncon, ndes); - -static inline void _esh_session_map_clean(ns_map_t *m) { - memset(m, 0, sizeof(*m)); - m->data.track_idx = -1; -} - -#ifdef ESH_FCNTL_LOCK -static inline int _flock_init(size_t idx) { - pmix_status_t rc = PMIX_SUCCESS; - - if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - _ESH_SESSION_lock(idx) = open(_ESH_SESSION_lockfile(idx), O_CREAT | O_RDWR | O_EXCL, 0600); - - /* if previous launch was crashed, the lockfile might not be deleted and unlocked, - * so we delete it and create a new one. */ - if (_ESH_SESSION_lock(idx) < 0) { - unlink(_ESH_SESSION_lockfile(idx)); - _ESH_SESSION_lock(idx) = open(_ESH_SESSION_lockfile(idx), O_CREAT | O_RDWR, 0600); - if (_ESH_SESSION_lock(idx) < 0) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - if (_ESH_SESSION_setjobuid(idx) > 0) { - if (0 > chown(_ESH_SESSION_lockfile(idx), (uid_t) _ESH_SESSION_jobuid(idx), (gid_t) -1)) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - if (0 > chmod(_ESH_SESSION_lockfile(idx), S_IRUSR | S_IWGRP | S_IRGRP)) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - } - else { - _ESH_SESSION_lock(idx) = open(_ESH_SESSION_lockfile(idx), O_RDONLY); - if (-1 == _ESH_SESSION_lock(idx)) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - return rc; -} -#endif - -#ifdef ESH_PTHREAD_LOCK -static inline int _rwlock_init(size_t idx) { - pmix_status_t rc = PMIX_SUCCESS; - size_t size = _lock_segment_size; - pthread_rwlockattr_t attr; - - if ((NULL != 
_ESH_SESSION_pthread_seg(idx)) || (NULL != _ESH_SESSION_pthread_rwlock(idx))) { - rc = PMIX_ERR_INIT; - return rc; - } - _ESH_SESSION_pthread_seg(idx) = (pmix_pshmem_seg_t *)malloc(sizeof(pmix_pshmem_seg_t)); - if (NULL == _ESH_SESSION_pthread_seg(idx)) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - return rc; - } - if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - if (PMIX_SUCCESS != (rc = pmix_pshmem.segment_create(_ESH_SESSION_pthread_seg(idx), _ESH_SESSION_lockfile(idx), size))) { - return rc; - } - memset(_ESH_SESSION_pthread_seg(idx)->seg_base_addr, 0, size); - if (_ESH_SESSION_setjobuid(idx) > 0) { - if (0 > chown(_ESH_SESSION_lockfile(idx), (uid_t) _ESH_SESSION_jobuid(idx), (gid_t) -1)){ - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - /* set the mode as required */ - if (0 > chmod(_ESH_SESSION_lockfile(idx), S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP )) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - _ESH_SESSION_pthread_rwlock(idx) = (pthread_rwlock_t *)_ESH_SESSION_pthread_seg(idx)->seg_base_addr; - - if (0 != pthread_rwlockattr_init(&attr)) { - rc = PMIX_ERR_INIT; - pmix_pshmem.segment_detach(_ESH_SESSION_pthread_seg(idx)); - return rc; - } - if (0 != pthread_rwlockattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) { - rc = PMIX_ERR_INIT; - pmix_pshmem.segment_detach(_ESH_SESSION_pthread_seg(idx)); - pthread_rwlockattr_destroy(&attr); - return rc; - } -#ifdef HAVE_PTHREAD_SETKIND - if (0 != pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)) { - rc = PMIX_ERR_INIT; - pmix_pshmem.segment_detach(_ESH_SESSION_pthread_seg(idx)); - pthread_rwlockattr_destroy(&attr); - return rc; - } -#endif - if (0 != pthread_rwlock_init(_ESH_SESSION_pthread_rwlock(idx), &attr)) { - rc = PMIX_ERR_INIT; - pmix_pshmem.segment_detach(_ESH_SESSION_pthread_seg(idx)); - pthread_rwlockattr_destroy(&attr); - return rc; - } - if (0 != pthread_rwlockattr_destroy(&attr)) { - rc = PMIX_ERR_INIT; - return rc; - } - - } - else { - 
_ESH_SESSION_pthread_seg(idx)->seg_size = size; - snprintf(_ESH_SESSION_pthread_seg(idx)->seg_name, PMIX_PATH_MAX, "%s", _ESH_SESSION_lockfile(idx)); - if (PMIX_SUCCESS != (rc = pmix_pshmem.segment_attach(_ESH_SESSION_pthread_seg(idx), PMIX_PSHMEM_RW))) { - return rc; - } - _ESH_SESSION_pthread_rwlock(idx) = (pthread_rwlock_t *)_ESH_SESSION_pthread_seg(idx)->seg_base_addr; - } - - return rc; -} - -static inline void _rwlock_release(session_t *s) { - pmix_status_t rc; - - if (0 != pthread_rwlock_destroy(s->rwlock)) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return; - } - - /* detach & unlink from current desc */ - if (s->rwlock_seg->seg_cpid == getpid()) { - pmix_pshmem.segment_unlink(s->rwlock_seg); - } - pmix_pshmem.segment_detach(s->rwlock_seg); - - free(s->rwlock_seg); - s->rwlock_seg = NULL; - s->rwlock = NULL; -} -#endif - -static inline int _esh_dir_del(const char *path) -{ - DIR *dir; - struct dirent *d_ptr; - struct stat st; - pmix_status_t rc = PMIX_SUCCESS; - - char name[PMIX_PATH_MAX]; - - dir = opendir(path); - if (NULL == dir) { - rc = PMIX_ERR_BAD_PARAM; - return rc; - } - - while (NULL != (d_ptr = readdir(dir))) { - snprintf(name, PMIX_PATH_MAX, "%s/%s", path, d_ptr->d_name); - if ( 0 > lstat(name, &st) ){ - /* No fatal error here - just log this event - * we will hit the error later at rmdir. Keep trying ... - */ - PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); - continue; - } - - if(S_ISDIR(st.st_mode)) { - if(strcmp(d_ptr->d_name, ".") && strcmp(d_ptr->d_name, "..")) { - rc = _esh_dir_del(name); - if( PMIX_SUCCESS != rc ){ - /* No fatal error here - just log this event - * we will hit the error later at rmdir. Keep trying ... - */ - PMIX_ERROR_LOG(rc); - } - } - } - else { - if( 0 > unlink(name) ){ - /* No fatal error here - just log this event - * we will hit the error later at rmdir. Keep trying ... 
- */ - PMIX_ERROR_LOG(PMIX_ERR_NO_PERMISSIONS); - } - } - } - closedir(dir); - - /* remove the top dir */ - if( 0 > rmdir(path) ){ - rc = PMIX_ERR_NO_PERMISSIONS; - PMIX_ERROR_LOG(rc); - } - return rc; -} - -static inline int _esh_tbls_init(void) -{ - pmix_status_t rc = PMIX_SUCCESS; - size_t idx; - - /* initial settings */ - _ns_track_array = NULL; - _session_array = NULL; - _ns_map_array = NULL; - - /* Setup namespace tracking array */ - if (NULL == (_ns_track_array = PMIX_NEW(pmix_value_array_t))) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - if (PMIX_SUCCESS != (rc = pmix_value_array_init(_ns_track_array, sizeof(ns_track_elem_t)))){ - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - /* Setup sessions table */ - if (NULL == (_session_array = PMIX_NEW(pmix_value_array_t))){ - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - if (PMIX_SUCCESS != (rc = pmix_value_array_init(_session_array, sizeof(session_t)))) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(_session_array, ESH_INIT_SESSION_TBL_SIZE))) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - for (idx = 0; idx < ESH_INIT_SESSION_TBL_SIZE; idx++) { - memset(pmix_value_array_get_item(_session_array, idx), 0, sizeof(session_t)); - } - - /* Setup namespace map array */ - if (NULL == (_ns_map_array = PMIX_NEW(pmix_value_array_t))) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - if (PMIX_SUCCESS != (rc = pmix_value_array_init(_ns_map_array, sizeof(ns_map_t)))) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - if (PMIX_SUCCESS != (rc = pmix_value_array_set_size(_ns_map_array, ESH_INIT_NS_MAP_TBL_SIZE))) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - for (idx = 0; idx < ESH_INIT_NS_MAP_TBL_SIZE; idx++) { - _esh_session_map_clean(pmix_value_array_get_item(_ns_map_array, idx)); - } - - return PMIX_SUCCESS; -err_exit: - if (NULL != _ns_track_array) { - PMIX_RELEASE(_ns_track_array); - } 
- if (NULL != _session_array) { - PMIX_RELEASE(_session_array); - } - if (NULL != _ns_map_array) { - PMIX_RELEASE(_ns_map_array); - } - return rc; -} - -static inline void _esh_ns_map_cleanup(void) -{ - size_t idx; - size_t size; - ns_map_t *ns_map; - - if (NULL == _ns_map_array) { - return; - } - - size = pmix_value_array_get_size(_ns_map_array); - ns_map = PMIX_VALUE_ARRAY_GET_BASE(_ns_map_array, ns_map_t); - - for (idx = 0; idx < size; idx++) { - if(ns_map[idx].in_use) - _esh_session_map_clean(&ns_map[idx]); - } - - PMIX_RELEASE(_ns_map_array); - _ns_map_array = NULL; -} - -static inline void _esh_sessions_cleanup(void) -{ - size_t idx; - size_t size; - session_t *s_tbl; - - if (NULL == _session_array) { - return; - } - - size = pmix_value_array_get_size(_session_array); - s_tbl = PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t); - - for (idx = 0; idx < size; idx++) { - if(s_tbl[idx].in_use) - _esh_session_release(&s_tbl[idx]); - } - - PMIX_RELEASE(_session_array); - _session_array = NULL; -} - -static inline void _esh_ns_track_cleanup(void) -{ - int size; - ns_track_elem_t *ns_trk; - - if (NULL == _ns_track_array) { - return; - } - - size = pmix_value_array_get_size(_ns_track_array); - ns_trk = PMIX_VALUE_ARRAY_GET_BASE(_ns_track_array, ns_track_elem_t); - - for (int i = 0; i < size; i++) { - ns_track_elem_t *trk = ns_trk + i; - if (trk->in_use) { - PMIX_DESTRUCT(trk); - } - } - - PMIX_RELEASE(_ns_track_array); - _ns_track_array = NULL; -} - -static inline ns_map_data_t * _esh_session_map(const char *nspace, size_t tbl_idx) -{ - size_t map_idx; - size_t size = pmix_value_array_get_size(_ns_map_array);; - ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(_ns_map_array, ns_map_t);; - ns_map_t *new_map = NULL; - - if (NULL == nspace) { - PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); - return NULL; - } - - for(map_idx = 0; map_idx < size; map_idx++) { - if (!ns_map[map_idx].in_use) { - ns_map[map_idx].in_use = true; - strncpy(ns_map[map_idx].data.name, nspace, 
sizeof(ns_map[map_idx].data.name)-1); - ns_map[map_idx].data.tbl_idx = tbl_idx; - return &ns_map[map_idx].data; - } - } - - if (NULL == (new_map = pmix_value_array_get_item(_ns_map_array, map_idx))) { - PMIX_ERROR_LOG(PMIX_ERR_OUT_OF_RESOURCE); - return NULL; - } - - _esh_session_map_clean(new_map); - new_map->in_use = true; - new_map->data.tbl_idx = tbl_idx; - strncpy(new_map->data.name, nspace, sizeof(new_map->data.name)-1); - - return &new_map->data; -} - -static inline int _esh_jobuid_tbl_search(uid_t jobuid, size_t *tbl_idx) -{ - size_t idx, size; - session_t *session_tbl = NULL; - - size = pmix_value_array_get_size(_session_array); - session_tbl = PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t); - - for(idx = 0; idx < size; idx++) { - if (session_tbl[idx].in_use && session_tbl[idx].jobuid == jobuid) { - *tbl_idx = idx; - return PMIX_SUCCESS; - } - } - - return PMIX_ERR_NOT_FOUND; -} - -static inline int _esh_session_tbl_add(size_t *tbl_idx) -{ - size_t idx; - size_t size = pmix_value_array_get_size(_session_array); - session_t *s_tbl = PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t); - session_t *new_sesion; - pmix_status_t rc = PMIX_SUCCESS; - - for(idx = 0; idx < size; idx ++) { - if (0 == s_tbl[idx].in_use) { - s_tbl[idx].in_use = 1; - *tbl_idx = idx; - return PMIX_SUCCESS; - } - } - - if (NULL == (new_sesion = pmix_value_array_get_item(_session_array, idx))) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - return rc; - } - s_tbl[idx].in_use = 1; - *tbl_idx = idx; - - return PMIX_SUCCESS; -} - -static inline ns_map_data_t * _esh_session_map_search_server(const char *nspace) -{ - size_t idx, size = pmix_value_array_get_size(_ns_map_array); - ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(_ns_map_array, ns_map_t); - if (NULL == nspace) { - return NULL; - } - - for (idx = 0; idx < size; idx++) { - if (ns_map[idx].in_use && - (0 == strcmp(ns_map[idx].data.name, nspace))) { - return &ns_map[idx].data; - } - } - return NULL; -} - -static inline 
ns_map_data_t * _esh_session_map_search_client(const char *nspace) -{ - size_t idx, size = pmix_value_array_get_size(_ns_map_array); - ns_map_t *ns_map = PMIX_VALUE_ARRAY_GET_BASE(_ns_map_array, ns_map_t); - - if (NULL == nspace) { - return NULL; - } - - for (idx = 0; idx < size; idx++) { - if (ns_map[idx].in_use && - (0 == strcmp(ns_map[idx].data.name, nspace))) { - return &ns_map[idx].data; - } - } - return _esh_session_map(nspace, 0); -} - -static inline int _esh_session_init(size_t idx, ns_map_data_t *m, size_t jobuid, int setjobuid) -{ - seg_desc_t *seg = NULL; - session_t *s = &(PMIX_VALUE_ARRAY_GET_ITEM(_session_array, session_t, idx)); - pmix_status_t rc = PMIX_SUCCESS; - - s->setjobuid = setjobuid; - s->jobuid = jobuid; - s->nspace_path = strdup(_base_path); - - /* create a lock file to prevent clients from reading while server is writing to the shared memory. - * This situation is quite often, especially in case of direct modex when clients might ask for data - * simultaneously.*/ - if(0 > asprintf(&s->lockfile, "%s/dstore_sm.lock", s->nspace_path)) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - return rc; - } - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s _lockfile_name: %s", __FILE__, __LINE__, __func__, s->lockfile)); - - if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - if (0 != mkdir(s->nspace_path, 0770)) { - if (EEXIST != errno) { - pmix_output(0, "session init: can not create session directory \"%s\": %s", - s->nspace_path, strerror(errno)); - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - if (s->setjobuid > 0){ - if (0 > chown(s->nspace_path, (uid_t) s->jobuid, (gid_t) -1)){ - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - seg = _create_new_segment(INITIAL_SEGMENT, m, 0); - if( NULL == seg ){ - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - return rc; - } - } - else { - seg = _attach_new_segment(INITIAL_SEGMENT, m, 0); - if( NULL == seg ){ - rc = 
PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - return rc; - } - } - - if (NULL == _esh_lock_init) { - rc = PMIX_ERR_INIT; - PMIX_ERROR_LOG(rc); - return rc; - } - if ( PMIX_SUCCESS != (rc = _esh_lock_init(m->tbl_idx))) { - PMIX_ERROR_LOG(rc); - return rc; - } - - s->sm_seg_first = seg; - s->sm_seg_last = s->sm_seg_first; - return PMIX_SUCCESS; -} - -static inline void _esh_session_release(session_t *s) -{ - if (!s->in_use) { - return; - } - - _delete_sm_desc(s->sm_seg_first); - /* if the lock fd was somehow set, then we - * need to close it */ - if (0 != s->lockfd) { - close(s->lockfd); - } - - if (NULL != s->lockfile) { - if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - unlink(s->lockfile); - } - free(s->lockfile); - } - if (NULL != s->nspace_path) { - if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - _esh_dir_del(s->nspace_path); - } - free(s->nspace_path); - } -#ifdef ESH_PTHREAD_LOCK - _rwlock_release(s); -#endif - memset ((char *) s, 0, sizeof(*s)); -} - -static void _set_constants_from_env() -{ - char *str; - int page_size = _pmix_getpagesize(); - - if( NULL != (str = getenv(ESH_ENV_INITIAL_SEG_SIZE)) ) { - _initial_segment_size = strtoul(str, NULL, 10); - if ((size_t)page_size > _initial_segment_size) { - _initial_segment_size = (size_t)page_size; - } - } - if (0 == _initial_segment_size) { - _initial_segment_size = INITIAL_SEG_SIZE; - } - if( NULL != (str = getenv(ESH_ENV_NS_META_SEG_SIZE)) ) { - _meta_segment_size = strtoul(str, NULL, 10); - if ((size_t)page_size > _meta_segment_size) { - _meta_segment_size = (size_t)page_size; - } - } - if (0 == _meta_segment_size) { - _meta_segment_size = NS_META_SEG_SIZE; - } - if( NULL != (str = getenv(ESH_ENV_NS_DATA_SEG_SIZE)) ) { - _data_segment_size = strtoul(str, NULL, 10); - if ((size_t)page_size > _data_segment_size) { - _data_segment_size = (size_t)page_size; - } - } - if (0 == _data_segment_size) { - _data_segment_size = NS_DATA_SEG_SIZE; - } - if (NULL != (str = getenv(ESH_ENV_LINEAR))) { - if (1 == 
strtoul(str, NULL, 10)) { - _direct_mode = 1; - } - } - - _lock_segment_size = page_size; - _max_ns_num = (_initial_segment_size - sizeof(size_t) * 2) / sizeof(ns_seg_info_t); - _max_meta_elems = (_meta_segment_size - sizeof(size_t)) / sizeof(rank_meta_info); - -} - -static void _delete_sm_desc(seg_desc_t *desc) -{ - seg_desc_t *tmp; - - /* free all global segments */ - while (NULL != desc) { - tmp = desc->next; - /* detach & unlink from current desc */ - if (desc->seg_info.seg_cpid == getpid()) { - pmix_pshmem.segment_unlink(&desc->seg_info); - } - pmix_pshmem.segment_detach(&desc->seg_info); - free(desc); - desc = tmp; - } -} - -static int _pmix_getpagesize(void) -{ -#if defined(_SC_PAGESIZE ) - return sysconf(_SC_PAGESIZE); -#elif defined(_SC_PAGE_SIZE) - return sysconf(_SC_PAGE_SIZE); -#else - return 65536; /* safer to overestimate than under */ -#endif -} - -static seg_desc_t *_create_new_segment(segment_type type, const ns_map_data_t *ns_map, uint32_t id) -{ - pmix_status_t rc; - char file_name[PMIX_PATH_MAX]; - size_t size; - seg_desc_t *new_seg = NULL; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: segment type %d, nspace %s, id %u", - __FILE__, __LINE__, __func__, type, ns_map->name, id)); - - switch (type) { - case INITIAL_SEGMENT: - size = _initial_segment_size; - snprintf(file_name, PMIX_PATH_MAX, "%s/initial-pmix_shared-segment-%u", - _ESH_SESSION_path(ns_map->tbl_idx), id); - break; - case NS_META_SEGMENT: - size = _meta_segment_size; - snprintf(file_name, PMIX_PATH_MAX, "%s/smseg-%s-%u", - _ESH_SESSION_path(ns_map->tbl_idx), ns_map->name, id); - break; - case NS_DATA_SEGMENT: - size = _data_segment_size; - snprintf(file_name, PMIX_PATH_MAX, "%s/smdataseg-%s-%d", - _ESH_SESSION_path(ns_map->tbl_idx), ns_map->name, id); - break; - default: - PMIX_ERROR_LOG(PMIX_ERROR); - return NULL; - } - new_seg = (seg_desc_t*)malloc(sizeof(seg_desc_t)); - if (new_seg) { - new_seg->id = id; - new_seg->next = NULL; - new_seg->type 
= type; - rc = pmix_pshmem.segment_create(&new_seg->seg_info, file_name, size); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - memset(new_seg->seg_info.seg_base_addr, 0, size); - - - if (_ESH_SESSION_setjobuid(ns_map->tbl_idx) > 0){ - rc = PMIX_ERR_PERM; - if (0 > chown(file_name, (uid_t) _ESH_SESSION_jobuid(ns_map->tbl_idx), (gid_t) -1)){ - PMIX_ERROR_LOG(rc); - goto err_exit; - } - /* set the mode as required */ - if (0 > chmod(file_name, S_IRUSR | S_IRGRP | S_IWGRP )) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - } - } - return new_seg; - -err_exit: - if( NULL != new_seg ){ - free(new_seg); - } - return NULL; -} - -static seg_desc_t *_attach_new_segment(segment_type type, const ns_map_data_t *ns_map, uint32_t id) -{ - pmix_status_t rc; - seg_desc_t *new_seg = NULL; - new_seg = (seg_desc_t*)malloc(sizeof(seg_desc_t)); - new_seg->id = id; - new_seg->next = NULL; - new_seg->type = type; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: segment type %d, nspace %s, id %u", - __FILE__, __LINE__, __func__, type, ns_map->name, id)); - - switch (type) { - case INITIAL_SEGMENT: - new_seg->seg_info.seg_size = _initial_segment_size; - snprintf(new_seg->seg_info.seg_name, PMIX_PATH_MAX, "%s/initial-pmix_shared-segment-%u", - _ESH_SESSION_path(ns_map->tbl_idx), id); - break; - case NS_META_SEGMENT: - new_seg->seg_info.seg_size = _meta_segment_size; - snprintf(new_seg->seg_info.seg_name, PMIX_PATH_MAX, "%s/smseg-%s-%u", - _ESH_SESSION_path(ns_map->tbl_idx), ns_map->name, id); - break; - case NS_DATA_SEGMENT: - new_seg->seg_info.seg_size = _data_segment_size; - snprintf(new_seg->seg_info.seg_name, PMIX_PATH_MAX, "%s/smdataseg-%s-%d", - _ESH_SESSION_path(ns_map->tbl_idx), ns_map->name, id); - break; - default: - free(new_seg); - PMIX_ERROR_LOG(PMIX_ERROR); - return NULL; - } - rc = pmix_pshmem.segment_attach(&new_seg->seg_info, PMIX_PSHMEM_RONLY); - if (PMIX_SUCCESS != rc) { - free(new_seg); - new_seg = NULL; - 
PMIX_ERROR_LOG(rc); - } - return new_seg; -} - -/* This function synchronizes the content of initial shared segment and the local track list. */ -static int _update_ns_elem(ns_track_elem_t *ns_elem, ns_seg_info_t *info) -{ - seg_desc_t *seg, *tmp = NULL; - size_t i, offs; - ns_map_data_t *ns_map = NULL; - pmix_status_t rc; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s", - __FILE__, __LINE__, __func__)); - - if (NULL == (ns_map = _esh_session_map_search(info->ns_map.name))) { - rc = PMIX_ERR_NOT_AVAILABLE; - PMIX_ERROR_LOG(rc); - return rc; - } - - tmp = ns_elem->meta_seg; - if (NULL != tmp) { - while(NULL != tmp->next) { - tmp = tmp->next; - } - } - - /* synchronize number of meta segments for the target namespace. */ - for (i = ns_elem->num_meta_seg; i < info->num_meta_seg; i++) { - if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - seg = _create_new_segment(NS_META_SEGMENT, &info->ns_map, i); - if (NULL == seg) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - return rc; - } - } else { - seg = _attach_new_segment(NS_META_SEGMENT, &info->ns_map, i); - if (NULL == seg) { - rc = PMIX_ERR_NOT_AVAILABLE; - PMIX_ERROR_LOG(rc); - return rc; - } - } - - if (NULL == tmp) { - ns_elem->meta_seg = seg; - } else { - tmp->next = seg; - } - tmp = seg; - ns_elem->num_meta_seg++; - } - - tmp = ns_elem->data_seg; - if (NULL != tmp) { - while(NULL != tmp->next) { - tmp = tmp->next; - } - } - /* synchronize number of data segments for the target namespace. 
*/ - for (i = ns_elem->num_data_seg; i < info->num_data_seg; i++) { - if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - seg = _create_new_segment(NS_DATA_SEGMENT, &info->ns_map, i); - if (NULL == seg) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - return rc; - } - offs = sizeof(size_t);//shift on offset field itself - memcpy(seg->seg_info.seg_base_addr, &offs, sizeof(size_t)); - } else { - seg = _attach_new_segment(NS_DATA_SEGMENT, &info->ns_map, i); - if (NULL == seg) { - rc = PMIX_ERR_NOT_AVAILABLE; - PMIX_ERROR_LOG(rc); - return rc; - } - } - - if (NULL == tmp) { - ns_elem->data_seg = seg; - } else { - tmp->next = seg; - } - tmp = seg; - ns_elem->num_data_seg++; - } - - return PMIX_SUCCESS; -} - -static seg_desc_t *extend_segment(seg_desc_t *segdesc, const ns_map_data_t *ns_map) -{ - seg_desc_t *tmp, *seg; - - PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output, - "%s:%d:%s", - __FILE__, __LINE__, __func__)); - /* find last segment */ - tmp = segdesc; - while (NULL != tmp->next) { - tmp = tmp->next; - } - /* create another segment, the old one is full. 
*/ - seg = _create_new_segment(segdesc->type, ns_map, tmp->id + 1); - tmp->next = seg; - - return seg; -} - -static int _put_ns_info_to_initial_segment(const ns_map_data_t *ns_map, pmix_pshmem_seg_t *metaseg, pmix_pshmem_seg_t *dataseg) -{ - ns_seg_info_t elem; - size_t num_elems; - num_elems = *((size_t*)(_ESH_SESSION_sm_seg_last(ns_map->tbl_idx)->seg_info.seg_base_addr)); - seg_desc_t *last_seg = _ESH_SESSION_sm_seg_last(ns_map->tbl_idx); - pmix_status_t rc; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s", __FILE__, __LINE__, __func__)); - - if (_max_ns_num == num_elems) { - num_elems = 0; - if (NULL == (last_seg = extend_segment(last_seg, ns_map))) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - /* mark previous segment as full */ - size_t full = 1; - memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ns_map->tbl_idx)->seg_info.seg_base_addr + sizeof(size_t)), &full, sizeof(size_t)); - _ESH_SESSION_sm_seg_last(ns_map->tbl_idx) = last_seg; - memset(_ESH_SESSION_sm_seg_last(ns_map->tbl_idx)->seg_info.seg_base_addr, 0, _initial_segment_size); - } - memset(&elem.ns_map, 0, sizeof(elem.ns_map)); - strncpy(elem.ns_map.name, ns_map->name, sizeof(elem.ns_map.name)-1); - elem.ns_map.tbl_idx = ns_map->tbl_idx; - elem.num_meta_seg = 1; - elem.num_data_seg = 1; - memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ns_map->tbl_idx)->seg_info.seg_base_addr) + sizeof(size_t) * 2 + num_elems * sizeof(ns_seg_info_t), - &elem, sizeof(ns_seg_info_t)); - num_elems++; - memcpy((uint8_t*)(_ESH_SESSION_sm_seg_last(ns_map->tbl_idx)->seg_info.seg_base_addr), &num_elems, sizeof(size_t)); - return PMIX_SUCCESS; -} - -/* clients should sync local info with information from initial segment regularly */ -static void _update_initial_segment_info(const ns_map_data_t *ns_map) -{ - seg_desc_t *tmp; - tmp = _ESH_SESSION_sm_seg_first(ns_map->tbl_idx); - - PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output, - "%s:%d:%s", __FILE__, __LINE__, __func__)); 
- - /* go through all global segments */ - do { - /* check if current segment was marked as full but no more next segment is in the chain */ - if (NULL == tmp->next && 1 == *((size_t*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t)))) { - tmp->next = _attach_new_segment(INITIAL_SEGMENT, ns_map, tmp->id+1); - } - tmp = tmp->next; - } - while (NULL != tmp); -} - -/* this function will be used by clients to get ns data from the initial segment and add them to the tracker list */ -static ns_seg_info_t *_get_ns_info_from_initial_segment(const ns_map_data_t *ns_map) -{ - pmix_status_t rc; - size_t i; - seg_desc_t *tmp; - ns_seg_info_t *elem, *cur_elem; - elem = NULL; - size_t num_elems; - - PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output, - "%s:%d:%s", __FILE__, __LINE__, __func__)); - - tmp = _ESH_SESSION_sm_seg_first(ns_map->tbl_idx); - - rc = 1; - /* go through all global segments */ - do { - num_elems = *((size_t*)(tmp->seg_info.seg_base_addr)); - for (i = 0; i < num_elems; i++) { - cur_elem = (ns_seg_info_t*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) * 2 + i * sizeof(ns_seg_info_t)); - if (0 == (rc = strncmp(cur_elem->ns_map.name, ns_map->name, strlen(ns_map->name)+1))) { - break; - } - } - if (0 == rc) { - elem = cur_elem; - break; - } - tmp = tmp->next; - } - while (NULL != tmp); - return elem; -} - -static ns_track_elem_t *_get_track_elem_for_namespace(ns_map_data_t *ns_map) -{ - ns_track_elem_t *new_elem = NULL; - size_t size = pmix_value_array_get_size(_ns_track_array); - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: nspace %s", - __FILE__, __LINE__, __func__, ns_map->name)); - - /* check if this namespace is already being tracked to avoid duplicating data. */ - if (ns_map->track_idx >= 0) { - if ((ns_map->track_idx + 1) > (int)size) { - return NULL; - } - /* data for this namespace should be already stored in shared memory region. */ - /* so go and just put new data. 
*/ - return pmix_value_array_get_item(_ns_track_array, ns_map->track_idx); - } - - /* create shared memory regions for this namespace and store its info locally - * to operate with address and detach/unlink afterwards. */ - if (NULL == (new_elem = pmix_value_array_get_item(_ns_track_array, size))) { - return NULL; - } - PMIX_CONSTRUCT(new_elem, ns_track_elem_t); - strncpy(new_elem->ns_map.name, ns_map->name, sizeof(new_elem->ns_map.name)-1); - /* save latest track idx to info of nspace */ - ns_map->track_idx = size; - - return new_elem; -} - -static rank_meta_info *_get_rank_meta_info(pmix_rank_t rank, seg_desc_t *segdesc) -{ - size_t i; - rank_meta_info *elem = NULL; - seg_desc_t *tmp = segdesc; - size_t num_elems, rel_offset; - int id; - rank_meta_info *cur_elem; - - size_t rcount = rank == PMIX_RANK_WILDCARD ? 0 : rank + 1; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s", - __FILE__, __LINE__, __func__)); - - if (1 == _direct_mode) { - /* do linear search to find the requested rank inside all meta segments - * for this namespace. */ - /* go through all existing meta segments for this namespace */ - do { - num_elems = *((size_t*)(tmp->seg_info.seg_base_addr)); - for (i = 0; i < num_elems; i++) { - cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + i * sizeof(rank_meta_info)); - if (rcount == cur_elem->rank) { - elem = cur_elem; - break; - } - } - tmp = tmp->next; - } - while (NULL != tmp && NULL == elem); - } else { - /* directly compute index of meta segment (id) and relative offset (rel_offset) - * inside this segment for fast lookup a rank_meta_info object for the requested rank. */ - id = rcount/_max_meta_elems; - rel_offset = (rcount%_max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t); - /* go through all existing meta segments for this namespace. - * Stop at id number if it exists. 
*/ - while (NULL != tmp->next && 0 != id) { - tmp = tmp->next; - id--; - } - if (0 == id) { - /* the segment is found, looking for data for the target rank. */ - elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + rel_offset); - if ( 0 == elem->offset) { - /* offset can never be 0, it means that there is no data for this rank yet. */ - elem = NULL; - } - } - } - return elem; -} - -static int set_rank_meta_info(ns_track_elem_t *ns_info, rank_meta_info *rinfo) -{ - /* it's claimed that there is still no meta info for this rank stored */ - seg_desc_t *tmp; - size_t num_elems, rel_offset; - int id, count; - rank_meta_info *cur_elem; - - if (!ns_info || !rinfo) { - PMIX_ERROR_LOG(PMIX_ERROR); - return PMIX_ERROR; - } - - PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output, - "%s:%d:%s: nspace %s, add rank %lu offset %lu count %lu meta info", - __FILE__, __LINE__, __func__, - ns_info->ns_map.name, (unsigned long)rinfo->rank, - (unsigned long)rinfo->offset, (unsigned long)rinfo->count)); - - tmp = ns_info->meta_seg; - if (1 == _direct_mode) { - /* get the last meta segment to put new rank_meta_info at the end. 
*/ - while (NULL != tmp->next) { - tmp = tmp->next; - } - num_elems = *((size_t*)(tmp->seg_info.seg_base_addr)); - if (_max_meta_elems <= num_elems) { - PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output, - "%s:%d:%s: extend meta segment for nspace %s", - __FILE__, __LINE__, __func__, ns_info->ns_map.name)); - /* extend meta segment, so create a new one */ - tmp = extend_segment(tmp, &ns_info->ns_map); - if (NULL == tmp) { - PMIX_ERROR_LOG(PMIX_ERROR); - return PMIX_ERROR; - } - ns_info->num_meta_seg++; - memset(tmp->seg_info.seg_base_addr, 0, sizeof(rank_meta_info)); - /* update number of meta segments for namespace in initial_segment */ - ns_seg_info_t *elem = _get_ns_info_from_initial_segment(&ns_info->ns_map); - if (NULL == elem) { - PMIX_ERROR_LOG(PMIX_ERROR); - return PMIX_ERROR; - } - if (ns_info->num_meta_seg != elem->num_meta_seg) { - elem->num_meta_seg = ns_info->num_meta_seg; - } - num_elems = 0; - } - cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + sizeof(size_t) + num_elems * sizeof(rank_meta_info)); - memcpy(cur_elem, rinfo, sizeof(rank_meta_info)); - num_elems++; - memcpy(tmp->seg_info.seg_base_addr, &num_elems, sizeof(size_t)); - } else { - /* directly compute index of meta segment (id) and relative offset (rel_offset) - * inside this segment for fast lookup a rank_meta_info object for the requested rank. */ - size_t rcount = rinfo->rank == PMIX_RANK_WILDCARD ? 0 : rinfo->rank + 1; - id = rcount/_max_meta_elems; - rel_offset = (rcount % _max_meta_elems) * sizeof(rank_meta_info) + sizeof(size_t); - count = id; - /* go through all existing meta segments for this namespace. - * Stop at id number if it exists. */ - while (NULL != tmp->next && 0 != count) { - tmp = tmp->next; - count--; - } - /* if there is no segment with this id, then create all missing segments till the id number. 
*/ - if ((int)ns_info->num_meta_seg < (id+1)) { - while ((int)ns_info->num_meta_seg != (id+1)) { - /* extend meta segment, so create a new one */ - tmp = extend_segment(tmp, &ns_info->ns_map); - if (NULL == tmp) { - PMIX_ERROR_LOG(PMIX_ERROR); - return PMIX_ERROR; - } - memset(tmp->seg_info.seg_base_addr, 0, sizeof(rank_meta_info)); - ns_info->num_meta_seg++; - } - /* update number of meta segments for namespace in initial_segment */ - ns_seg_info_t *elem = _get_ns_info_from_initial_segment(&ns_info->ns_map); - if (NULL == elem) { - PMIX_ERROR_LOG(PMIX_ERROR); - return PMIX_ERROR; - } - if (ns_info->num_meta_seg != elem->num_meta_seg) { - elem->num_meta_seg = ns_info->num_meta_seg; - } - } - /* store rank_meta_info object by rel_offset. */ - cur_elem = (rank_meta_info*)((uint8_t*)(tmp->seg_info.seg_base_addr) + rel_offset); - memcpy(cur_elem, rinfo, sizeof(rank_meta_info)); - } - return PMIX_SUCCESS; -} - -static uint8_t *_get_data_region_by_offset(seg_desc_t *segdesc, size_t offset) -{ - seg_desc_t *tmp = segdesc; - size_t rel_offset = offset; - uint8_t *dataaddr = NULL; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s", - __FILE__, __LINE__, __func__)); - - /* go through all existing data segments for this namespace */ - do { - if (rel_offset >= _data_segment_size) { - rel_offset -= _data_segment_size; - } else { - dataaddr = tmp->seg_info.seg_base_addr + rel_offset; - } - tmp = tmp->next; - } while (NULL != tmp && NULL == dataaddr); - - return dataaddr; -} - -static size_t get_free_offset(seg_desc_t *data_seg) -{ - size_t offset; - seg_desc_t *tmp; - int id = 0; - tmp = data_seg; - /* first find the last data segment */ - while (NULL != tmp->next) { - tmp = tmp->next; - id++; - } - offset = *((size_t*)(tmp->seg_info.seg_base_addr)); - if (0 == offset) { - /* this is the first created data segment, the first 8 bytes are used to place the free offset value itself */ - offset = sizeof(size_t); - } - return (id * _data_segment_size 
+ offset); -} - -static int put_empty_ext_slot(seg_desc_t *dataseg) -{ - size_t global_offset, rel_offset, data_ended, val = 0; - uint8_t *addr; - global_offset = get_free_offset(dataseg); - rel_offset = global_offset % _data_segment_size; - if (rel_offset + EXT_SLOT_SIZE() > _data_segment_size) { - PMIX_ERROR_LOG(PMIX_ERROR); - return PMIX_ERROR; - } - addr = _get_data_region_by_offset(dataseg, global_offset); - ESH_PUT_KEY(addr, ESH_REGION_EXTENSION, (void*)&val, sizeof(size_t)); - - /* update offset at the beginning of current segment */ - data_ended = rel_offset + EXT_SLOT_SIZE(); - addr = (uint8_t*)(addr - rel_offset); - memcpy(addr, &data_ended, sizeof(size_t)); - return PMIX_SUCCESS; -} - -static size_t put_data_to_the_end(ns_track_elem_t *ns_info, seg_desc_t *dataseg, char *key, void *buffer, size_t size) -{ - size_t offset, id = 0; - seg_desc_t *tmp; - size_t global_offset, data_ended; - uint8_t *addr; - - PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output, - "%s:%d:%s: key %s", - __FILE__, __LINE__, __func__, key)); - - tmp = dataseg; - while (NULL != tmp->next) { - tmp = tmp->next; - id++; - } - global_offset = get_free_offset(dataseg); - offset = global_offset % _data_segment_size; - - /* We should provide additional space at the end of segment to - * place EXTENSION_SLOT to have an ability to enlarge data for this rank.*/ - if ((sizeof(size_t) + ESH_KEY_SIZE(key, size) + EXT_SLOT_SIZE()) > _data_segment_size) { - /* this is an error case: segment is so small that cannot place evem a single key-value pair. - * warn a user about it and fail. */ - offset = 0; /* offset cannot be 0 in normal case, so we use this value to indicate a problem. 
*/ - pmix_output(0, "PLEASE set NS_DATA_SEG_SIZE to value which is larger when %lu.", - (unsigned long)(sizeof(size_t) + strlen(key) + 1 + sizeof(size_t) + size + EXT_SLOT_SIZE())); - return offset; - } - - /* check the corner case that was observed at large scales: - * https://github.com/pmix/master/pull/282#issuecomment-277454198 - * - * if last time we stopped exactly on the border of the segment - * new segment wasn't allocated to us but (global_offset % _data_segment_size) == 0 - * so if offset is 0 here - we need to allocate the segment as well - */ - if ( (0 == offset) || ( (offset + ESH_KEY_SIZE(key, size) + EXT_SLOT_SIZE()) > _data_segment_size) ) { - id++; - /* create a new data segment. */ - tmp = extend_segment(tmp, &ns_info->ns_map); - if (NULL == tmp) { - PMIX_ERROR_LOG(PMIX_ERR_NOMEM); - offset = 0; /* offset cannot be 0 in normal case, so we use this value to indicate a problem. */ - return offset; - } - ns_info->num_data_seg++; - /* update_ns_info_in_initial_segment */ - ns_seg_info_t *elem = _get_ns_info_from_initial_segment(&ns_info->ns_map); - if (NULL == elem) { - PMIX_ERROR_LOG(PMIX_ERR_NOMEM); - offset = 0; /* offset cannot be 0 in normal case, so we use this value to indicate a problem. 
*/ - return offset; - } - elem->num_data_seg++; - offset = sizeof(size_t); - } - global_offset = offset + id * _data_segment_size; - addr = (uint8_t*)(tmp->seg_info.seg_base_addr)+offset; - ESH_PUT_KEY(addr, key, buffer, size); - - /* update offset at the beginning of current segment */ - data_ended = offset + ESH_KEY_SIZE(key, size); - addr = (uint8_t*)(tmp->seg_info.seg_base_addr); - memcpy(addr, &data_ended, sizeof(size_t)); - PMIX_OUTPUT_VERBOSE((1, pmix_gds_base_framework.framework_output, - "%s:%d:%s: key %s, rel start offset %lu, rel end offset %lu, abs shift %lu size %lu", - __FILE__, __LINE__, __func__, - key, (unsigned long)offset, - (unsigned long)data_ended, - (unsigned long)(id * _data_segment_size), - (unsigned long)size)); - return global_offset; -} - -static int pmix_sm_store(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_kval_t *kval, rank_meta_info **rinfo, int data_exist) -{ - size_t offset, size, kval_cnt; - pmix_buffer_t buffer; - pmix_status_t rc; - seg_desc_t *datadesc; - uint8_t *addr; - - PMIX_OUTPUT_VERBOSE((2, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u, replace flag %d", - __FILE__, __LINE__, __func__, rank, data_exist)); - - datadesc = ns_info->data_seg; - /* pack value to the buffer */ - PMIX_CONSTRUCT(&buffer, pmix_buffer_t); - PMIX_BFROPS_PACK(rc, _client_peer(), &buffer, kval->value, 1, PMIX_VALUE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto exit; - } - size = buffer.bytes_used; - - if (0 == data_exist) { - /* there is no data blob for this rank yet, so add it. 
*/ - size_t free_offset; - free_offset = get_free_offset(datadesc); - offset = put_data_to_the_end(ns_info, datadesc, kval->key, buffer.base_ptr, size); - if (0 == offset) { - /* this is an error */ - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - goto exit; - } - /* if it's the first time when we put data for this rank, then *rinfo == NULL, - * and even if segment was extended, and data was put into the next segment, - * we don't need to extension slot at the end of previous segment. - * If we try, we might overwrite other segments memory, - * because previous segment is already full. */ - if (free_offset != offset && NULL != *rinfo) { - /* here we compare previous free offset with the offset where we just put data. - * It should be equal in the normal case. It it's not true, then it means that - * segment was extended, and we put data to the next segment, so we now need to - * put extension slot at the end of previous segment with a "reference" to a new_offset */ - addr = _get_data_region_by_offset(datadesc, free_offset); - ESH_PUT_KEY(addr, ESH_REGION_EXTENSION, (void*)&offset, sizeof(size_t)); - } - if (NULL == *rinfo) { - *rinfo = (rank_meta_info*)malloc(sizeof(rank_meta_info)); - (*rinfo)->rank = rank; - (*rinfo)->offset = offset; - (*rinfo)->count = 0; - } - (*rinfo)->count++; - } else if (NULL != *rinfo) { - /* there is data blob for this rank */ - addr = _get_data_region_by_offset(datadesc, (*rinfo)->offset); - if (NULL == addr) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - goto exit; - } - /* go through previous data region and find key matches. - * If one is found, then mark this kval as invalidated. - * Then put a new empty offset to the next extension slot, - * and add new kval by this offset. - * no need to update meta info, it's still the same. 
*/ - kval_cnt = (*rinfo)->count; - int add_to_the_end = 1; - while (0 < kval_cnt) { - /* data is stored in the following format: - * size_t size - * key[ESH_KNAME_LEN(addr)] - * byte buffer containing pmix_value, should be loaded to pmix_buffer_t and unpacked. - * next kval pair - * ..... - * extension slot which has key = EXTENSION_SLOT and a size_t value for offset to next data address for this process. - */ - if (0 == strncmp(ESH_KNAME_PTR(addr), ESH_REGION_EXTENSION, ESH_KNAME_LEN(ESH_REGION_EXTENSION))) { - memcpy(&offset, ESH_DATA_PTR(addr), sizeof(size_t)); - if (0 < offset) { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %lu, replace flag %d %s is filled with %lu value", - __FILE__, __LINE__, __func__, - (unsigned long)rank, data_exist, - ESH_REGION_EXTENSION, (unsigned long)offset)); - /* go to next item, updating address */ - addr = _get_data_region_by_offset(datadesc, offset); - if (NULL == addr) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - goto exit; - } - } else { - /* should not be, we should be out of cycle when this happens */ - } - } else if (0 == strncmp(ESH_KNAME_PTR(addr), kval->key, ESH_KNAME_LEN(kval->key))) { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u, replace flag %d found target key %s", - __FILE__, __LINE__, __func__, rank, data_exist, kval->key)); - /* target key is found, compare value sizes */ - if (ESH_DATA_SIZE(addr, ESH_DATA_PTR(addr)) != size) { - //if (1) { /* if we want to test replacing values for existing keys. */ - /* invalidate current value and store another one at the end of data region. */ - strncpy(ESH_KNAME_PTR(addr), ESH_REGION_INVALIDATED, ESH_KNAME_LEN(ESH_REGION_INVALIDATED)); - /* decrementing count, it will be incremented back when we add a new value for this key at the end of region. 
*/ - (*rinfo)->count--; - kval_cnt--; - /* go to next item, updating address */ - addr += ESH_KV_SIZE(addr); - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u, replace flag %d mark key %s regions as invalidated. put new data at the end.", - __FILE__, __LINE__, __func__, rank, data_exist, kval->key)); - } else { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u, replace flag %d replace data for key %s type %d in place", - __FILE__, __LINE__, __func__, rank, data_exist, kval->key, kval->value->type)); - /* replace old data with new one. */ - memset(ESH_DATA_PTR(addr), 0, ESH_DATA_SIZE(addr, ESH_DATA_PTR(addr))); - memcpy(ESH_DATA_PTR(addr), buffer.base_ptr, size); - addr += ESH_KV_SIZE(addr); - add_to_the_end = 0; - break; - } - } else { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u, replace flag %d skip %s key, look for %s key", - __FILE__, __LINE__, __func__, rank, data_exist, ESH_KNAME_PTR(addr), kval->key)); - /* Skip it: key is "INVALIDATED" or key is valid but different from target one. */ - if (0 != strncmp(ESH_REGION_INVALIDATED, ESH_KNAME_PTR(addr), ESH_KNAME_LEN(ESH_KNAME_PTR(addr)))) { - /* count only valid items */ - kval_cnt--; - } - /* go to next item, updating address */ - addr += ESH_KV_SIZE(addr); - } - } - if (1 == add_to_the_end) { - /* if we get here, it means that we want to add a new item for the target rank, or - * we mark existing item with the same key as "invalidated" and want to add new item - * for the same key. 
*/ - size_t free_offset; - (*rinfo)->count++; - free_offset = get_free_offset(datadesc); - /* add to the end */ - offset = put_data_to_the_end(ns_info, datadesc, kval->key, buffer.base_ptr, size); - if (0 == offset) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - goto exit; - } - /* we just reached the end of data for the target rank, and there can be two cases: - * (1) - we are in the middle of data segment; data for this rank is separated from - * data for different ranks, and that's why next element is EXTENSION_SLOT. - * We put new data to the end of data region and just update EXTENSION_SLOT value by new offset. - */ - if (0 == strncmp(ESH_KNAME_PTR(addr), ESH_REGION_EXTENSION, ESH_KNAME_LEN(ESH_REGION_EXTENSION))) { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u, replace flag %d %s should be filled with offset %lu value", - __FILE__, __LINE__, __func__, rank, data_exist, ESH_REGION_EXTENSION, offset)); - memcpy(ESH_DATA_PTR(addr), &offset, sizeof(size_t)); - } else { - /* (2) - we point to the first free offset, no more data is stored further in this segment. - * There is no EXTENSION_SLOT by this addr since we continue pushing data for the same rank, - * and there is no need to split it. - * But it's possible that we reached the end of current data region and just jumped to the new region - * to put new data, in that case free_offset != offset and we must put EXTENSION_SLOT by the current addr - * forcibly and store new offset in its value. 
*/ - if (free_offset != offset) { - /* segment was extended, need to put extension slot by free_offset indicating new_offset */ - ESH_PUT_KEY(addr, ESH_REGION_EXTENSION, (void*)&offset, sizeof(size_t)); - } - } - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u, replace flag %d item not found ext slot empty, put key %s to the end", - __FILE__, __LINE__, __func__, rank, data_exist, kval->key)); - } - } -exit: - PMIX_DESTRUCT(&buffer); - return rc; -} - -static int _store_data_for_rank(ns_track_elem_t *ns_info, pmix_rank_t rank, pmix_buffer_t *buf) -{ - pmix_status_t rc; - - pmix_kval_t *kp; - seg_desc_t *metadesc, *datadesc; - int32_t cnt; - - rank_meta_info *rinfo = NULL; - size_t num_elems, free_offset, new_free_offset; - int data_exist; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %u", __FILE__, __LINE__, __func__, rank)); - - metadesc = ns_info->meta_seg; - datadesc = ns_info->data_seg; - - if (NULL == datadesc || NULL == metadesc) { - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - return rc; - } - - num_elems = *((size_t*)(metadesc->seg_info.seg_base_addr)); - data_exist = 0; - /* when we don't use linear search (_direct_mode ==0 ) we don't use num_elems field, - * so anyway try to get rank_meta_info first. */ - if (0 < num_elems || 0 == _direct_mode) { - /* go through all elements in meta segment and look for target rank. */ - rinfo = _get_rank_meta_info(rank, metadesc); - if (NULL != rinfo) { - data_exist = 1; - } - } - /* incoming buffer may contain several inner buffers for different scopes, - * so unpack these buffers, and then unpack kvals from each modex buffer, - * storing them in the shared memory dstore. 
- */ - free_offset = get_free_offset(datadesc); - cnt = 1; - kp = PMIX_NEW(pmix_kval_t); - PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, buf, kp, &cnt, PMIX_KVAL); - while(PMIX_SUCCESS == rc) { - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "pmix: unpacked key %s", kp->key); - if (PMIX_SUCCESS != (rc = pmix_sm_store(ns_info, rank, kp, &rinfo, data_exist))) { - PMIX_ERROR_LOG(rc); - if (NULL != rinfo) { - free(rinfo); - } - return rc; - } - PMIX_RELEASE(kp); // maintain acctg - hash_store does a retain - cnt = 1; - kp = PMIX_NEW(pmix_kval_t); - PMIX_BFROPS_UNPACK(rc, pmix_globals.mypeer, buf, kp, &cnt, PMIX_KVAL); - } - - PMIX_RELEASE(kp); - - if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - PMIX_ERROR_LOG(rc); - /* TODO: should we error-exit here? */ - } else { - rc = PMIX_SUCCESS; - } - - /* Check if new data was put at the end of data segment. - * It's possible that old data just was replaced with new one, - * in that case we don't reserve space for EXTENSION_SLOT, it's - * already reserved. - * */ - new_free_offset = get_free_offset(datadesc); - if (new_free_offset != free_offset) { - /* Reserve space for EXTENSION_SLOT at the end of data blob. - * We need it to split data for one rank from data for different - * ranks and to allow extending data further. - * We also put EXTENSION_SLOT at the end of each data segment, and - * its value points to the beginning of next data segment. 
- * */ - rc = put_empty_ext_slot(ns_info->data_seg); - if (PMIX_SUCCESS != rc) { - if ((0 == data_exist) && NULL != rinfo) { - free(rinfo); - } - PMIX_ERROR_LOG(rc); - return rc; - } - } - - /* if this is the first data posted for this rank, then - * update meta info for it */ - if (0 == data_exist) { - set_rank_meta_info(ns_info, rinfo); - if (NULL != rinfo) { - free(rinfo); - } - } - - return rc; -} - -static inline ssize_t _get_univ_size(const char *nspace) -{ - ssize_t nprocs = 0; - pmix_value_t *val; - int rc; - - rc = _dstore_fetch(nspace, PMIX_RANK_WILDCARD, PMIX_UNIV_SIZE, &val); - if( PMIX_SUCCESS != rc ) { - PMIX_ERROR_LOG(rc); - return rc; - } - if( val->type != PMIX_UINT32 ){ - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - return rc; - } - nprocs = (ssize_t)val->data.uint32; - PMIX_VALUE_RELEASE(val); - return nprocs; -} - -static pmix_status_t dstore_cache_job_info(struct pmix_nspace_t *ns, - pmix_info_t info[], size_t ninfo) -{ - return PMIX_SUCCESS; -} - -static pmix_status_t dstore_init(pmix_info_t info[], size_t ninfo) -{ - pmix_status_t rc; - size_t n; - char *dstor_tmpdir = NULL; - size_t tbl_idx=0; - ns_map_data_t *ns_map = NULL; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "pmix:gds:dstore init"); - - /* open the pshmem and select the active plugins */ - if( PMIX_SUCCESS != (rc = pmix_mca_base_framework_open(&pmix_pshmem_base_framework, 0)) ) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - if( PMIX_SUCCESS != (rc = pmix_pshmem_base_select()) ) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - _jobuid = getuid(); - _setjobuid = 0; - -#ifdef ESH_PTHREAD_LOCK - _esh_lock_init = _rwlock_init; -#endif -#ifdef ESH_FCNTL_LOCK - _esh_lock_init = _flock_init; -#endif - - if (PMIX_SUCCESS != (rc = _esh_tbls_init())) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - rc = pmix_pshmem.init(); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - _set_constants_from_env(); - - if (NULL != _base_path) { - 
free(_base_path); - _base_path = NULL; - } - - /* find the temp dir */ - if (PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - _esh_session_map_search = _esh_session_map_search_server; - - /* scan incoming info for directives */ - if (NULL != info) { - for (n=0; n < ninfo; n++) { - if (0 == strcmp(PMIX_USERID, info[n].key)) { - _jobuid = info[n].value.data.uint32; - _setjobuid = 1; - continue; - } - if (0 == strcmp(PMIX_DSTPATH, info[n].key)) { - /* PMIX_DSTPATH is the way for RM to customize the - * place where shared memory files are placed. - * We need this for the following reasons: - * - disk usage: files can be relatively large and the system may - * have a small common temp directory. - * - performance: system may have a fast IO device (i.e. burst buffer) - * for the local usage. - * - * PMIX_DSTPATH has higher priority than PMIX_SERVER_TMPDIR - */ - if( PMIX_STRING != info[n].value.type ){ - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - dstor_tmpdir = (char*)info[n].value.data.string; - continue; - } - if (0 == strcmp(PMIX_SERVER_TMPDIR, info[n].key)) { - if( PMIX_STRING != info[n].value.type ){ - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - if (NULL == dstor_tmpdir) { - dstor_tmpdir = (char*)info[n].value.data.string; - } - continue; - } - } - } - - if (NULL == dstor_tmpdir) { - if (NULL == (dstor_tmpdir = getenv("TMPDIR"))) { - if (NULL == (dstor_tmpdir = getenv("TEMP"))) { - if (NULL == (dstor_tmpdir = getenv("TMP"))) { - dstor_tmpdir = "/tmp"; - } - } - } - } - - rc = asprintf(&_base_path, "%s/pmix_dstor_%d", dstor_tmpdir, getpid()); - if ((0 > rc) || (NULL == _base_path)) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - if (0 != mkdir(_base_path, 0770)) { - if (EEXIST != errno) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - } - if (_setjobuid > 0) { - if (chown(_base_path, (uid_t) _jobuid, (gid_t) -1) < 0){ - rc = PMIX_ERR_NO_PERMISSIONS; - 
PMIX_ERROR_LOG(rc); - goto err_exit; - } - } - _esh_session_map_search = _esh_session_map_search_server; - return PMIX_SUCCESS; - } - /* for clients */ - else { - if (NULL == (dstor_tmpdir = getenv(PMIX_DSTORE_ESH_BASE_PATH))){ - return PMIX_ERR_NOT_AVAILABLE; // simply disqualify ourselves - } - if (NULL == (_base_path = strdup(dstor_tmpdir))) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - _esh_session_map_search = _esh_session_map_search_client; - } - - rc = _esh_session_tbl_add(&tbl_idx); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - ns_map = _esh_session_map(pmix_globals.myid.nspace, tbl_idx); - if (NULL == ns_map) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - if (PMIX_SUCCESS != (rc =_esh_session_init(tbl_idx, ns_map, _jobuid, _setjobuid))) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - return PMIX_SUCCESS; -err_exit: - return rc; -} - -static void dstore_finalize(void) -{ - struct stat st = {0}; - pmix_status_t rc = PMIX_SUCCESS; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s", __FILE__, __LINE__, __func__)); - - _esh_sessions_cleanup(); - _esh_ns_map_cleanup(); - _esh_ns_track_cleanup(); - - pmix_pshmem.finalize(); - - if (NULL != _base_path){ - if(PMIX_PROC_IS_SERVER(pmix_globals.mypeer)) { - if (lstat(_base_path, &st) >= 0){ - if (PMIX_SUCCESS != (rc = _esh_dir_del(_base_path))) { - PMIX_ERROR_LOG(rc); - } - } - } - free(_base_path); - _base_path = NULL; - } - if (NULL != _clients_peer) { - PMIX_RELEASE(_clients_peer->nptr); - PMIX_RELEASE(_clients_peer); - } -} - -static pmix_status_t _dstore_store(const char *nspace, - pmix_rank_t rank, - pmix_kval_t *kv) -{ - pmix_status_t rc = PMIX_SUCCESS, tmp_rc; - ns_track_elem_t *elem; - pmix_buffer_t xfer; - ns_seg_info_t ns_info; - ns_map_data_t *ns_map = NULL; - - if (NULL == kv) { - return PMIX_ERROR; - } - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - 
"%s:%d:%s: for %s:%u", - __FILE__, __LINE__, __func__, nspace, rank)); - - if (NULL == (ns_map = _esh_session_map_search(nspace))) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - - /* set exclusive lock */ - if (PMIX_SUCCESS != (rc = _ESH_WRLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { - PMIX_ERROR_LOG(rc); - return rc; - } - - /* First of all, we go through local track list (list of ns_track_elem_t structures) - * and look for an element for the target namespace. - * If it is there, then shared memory segments for it are created, so we take it. - * Otherwise, create a new element, fill its fields, create corresponding meta - * and data segments for this namespace, add it to the local track list, - * and put this info (ns_seg_info_t) to the initial segment. If initial segment - * if full, then extend it by creating a new one and mark previous one as full. - * All this stuff is done inside _get_track_elem_for_namespace function. - */ - - elem = _get_track_elem_for_namespace(ns_map); - if (NULL == elem) { - rc = PMIX_ERR_OUT_OF_RESOURCE; - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - /* If a new element was just created, we need to create corresponding meta and - * data segments and update corresponding element's fields. */ - if (NULL == elem->meta_seg || NULL == elem->data_seg) { - memset(&ns_info.ns_map, 0, sizeof(ns_info.ns_map)); - strncpy(ns_info.ns_map.name, ns_map->name, sizeof(ns_info.ns_map.name)-1); - ns_info.ns_map.tbl_idx = ns_map->tbl_idx; - ns_info.num_meta_seg = 1; - ns_info.num_data_seg = 1; - rc = _update_ns_elem(elem, &ns_info); - if (PMIX_SUCCESS != rc || NULL == elem->meta_seg || NULL == elem->data_seg) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - /* zero created shared memory segments for this namespace */ - memset(elem->meta_seg->seg_info.seg_base_addr, 0, _meta_segment_size); - memset(elem->data_seg->seg_info.seg_base_addr, 0, _data_segment_size); - - /* put ns's shared segments info to the global meta segment. 
*/ - rc = _put_ns_info_to_initial_segment(ns_map, &elem->meta_seg->seg_info, &elem->data_seg->seg_info); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - } - - /* Now we know info about meta segment for this namespace. If meta segment - * is not empty, then we look for data for the target rank. If they present, replace it. */ - PMIX_CONSTRUCT(&xfer, pmix_buffer_t); - PMIX_LOAD_BUFFER(pmix_globals.mypeer, &xfer, kv->value->data.bo.bytes, kv->value->data.bo.size); - - rc = _store_data_for_rank(elem, rank, &xfer); - - PMIX_DESTRUCT(&xfer); - - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto err_exit; - } - - /* unset lock */ - if (PMIX_SUCCESS != (rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { - PMIX_ERROR_LOG(rc); - } - return rc; - -err_exit: - /* unset lock */ - if (PMIX_SUCCESS != (tmp_rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { - PMIX_ERROR_LOG(tmp_rc); - } - return rc; -} - -static pmix_status_t dstore_store(const pmix_proc_t *proc, - pmix_scope_t scope, - pmix_kval_t *kv) -{ - pmix_status_t rc = PMIX_SUCCESS; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "[%s:%d] gds: dstore store for key '%s' scope %d", - proc->nspace, proc->rank, kv->key, scope); - - if (PMIX_PROC_IS_CLIENT(pmix_globals.mypeer)) { - rc = PMIX_ERR_NOT_SUPPORTED; - PMIX_ERROR_LOG(rc); - return rc; - } - else { - pmix_kval_t *kv2; - kv2 = PMIX_NEW(pmix_kval_t); - PMIX_VALUE_CREATE(kv2->value, 1); - kv2->value->type = PMIX_BYTE_OBJECT; - - pmix_buffer_t tmp; - PMIX_CONSTRUCT(&tmp, pmix_buffer_t); - - PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &tmp, kv, 1, PMIX_KVAL); - PMIX_UNLOAD_BUFFER(&tmp, kv2->value->data.bo.bytes, kv2->value->data.bo.size); - - rc = _dstore_store(proc->nspace, proc->rank, kv2); - PMIX_RELEASE(kv2); - PMIX_DESTRUCT(&tmp); - } - return rc; -} - -static pmix_status_t _dstore_fetch(const char *nspace, pmix_rank_t rank, - const char *key, pmix_value_t **kvs) -{ - ns_seg_info_t *ns_info = NULL; - 
pmix_status_t rc = PMIX_ERROR, lock_rc; - ns_track_elem_t *elem; - rank_meta_info *rinfo = NULL; - size_t kval_cnt = 0; - seg_desc_t *meta_seg, *data_seg; - uint8_t *addr; - pmix_buffer_t buffer; - pmix_value_t val, *kval = NULL; - uint32_t nprocs; - pmix_rank_t cur_rank; - ns_map_data_t *ns_map = NULL; - bool all_ranks_found = true; - bool key_found = false; - pmix_info_t *info = NULL; - size_t ninfo; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for %s:%u look for key %s", - __FILE__, __LINE__, __func__, nspace, rank, key)); - - if ((PMIX_RANK_UNDEF == rank) && (NULL == key)) { - PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output, - "dstore: Does not support passed parameters")); - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - return rc; - } - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for %s:%u look for key %s", - __FILE__, __LINE__, __func__, nspace, rank, key)); - - if (NULL == (ns_map = _esh_session_map_search(nspace))) { - /* This call is issued from the the client. - * client must have the session, otherwise the error is fatal. - */ - rc = PMIX_ERR_FATAL; - PMIX_ERROR_LOG(rc); - return rc; - } - - if (NULL == kvs) { - rc = PMIX_ERR_FATAL; - PMIX_ERROR_LOG(rc); - return rc; - } - - if (PMIX_RANK_UNDEF == rank) { - ssize_t _nprocs = _get_univ_size(ns_map->name); - if( 0 > _nprocs ){ - PMIX_ERROR_LOG(rc); - return rc; - } - nprocs = (size_t) _nprocs; - cur_rank = 0; - } else { - nprocs = 1; - cur_rank = rank; - } - - /* grab shared lock */ - if (PMIX_SUCCESS != (lock_rc = _ESH_RDLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { - /* Something wrong with the lock. The error is fatal */ - rc = PMIX_ERR_FATAL; - PMIX_ERROR_LOG(lock_rc); - return lock_rc; - } - - /* First of all, we go through all initial segments and look at their field. - * If it's 1, then generate name of next initial segment incrementing id by one and attach to it. 
- * We need this step to synchronize initial shared segments with our local track list. - * Then we look for the target namespace in all initial segments. - * If it is found, we get numbers of meta & data segments and - * compare these numbers with the number of trackable meta & data - * segments for this namespace in the local track list. - * If the first number exceeds the last, or the local track list - * doesn't track current namespace yet, then we update it (attach - * to additional segments). - */ - - /* first update local information about initial segments. they can be extended, so then we need to attach to new segments. */ - _update_initial_segment_info(ns_map); - - ns_info = _get_ns_info_from_initial_segment(ns_map); - if (NULL == ns_info) { - /* no data for this namespace is found in the shared memory. */ - PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output, - "%s:%d:%s: no data for ns %s is found in the shared memory.", - __FILE__, __LINE__, __func__, ns_map->name)); - rc = PMIX_ERR_PROC_ENTRY_NOT_FOUND; - goto done; - } - - /* get ns_track_elem_t object for the target namespace from the local track list. */ - elem = _get_track_elem_for_namespace(ns_map); - if (NULL == elem) { - /* Shouldn't happen! */ - rc = PMIX_ERR_FATAL; - PMIX_ERROR_LOG(rc); - goto done; - } - - /* need to update tracker: - * attach to shared memory regions for this namespace and store its info locally - * to operate with address and detach/unlink afterwards. */ - rc = _update_ns_elem(elem, ns_info); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto done; - } - - /* Now we have the data from meta segment for this namespace. */ - meta_seg = elem->meta_seg; - data_seg = elem->data_seg; - - while (nprocs--) { - /* Get the rank meta info in the shared meta segment. 
*/ - rinfo = _get_rank_meta_info(cur_rank, meta_seg); - if (NULL == rinfo) { - PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output, - "%s:%d:%s: no data for this rank is found in the shared memory. rank %u", - __FILE__, __LINE__, __func__, cur_rank)); - all_ranks_found = false; - continue; - } - addr = _get_data_region_by_offset(data_seg, rinfo->offset); - if (NULL == addr) { - /* This means that meta-info is broken - error is fatal */ - rc = PMIX_ERR_FATAL; - PMIX_ERROR_LOG(rc); - goto done; - } - kval_cnt = rinfo->count; - - /* Initialize array for all keys of rank */ - if ((NULL == key) && (kval_cnt > 0)) { - kval = (pmix_value_t*)malloc(sizeof(pmix_value_t)); - if (NULL == kval) { - rc = PMIX_ERR_NOMEM; - goto done; - } - PMIX_VALUE_CONSTRUCT(kval); - - ninfo = kval_cnt; - PMIX_INFO_CREATE(info, ninfo); - if (NULL == info) { - rc = PMIX_ERR_NOMEM; - goto done; - } - - kval->type = PMIX_DATA_ARRAY; - kval->data.darray = (pmix_data_array_t*)malloc(sizeof(pmix_data_array_t)); - if (NULL == kval->data.darray) { - rc = PMIX_ERR_NOMEM; - goto done; - } - kval->data.darray->type = PMIX_INFO; - kval->data.darray->size = ninfo; - kval->data.darray->array = info; - *kvs = kval; - } - - rc = PMIX_SUCCESS; - while (0 < kval_cnt) { - /* data is stored in the following format: - * key_val_pair { - * size_t size; - * char key[KNAME_LEN(addr)]; - * byte_t byte[size]; // should be loaded to pmix_buffer_t and unpacked. - * }; - * segment_format { - * key_val_pair kv_array[n]; - * EXTENSION slot; - * } - * EXTENSION slot which has key = EXTENSION_SLOT and a size_t value for offset - * to next data address for this process. 
- */ - if (0 == strncmp(ESH_KNAME_PTR(addr), ESH_REGION_INVALIDATED, ESH_KNAME_LEN(ESH_REGION_INVALIDATED))) { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %s:%u, skip %s region", - __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_INVALIDATED)); - /* skip it - * go to next item, updating address */ - addr += ESH_KV_SIZE(addr); - } else if (0 == strncmp(ESH_KNAME_PTR(addr), ESH_REGION_EXTENSION, ESH_KNAME_LEN(ESH_REGION_EXTENSION))) { - size_t offset; - memcpy(&offset, ESH_DATA_PTR(addr), sizeof(size_t)); - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %s:%u, reached %s with %lu value", - __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_REGION_EXTENSION, offset)); - if (0 < offset) { - /* go to next item, updating address */ - addr = _get_data_region_by_offset(data_seg, offset); - if (NULL == addr) { - /* This shouldn't happen - error is fatal */ - rc = PMIX_ERR_FATAL; - PMIX_ERROR_LOG(rc); - goto done; - } - } else { - /* no more data for this rank */ - PMIX_OUTPUT_VERBOSE((7, pmix_gds_base_framework.framework_output, - "%s:%d:%s: no more data for this rank is found in the shared memory. rank %u key %s not found", - __FILE__, __LINE__, __func__, cur_rank, key)); - break; - } - } else if (NULL == key) { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %s:%u, found target key %s", - __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_KNAME_PTR(addr))); - - uint8_t *data_ptr = ESH_DATA_PTR(addr); - size_t data_size = ESH_DATA_SIZE(addr, data_ptr); - PMIX_CONSTRUCT(&buffer, pmix_buffer_t); - PMIX_LOAD_BUFFER(_client_peer(), &buffer, data_ptr, data_size); - int cnt = 1; - /* unpack value for this key from the buffer. 
*/ - PMIX_VALUE_CONSTRUCT(&val); - PMIX_BFROPS_UNPACK(rc, _client_peer(), &buffer, &val, &cnt, PMIX_VALUE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto done; - } - strncpy(info[kval_cnt - 1].key, ESH_KNAME_PTR(addr), ESH_KNAME_LEN((char *)addr)); - pmix_value_xfer(&info[kval_cnt - 1].value, &val); - PMIX_VALUE_DESTRUCT(&val); - buffer.base_ptr = NULL; - buffer.bytes_used = 0; - PMIX_DESTRUCT(&buffer); - key_found = true; - - kval_cnt--; - addr += ESH_KV_SIZE(addr); - } else if (0 == strncmp(ESH_KNAME_PTR(addr), key, ESH_KNAME_LEN(key))) { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %s:%u, found target key %s", - __FILE__, __LINE__, __func__, nspace, cur_rank, key)); - /* target key is found, get value */ - uint8_t *data_ptr = ESH_DATA_PTR(addr); - size_t data_size = ESH_DATA_SIZE(addr, data_ptr); - PMIX_CONSTRUCT(&buffer, pmix_buffer_t); - PMIX_LOAD_BUFFER(_client_peer(), &buffer, data_ptr, data_size); - int cnt = 1; - /* unpack value for this key from the buffer. 
*/ - PMIX_VALUE_CONSTRUCT(&val); - PMIX_BFROPS_UNPACK(rc, _client_peer(), &buffer, &val, &cnt, PMIX_VALUE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto done; - } - PMIX_BFROPS_COPY(rc, _client_peer(), (void**)kvs, &val, PMIX_VALUE); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto done; - } - PMIX_VALUE_DESTRUCT(&val); - buffer.base_ptr = NULL; - buffer.bytes_used = 0; - PMIX_DESTRUCT(&buffer); - key_found = true; - goto done; - } else { - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s: for rank %s:%u, skip key %s look for key %s", - __FILE__, __LINE__, __func__, nspace, cur_rank, ESH_KNAME_PTR(addr), key)); - /* go to next item, updating address */ - addr += ESH_KV_SIZE(addr); - kval_cnt--; - } - } - - if (PMIX_RANK_UNDEF == rank) { - cur_rank++; - } - } - -done: - /* unset lock */ - if (PMIX_SUCCESS != (lock_rc = _ESH_UNLOCK(_ESH_SESSION_lock(ns_map->tbl_idx)))) { - PMIX_ERROR_LOG(lock_rc); - } - - if( rc != PMIX_SUCCESS ){ - if ((NULL == key) && (kval_cnt > 0)) { - if( NULL != info ) { - PMIX_INFO_FREE(info, ninfo); - } - if (NULL != kval) { - PMIX_VALUE_RELEASE(kval); - } - } - return rc; - } - - if( key_found ){ - /* the key is found - nothing to do */ - return PMIX_SUCCESS; - } - - if( !all_ranks_found ){ - /* Not all ranks was found - need to request - * all of them and search again - */ - rc = PMIX_ERR_PROC_ENTRY_NOT_FOUND; - return rc; - } - rc = PMIX_ERR_NOT_FOUND; - return rc; -} - -static pmix_status_t dstore_fetch(const pmix_proc_t *proc, - pmix_scope_t scope, bool copy, - const char *key, - pmix_info_t info[], size_t ninfo, - pmix_list_t *kvs) -{ - pmix_kval_t *kv; - pmix_value_t *val; - pmix_status_t rc = PMIX_SUCCESS; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "gds: dstore fetch `%s`", key == NULL ? 
"NULL" : key); - - rc = _dstore_fetch(proc->nspace, proc->rank, key, &val); - if (PMIX_SUCCESS == rc) { - if( NULL == key ) { - pmix_info_t *info; - size_t n, ninfo; - - if (NULL == val->data.darray || - PMIX_INFO != val->data.darray->type || - 0 == val->data.darray->size) { - PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND); - return PMIX_ERR_NOT_FOUND; - } - info = (pmix_info_t*)val->data.darray->array; - ninfo = val->data.darray->size; - - for (n = 0; n < ninfo; n++){ - kv = PMIX_NEW(pmix_kval_t); - if (NULL == kv) { - rc = PMIX_ERR_NOMEM; - PMIX_VALUE_RELEASE(val); - return rc; - } - kv->key = strdup(info[n].key); - PMIX_VALUE_XFER(rc, kv->value, &info[n].value); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kv); - PMIX_VALUE_RELEASE(val); - return rc; - } - pmix_list_append(kvs, &kv->super); - } - - return PMIX_SUCCESS; - } - /* just return the value */ - kv = PMIX_NEW(pmix_kval_t); - if (NULL == kv) { - PMIX_VALUE_RELEASE(val); - return PMIX_ERR_NOMEM; - } - kv->key = strdup(key); - kv->value = val; - pmix_list_append(kvs, &kv->super); - } - return rc; -} - -static pmix_status_t dstore_setup_fork(const pmix_proc_t *peer, char ***env) -{ - pmix_status_t rc = PMIX_SUCCESS; - ns_map_data_t *ns_map = NULL; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "gds: dstore setup fork"); - - if (NULL == _esh_session_map_search) { - rc = PMIX_ERR_NOT_AVAILABLE; - PMIX_ERROR_LOG(rc); - return rc; - } - - if (NULL == (ns_map = _esh_session_map_search(peer->nspace))) { - rc = PMIX_ERR_NOT_AVAILABLE; - PMIX_ERROR_LOG(rc); - return rc; - } - - if ((NULL == _base_path) || (strlen(_base_path) == 0)){ - rc = PMIX_ERR_NOT_AVAILABLE; - PMIX_ERROR_LOG(rc); - return rc; - } - - if(PMIX_SUCCESS != (rc = pmix_setenv(PMIX_DSTORE_ESH_BASE_PATH, - _ESH_SESSION_path(ns_map->tbl_idx), true, env))){ - PMIX_ERROR_LOG(rc); - } - return rc; -} - -static pmix_status_t dstore_add_nspace(const char *nspace, - pmix_info_t info[], - size_t ninfo) -{ - pmix_status_t rc; - 
size_t tbl_idx=0; - uid_t jobuid = _jobuid; - char setjobuid = _setjobuid; - size_t n; - ns_map_data_t *ns_map = NULL; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "gds: dstore add nspace"); - - if (NULL != info) { - for (n=0; n < ninfo; n++) { - if (0 == strcmp(PMIX_USERID, info[n].key)) { - jobuid = info[n].value.data.uint32; - setjobuid = 1; - continue; - } - } - } - - if (PMIX_SUCCESS != _esh_jobuid_tbl_search(jobuid, &tbl_idx)) { - - rc = _esh_session_tbl_add(&tbl_idx); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - return rc; - } - ns_map = _esh_session_map(nspace, tbl_idx); - if (NULL == ns_map) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - - if (PMIX_SUCCESS != (rc =_esh_session_init(tbl_idx, ns_map, jobuid, setjobuid))) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - else { - ns_map = _esh_session_map(nspace, tbl_idx); - if (NULL == ns_map) { - rc = PMIX_ERROR; - PMIX_ERROR_LOG(rc); - return rc; - } - } - - return PMIX_SUCCESS; -} - -static pmix_status_t dstore_del_nspace(const char* nspace) -{ - pmix_status_t rc = PMIX_SUCCESS; - size_t map_idx, size; - int in_use = 0; - ns_map_data_t *ns_map_data = NULL; - ns_map_t *ns_map; - session_t *session_tbl = NULL; - ns_track_elem_t *trk = NULL; - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s delete nspace `%s`", __FILE__, __LINE__, __func__, nspace)); - - if (NULL == (ns_map_data = _esh_session_map_search(nspace))) { - rc = PMIX_ERR_NOT_AVAILABLE; - return rc; - } - - size = pmix_value_array_get_size(_ns_map_array); - ns_map = PMIX_VALUE_ARRAY_GET_BASE(_ns_map_array, ns_map_t); - - for (map_idx = 0; map_idx < size; map_idx++){ - if (ns_map[map_idx].in_use && - (ns_map[map_idx].data.tbl_idx == ns_map_data->tbl_idx)) { - if (0 == strcmp(ns_map[map_idx].data.name, nspace)) { - _esh_session_map_clean(&ns_map[map_idx]); - continue; - } - in_use++; - break; - } - } - - if(ns_map_data->track_idx >= 0) { - trk = 
pmix_value_array_get_item(_ns_track_array, ns_map_data->track_idx); - if((ns_map_data->track_idx + 1) > (int)pmix_value_array_get_size(_ns_track_array)) { - rc = PMIX_ERR_VALUE_OUT_OF_BOUNDS; - PMIX_ERROR_LOG(rc); - goto exit; - } - if (true == trk->in_use) { - PMIX_DESTRUCT(trk); - } - } - - /* A lot of nspaces may be using same session info - * session record can only be deleted once all references are gone */ - if (!in_use) { - session_tbl = PMIX_VALUE_ARRAY_GET_BASE(_session_array, session_t); - - PMIX_OUTPUT_VERBOSE((10, pmix_gds_base_framework.framework_output, - "%s:%d:%s delete session for jobuid: %d", __FILE__, __LINE__, __func__, session_tbl[ns_map_data->tbl_idx].jobuid)); - _esh_session_release(&session_tbl[ns_map_data->tbl_idx]); - } -exit: - return rc; -} - -static pmix_status_t dstore_assign_module(pmix_info_t *info, size_t ninfo, - int *priority) -{ - size_t n, m; - char **options; - - *priority = 20; - if (NULL != info) { - for (n=0; n < ninfo; n++) { - if (0 == strncmp(info[n].key, PMIX_GDS_MODULE, PMIX_MAX_KEYLEN)) { - options = pmix_argv_split(info[n].value.data.string, ','); - for (m=0; NULL != options[m]; m++) { - if (0 == strcmp(options[m], "ds12")) { - /* they specifically asked for us */ - *priority = 100; - break; - } - if (0 == strcmp(options[m], "dstore")) { - /* they are asking for any dstore module - we - * take an intermediate priority in case another - * dstore is more modern than us */ - *priority = 50; - break; - } - } - pmix_argv_free(options); - break; - } - } - } - -#if 0 - if PMIX_GDS_MODULE != "ds12" - *proirity = 0; - else PMIX_GDS_MODULE == "ds12" || !PMIX_GDS_MODULE - *priority = -1; -#endif - return PMIX_SUCCESS; -} - -static inline int _my_client(const char *nspace, pmix_rank_t rank) -{ - pmix_peer_t *peer; - int i; - int local = 0; - - for (i = 0; i < pmix_server_globals.clients.size; i++) { - if (NULL != (peer = (pmix_peer_t *)pmix_pointer_array_get_item(&pmix_server_globals.clients, i))) { - if (0 == 
strcmp(peer->info->pname.nspace, nspace) && peer->info->pname.rank == rank) { - local = 1; - break; - } - } - } - - return local; -} - -/* this function is only called by the PMIx server when its - * host has received data from some other peer. It therefore - * always contains data solely from remote procs, and we - * shall store it accordingly */ -static pmix_status_t dstore_store_modex(struct pmix_nspace_t *nspace, - pmix_list_t *cbs, - pmix_byte_object_t *bo) -{ - pmix_nspace_t *ns = (pmix_nspace_t*)nspace; - pmix_server_caddy_t *scd; - pmix_status_t rc = PMIX_SUCCESS; - int32_t cnt; - pmix_buffer_t pbkt; - pmix_proc_t proc; - pmix_kval_t *kv; - pmix_peer_t *peer; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "[%s:%d] gds:dstore:store_modex for nspace %s", - pmix_globals.myid.nspace, pmix_globals.myid.rank, - ns->nspace); - - /* this is data returned via the PMIx_Fence call when - * data collection was requested, so it only contains - * REMOTE/GLOBAL data. The byte object contains - * the rank followed by pmix_kval_t's. The list of callbacks - * contains all local participants. */ - peer = NULL; - PMIX_LIST_FOREACH(scd, cbs, pmix_server_caddy_t) { - if (scd->peer->nptr == ns) { - peer = scd->peer; - break; - } - } - if (NULL == peer) { - /* we can ignore this one */ - return PMIX_SUCCESS; - } - - /* setup the byte object for unpacking */ - PMIX_CONSTRUCT(&pbkt, pmix_buffer_t); - /* the next step unfortunately NULLs the byte object's - * entries, so we need to ensure we restore them! 
*/ - PMIX_LOAD_BUFFER(peer, &pbkt, bo->bytes, bo->size); - /* unload the proc that provided this data */ - cnt = 1; - PMIX_BFROPS_UNPACK(rc, peer, &pbkt, &proc, &cnt, PMIX_PROC); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - bo->bytes = pbkt.base_ptr; - bo->size = pbkt.bytes_used; // restore the incoming data - pbkt.base_ptr = NULL; - PMIX_DESTRUCT(&pbkt); - return rc; - } - /* don't store blobs to the sm dstore from local clients */ - if (_my_client(proc.nspace, proc.rank)) { - bo->bytes = pbkt.base_ptr; - bo->size = pbkt.bytes_used; // restore the incoming data - pbkt.base_ptr = NULL; - PMIX_DESTRUCT(&pbkt); - return PMIX_SUCCESS; - } - /* unpack the remaining values until we hit the end of the buffer */ - cnt = 1; - kv = PMIX_NEW(pmix_kval_t); - PMIX_BFROPS_UNPACK(rc, peer, &pbkt, kv, &cnt, PMIX_KVAL); - while (PMIX_SUCCESS == rc) { - /* store this in the hash table */ - PMIX_GDS_STORE_KV(rc, pmix_globals.mypeer, &proc, PMIX_REMOTE, kv); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - bo->bytes = pbkt.base_ptr; - bo->size = pbkt.bytes_used; // restore the incoming data - pbkt.base_ptr = NULL; - PMIX_DESTRUCT(&pbkt); - return rc; - } - if (PMIX_SUCCESS != (rc = dstore_store(&proc, PMIX_REMOTE, kv))) { - PMIX_ERROR_LOG(rc); - } - PMIX_RELEASE(kv); // maintain accounting as the hash increments the ref count - /* continue along */ - kv = PMIX_NEW(pmix_kval_t); - cnt = 1; - PMIX_BFROPS_UNPACK(rc, peer, &pbkt, kv, &cnt, PMIX_KVAL); - } - PMIX_RELEASE(kv); // maintain accounting - if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) { - PMIX_ERROR_LOG(rc); - } else { - rc = PMIX_SUCCESS; - } - bo->bytes = pbkt.base_ptr; - bo->size = pbkt.bytes_used; // restore the incoming data - pbkt.base_ptr = NULL; - PMIX_DESTRUCT(&pbkt); - return rc; -} - -static pmix_status_t _store_job_info(pmix_proc_t *proc) -{ - pmix_cb_t cb; - pmix_kval_t *kv; - pmix_buffer_t buf; - pmix_kval_t *kv2 = NULL, *kvp; - pmix_status_t rc = PMIX_SUCCESS; - - PMIX_CONSTRUCT(&cb, pmix_cb_t); - 
PMIX_CONSTRUCT(&buf, pmix_buffer_t); - kvp = PMIX_NEW(pmix_kval_t); - PMIX_VALUE_CREATE(kvp->value, 1); - kvp->value->type = PMIX_BYTE_OBJECT; - - cb.proc = proc; - cb.scope = PMIX_INTERNAL; - cb.copy = false; - - PMIX_GDS_FETCH_KV(rc, pmix_globals.mypeer, &cb); - if (PMIX_SUCCESS != rc) { - if (rc == PMIX_ERR_PROC_ENTRY_NOT_FOUND) { - /* there is no error if no data for job info */ - rc = PMIX_SUCCESS; - } - goto exit; - } - - PMIX_LIST_FOREACH(kv, &cb.kvs, pmix_kval_t) { - if ((PMIX_PROC_IS_V1(_client_peer()) || PMIX_PROC_IS_V20(_client_peer())) && - 0 != strncmp("pmix.", kv->key, 4) && - kv->value->type == PMIX_DATA_ARRAY) { - pmix_info_t *info; - size_t size, i; - info = kv->value->data.darray->array; - size = kv->value->data.darray->size; - - for (i = 0; i < size; i++) { - if (0 == strcmp(PMIX_LOCAL_PEERS, info[i].key)) { - kv2 = PMIX_NEW(pmix_kval_t); - kv2->key = strdup(kv->key); - PMIX_VALUE_XFER(rc, kv2->value, &info[i].value); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kv2); - goto exit; - } - PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv2, 1, PMIX_KVAL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - PMIX_RELEASE(kv2); - goto exit; - } - PMIX_RELEASE(kv2); - } - } - } else { - PMIX_BFROPS_PACK(rc, pmix_globals.mypeer, &buf, kv, 1, PMIX_KVAL); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - goto exit; - } - } - } - - PMIX_UNLOAD_BUFFER(&buf, kvp->value->data.bo.bytes, kvp->value->data.bo.size); - if (PMIX_SUCCESS != (rc = _dstore_store(proc->nspace, proc->rank, kvp))) { - PMIX_ERROR_LOG(rc); - goto exit; - } - -exit: - PMIX_RELEASE(kvp); - PMIX_DESTRUCT(&cb); - PMIX_DESTRUCT(&buf); - return rc; -} - -static pmix_status_t dstore_register_job_info(struct pmix_peer_t *pr, - pmix_buffer_t *reply) -{ - pmix_peer_t *peer = (pmix_peer_t*)pr; - pmix_nspace_t *ns = peer->nptr; - char *msg; - pmix_status_t rc; - pmix_proc_t proc; - pmix_rank_info_t *rinfo; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - 
"[%s:%d] gds:dstore:register_job_info for peer [%s:%d]", - pmix_globals.myid.nspace, pmix_globals.myid.rank, - peer->info->pname.nspace, peer->info->pname.rank); - - if (0 == ns->ndelivered) { // don't store twice - _client_compat_save(peer); - (void)strncpy(proc.nspace, ns->nspace, PMIX_MAX_NSLEN); - proc.rank = PMIX_RANK_WILDCARD; - rc = _store_job_info(&proc); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - return rc; - } - - PMIX_LIST_FOREACH(rinfo, &ns->ranks, pmix_rank_info_t) { - proc.rank = rinfo->pname.rank; - rc = _store_job_info(&proc); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - return rc; - } - } - } - - /* answer to client */ - msg = ns->nspace; - PMIX_BFROPS_PACK(rc, peer, reply, &msg, 1, PMIX_STRING); - if (PMIX_SUCCESS != rc) { - PMIX_ERROR_LOG(rc); - return rc; - } - - return rc; -} - -static pmix_status_t dstore_store_job_info(const char *nspace, pmix_buffer_t *buf) -{ - pmix_status_t rc = PMIX_SUCCESS; - - pmix_output_verbose(2, pmix_gds_base_framework.framework_output, - "[%s:%u] pmix:gds:dstore store job info for nspace %s", - pmix_globals.myid.nspace, pmix_globals.myid.rank, nspace); - - /* check buf data */ - if ((NULL == buf) || (0 == buf->bytes_used)) { - rc = PMIX_ERR_BAD_PARAM; - PMIX_ERROR_LOG(rc); - return rc; - } - return rc; -} - -static void _client_compat_save(pmix_peer_t *peer) -{ - pmix_nspace_t *nptr = NULL; - - if (NULL == _clients_peer) { - _clients_peer = PMIX_NEW(pmix_peer_t); - nptr = PMIX_NEW(pmix_nspace_t); - _clients_peer->nptr = nptr; - } - _clients_peer->nptr->compat = peer->nptr->compat; - _clients_peer->proc_type = peer->proc_type; -} - -static inline pmix_peer_t * _client_peer(void) -{ - if (NULL == _clients_peer) { - return pmix_client_globals.myserver; - } - return _clients_peer; -} diff --git a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore.h b/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore.h deleted file mode 100644 index abd4723ad25..00000000000 --- 
a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore.h +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Copyright (c) 2015-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2017 Mellanox Technologies, Inc. - * All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - */ - -#ifndef PMIX_DS12_H -#define PMIX_DS12_H - -#include - - -#include "src/mca/gds/gds.h" -#include "src/mca/pshmem/pshmem.h" - -BEGIN_C_DECLS - -#include -#include "src/class/pmix_value_array.h" - -#define INITIAL_SEG_SIZE 4096 -#define NS_META_SEG_SIZE (1<<22) -#define NS_DATA_SEG_SIZE (1<<22) - -#define PMIX_DSTORE_ESH_BASE_PATH "PMIX_DSTORE_ESH_BASE_PATH" - -#ifdef HAVE_PTHREAD_SHARED -#define ESH_PTHREAD_LOCK -#elif defined HAVE_FCNTL_FLOCK -#define ESH_FCNTL_LOCK -#else -#error No locking mechanism was found -#endif - -/* this structs are used to store information about - * shared segments addresses locally at each process, - * so they are common for different types of segments - * and don't have a specific content (namespace's info, - * rank's meta info, ranks's data). 
*/ - -typedef enum { - INITIAL_SEGMENT, - NS_META_SEGMENT, - NS_DATA_SEGMENT -} segment_type; - -typedef struct seg_desc_t seg_desc_t; -struct seg_desc_t { - segment_type type; - pmix_pshmem_seg_t seg_info; - uint32_t id; - seg_desc_t *next; -}; - -typedef struct ns_map_data_s ns_map_data_t; -typedef struct session_s session_t; -typedef struct ns_map_s ns_map_t; - -struct session_s { - int in_use; - uid_t jobuid; - char setjobuid; - char *nspace_path; - char *lockfile; -#ifdef ESH_PTHREAD_LOCK - pmix_pshmem_seg_t *rwlock_seg; - pthread_rwlock_t *rwlock; -#endif - int lockfd; - seg_desc_t *sm_seg_first; - seg_desc_t *sm_seg_last; -}; - -struct ns_map_data_s { - char name[PMIX_MAX_NSLEN+1]; - size_t tbl_idx; - int track_idx; -}; - -struct ns_map_s { - int in_use; - ns_map_data_t data; -}; - -/* initial segment format: - * size_t num_elems; - * size_t full; //indicate to client that it needs to attach to the next segment - * ns_seg_info_t ns_seg_info[max_ns_num]; - */ - -typedef struct { - ns_map_data_t ns_map; - size_t num_meta_seg;/* read by clients to attach to this number of segments. 
*/ - size_t num_data_seg; -} ns_seg_info_t; - -/* meta segment format: - * size_t num_elems; - * rank_meta_info meta_info[max_meta_elems]; - */ - -typedef struct { - size_t rank; - size_t offset; - size_t count; -} rank_meta_info; - -typedef struct { - pmix_value_array_t super; - ns_map_data_t ns_map; - size_t num_meta_seg; - size_t num_data_seg; - seg_desc_t *meta_seg; - seg_desc_t *data_seg; - bool in_use; -} ns_track_elem_t; - -/* the component must be visible data for the linker to find it */ -PMIX_EXPORT extern pmix_gds_base_component_t mca_gds_ds12_component; -extern pmix_gds_base_module_t pmix_ds12_module; - -END_C_DECLS - -#endif diff --git a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore_component.c b/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore_component.c deleted file mode 100644 index da955113b50..00000000000 --- a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/ds12/gds_dstore_component.c +++ /dev/null @@ -1,86 +0,0 @@ -/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ -/* - * Copyright (c) 2004-2008 The Trustees of Indiana University and Indiana - * University Research and Technology - * Corporation. All rights reserved. - * Copyright (c) 2004-2005 The University of Tennessee and The University - * of Tennessee Research Foundation. All rights - * reserved. - * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, - * University of Stuttgart. All rights reserved. - * Copyright (c) 2004-2005 The Regents of the University of California. - * All rights reserved. - * Copyright (c) 2015 Los Alamos National Security, LLC. All rights - * reserved. - * Copyright (c) 2016-2017 Intel, Inc. All rights reserved. - * Copyright (c) 2017 Mellanox Technologies, Inc. - * All rights reserved. - * $COPYRIGHT$ - * - * Additional copyrights may follow - * - * $HEADER$ - * - * These symbols are in a file by themselves to provide nice linker - * semantics. 
Since linkers generally pull in symbols by object - * files, keeping these symbols as the only symbols in this file - * prevents utility programs such as "ompi_info" from having to import - * entire components just to query their version and parameters. - */ - -#include -#include "pmix_common.h" - - -#include "src/mca/gds/gds.h" -#include "gds_dstore.h" - -static pmix_status_t component_open(void); -static pmix_status_t component_close(void); -static pmix_status_t component_query(pmix_mca_base_module_t **module, int *priority); - -/* - * Instantiate the public struct with all of our public information - * and pointers to our public functions in it - */ -pmix_gds_base_component_t mca_gds_ds12_component = { - .base = { - PMIX_GDS_BASE_VERSION_1_0_0, - - /* Component name and version */ - .pmix_mca_component_name = "ds12", - PMIX_MCA_BASE_MAKE_VERSION(component, - PMIX_MAJOR_VERSION, - PMIX_MINOR_VERSION, - PMIX_RELEASE_VERSION), - - /* Component open and close functions */ - .pmix_mca_open_component = component_open, - .pmix_mca_close_component = component_close, - .pmix_mca_query_component = component_query, - }, - .data = { - /* The component is checkpoint ready */ - PMIX_MCA_BASE_METADATA_PARAM_CHECKPOINT - } -}; - - -static int component_open(void) -{ - return PMIX_SUCCESS; -} - - -static int component_query(pmix_mca_base_module_t **module, int *priority) -{ - *priority = 20; - *module = (pmix_mca_base_module_t *)&pmix_ds12_module; - return PMIX_SUCCESS; -} - - -static int component_close(void) -{ - return PMIX_SUCCESS; -} diff --git a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/hash/gds_hash.c b/opal/mca/pmix/pmix3x/pmix/src/mca/gds/hash/gds_hash.c index 1f60b49dbcf..4d7a2b8549e 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/mca/gds/hash/gds_hash.c +++ b/opal/mca/pmix/pmix3x/pmix/src/mca/gds/hash/gds_hash.c @@ -426,6 +426,7 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns, /* an array of data pertaining to a specific proc */ if (PMIX_DATA_ARRAY != 
info[n].value.type) { PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); + rc = PMIX_ERR_TYPE_MISMATCH; goto release; } size = info[n].value.data.darray->size; @@ -433,6 +434,7 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns, /* first element of the array must be the rank */ if (0 != strcmp(iptr[0].key, PMIX_RANK) || PMIX_PROC_RANK != iptr[0].value.type) { + rc = PMIX_ERR_TYPE_MISMATCH; PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM); goto release; } @@ -458,7 +460,7 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns, if (NULL == tmp) { PMIX_ERROR_LOG(PMIX_ERR_NOMEM); rc = PMIX_ERR_NOMEM; - return rc; + goto release; } kp2->value->type = PMIX_COMPRESSED_STRING; free(kp2->value->data.string); @@ -493,10 +495,10 @@ pmix_status_t hash_cache_job_info(struct pmix_nspace_t *ns, if (PMIX_STRING_SIZE_CHECK(kp2->value)) { if (pmix_util_compress_string(kp2->value->data.string, &tmp, &len)) { if (NULL == tmp) { - PMIX_ERROR_LOG(PMIX_ERR_NOMEM); - PMIX_RELEASE(kp2); rc = PMIX_ERR_NOMEM; - return rc; + PMIX_ERROR_LOG(rc); + PMIX_RELEASE(kp2); + goto release; } kp2->value->type = PMIX_COMPRESSED_STRING; free(kp2->value->data.string); diff --git a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c index b50c7ae743f..73c993c8df0 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c +++ b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_get.c @@ -382,6 +382,7 @@ pmix_status_t pmix_server_get(pmix_buffer_t *buf, } if (PMIX_ERR_NOT_FOUND != rc || NULL == lcd) { /* we have a problem - e.g., out of memory */ + cbfunc(PMIX_ERR_NOT_FOUND, NULL, 0, cbdata, NULL, NULL); PMIX_INFO_FREE(info, ninfo); return rc; } diff --git a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c index 0f3d8f2f41b..829b9d9e461 100644 --- a/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c +++ b/opal/mca/pmix/pmix3x/pmix/src/server/pmix_server_ops.c @@ -2015,6 +2015,10 @@ 
pmix_status_t pmix_server_job_ctrl(pmix_peer_t *peer, pmix_status_t rc; pmix_query_caddy_t *cd; pmix_proc_t proc; + size_t n; + bool epilog; + pmix_info_caddy_t *epi; + pmix_info_t *cache, *mods[4]; pmix_output_verbose(2, pmix_server_globals.base_output, "recvd job control request from client"); @@ -2063,6 +2067,56 @@ pmix_status_t pmix_server_job_ctrl(pmix_peer_t *peer, } } + /* if this includes a request for post-termination cleanup, we handle + * that request ourselves */ + cnt = 0; + epilog = false; + for (n=0; n < cd->ninfo; n++) { + if (0 == strncmp(cd->info[n].key, PMIX_REGISTER_CLEANUP, PMIX_MAX_KEYLEN)) { + cache = &cd->info[n]; + epilog = true; + } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_RECURSIVE, PMIX_MAX_KEYLEN)) { + mods[cnt] = &cd->info[n]; + ++cnt; + } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_EMPTY, PMIX_MAX_KEYLEN)) { + mods[cnt] = &cd->info[n]; + ++cnt; + } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_IGNORE, PMIX_MAX_KEYLEN)) { + mods[cnt] = &cd->info[n]; + ++cnt; + } else if (0 == strncmp(cd->info[n].key, PMIX_CLEANUP_LEAVE_TOPDIR, PMIX_MAX_KEYLEN)) { + mods[cnt] = &cd->info[n]; + ++cnt; + } + } + if (epilog) { + epi = PMIX_NEW(pmix_info_caddy_t); + if (NULL == epi) { + rc = PMIX_ERR_NOMEM; + goto exit; + } + PMIX_INFO_CREATE(epi->info, cnt+1); + if (NULL == epi->info) { + PMIX_RELEASE(epi); + rc = PMIX_ERR_NOMEM; + goto exit; + } + epi->ninfo = cnt+1; + PMIX_INFO_XFER(&epi->info[0], cache); + for (n=0; n < (size_t)cnt; n++) { + PMIX_INFO_XFER(&epi->info[n+1], mods[n]); + } + pmix_list_append(&peer->epilogs, &epi->super); + /* see if this is all there was */ + if (epi->ninfo == cd->ninfo) { + /* yes, so there is nothing that the host RM need do */ + if (NULL != cbfunc) { + cbfunc(PMIX_SUCCESS, NULL, 0, cd, NULL, NULL); + } + return PMIX_SUCCESS; + } + } + /* setup the requesting peer name */ (void)strncpy(proc.nspace, peer->info->pname.nspace, PMIX_MAX_NSLEN); proc.rank = peer->info->pname.rank; diff --git 
a/opal/mca/pmix/pmix3x/pmix3x.c b/opal/mca/pmix/pmix3x/pmix3x.c index 5499d18d0ab..1fb57e721b0 100644 --- a/opal/mca/pmix/pmix3x/pmix3x.c +++ b/opal/mca/pmix/pmix3x/pmix3x.c @@ -71,6 +71,8 @@ static void pmix3x_query(opal_list_t *queries, static void pmix3x_log(opal_list_t *info, opal_pmix_op_cbfunc_t cbfunc, void *cbdata); +static int pmix3x_register_cleanup(char *path); + const opal_pmix_base_module_t opal_pmix_pmix3x_module = { /* client APIs */ .init = pmix3x_client_init, @@ -101,6 +103,7 @@ const opal_pmix_base_module_t opal_pmix_pmix3x_module = { .log = pmix3x_log, .allocate = pmix3x_allocate, .job_control = pmix3x_job_control, + .register_cleanup = pmix3x_register_cleanup, /* server APIs */ .server_init = pmix3x_server_init, .server_finalize = pmix3x_server_finalize, @@ -333,6 +336,57 @@ void pmix3x_event_hdlr(size_t evhdlr_registration_id, return; } +static void cleanup_cbfunc(pmix_status_t status, + pmix_info_t *info, size_t ninfo, + void *cbdata, + pmix_release_cbfunc_t release_fn, + void *release_cbdata) +{ + opal_pmix_lock_t *lk = (opal_pmix_lock_t*)cbdata; + + OPAL_POST_OBJECT(lk); + + /* let the library release the data and cleanup from + * the operation */ + if (NULL != release_fn) { + release_fn(release_cbdata); + } + + /* release the block */ + lk->status = pmix3x_convert_rc(status); + OPAL_PMIX_WAKEUP_THREAD(lk); +} + +static int pmix3x_register_cleanup(char *path) +{ + opal_pmix_lock_t lk; + pmix_info_t pinfo[4]; + pmix_status_t rc; + int ret, n; + + OPAL_PMIX_CONSTRUCT_LOCK(&lk); + PMIX_INFO_LOAD(&pinfo[0], PMIX_REGISTER_CLEANUP, path, PMIX_STRING); + /* recursively cleanup directories */ + PMIX_INFO_LOAD(&pinfo[1], PMIX_CLEANUP_RECURSIVE, NULL, PMIX_BOOL); + /* ignore any output.txt files in the tree */ + PMIX_INFO_LOAD(&pinfo[2], PMIX_CLEANUP_IGNORE, "output-*", PMIX_STRING); + /* only remove empty subdirectories */ + PMIX_INFO_LOAD(&pinfo[3], PMIX_CLEANUP_EMPTY, NULL, PMIX_BOOL); + + rc = PMIx_Job_control_nb(NULL, 0, pinfo, 4, cleanup_cbfunc, 
(void*)&lk); + if (PMIX_SUCCESS != rc) { + ret = pmix3x_convert_rc(rc); + } else { + OPAL_PMIX_WAIT_THREAD(&lk); + ret = lk.status; + } + OPAL_PMIX_DESTRUCT_LOCK(&lk); + for (n=0; n < 4; n++) { + PMIX_INFO_DESTRUCT(&pinfo[n]); + } + return ret; +} + opal_vpid_t pmix3x_convert_rank(pmix_rank_t rank) { switch(rank) { diff --git a/opal/mca/pmix/pmix_types.h b/opal/mca/pmix/pmix_types.h index f9c58e7d735..4c18ba7eb48 100644 --- a/opal/mca/pmix/pmix_types.h +++ b/opal/mca/pmix/pmix_types.h @@ -118,6 +118,7 @@ BEGIN_C_DECLS /* information about relative ranks as assigned by the RM */ +#define OPAL_PMIX_CLUSTER_ID "pmix.clid" // (char*) a string name for the cluster this proc is executing on #define OPAL_PMIX_PROCID "pmix.procid" // (opal_process_name_t) process identifier #define OPAL_PMIX_NSPACE "pmix.nspace" // (char*) nspace of a job #define OPAL_PMIX_JOBID "pmix.jobid" // (uint32_t) jobid assigned by scheduler @@ -189,6 +190,7 @@ BEGIN_C_DECLS #define OPAL_PMIX_NOTIFY_COMPLETION "pmix.notecomp" // (bool) notify parent process upon termination of child job #define OPAL_PMIX_RANGE "pmix.range" // (int) opal_pmix_data_range_t value for calls to publish/lookup/unpublish #define OPAL_PMIX_PERSISTENCE "pmix.persist" // (int) opal_pmix_persistence_t value for calls to publish +#define OPAL_PMIX_DATA_SCOPE "pmix.scope" // (pmix_scope_t) scope of the data to be found in a PMIx_Get call #define OPAL_PMIX_OPTIONAL "pmix.optional" // (bool) look only in the immediate data store for the requested value - do // not request data from the server if not found #define OPAL_PMIX_EMBED_BARRIER "pmix.embed.barrier" // (bool) execute a blocking fence operation before executing the @@ -364,6 +366,16 @@ BEGIN_C_DECLS #define OPAL_PMIX_JOB_CTRL_PROVISION_IMAGE "pmix.jctrl.pvnimg" // (char*) name of the image that is to be provisioned #define OPAL_PMIX_JOB_CTRL_PREEMPTIBLE "pmix.jctrl.preempt" // (bool) job can be pre-empted #define OPAL_PMIX_JOB_CTRL_TERMINATE "pmix.jctrl.term" // (bool) 
politely terminate the specified procs +#define OPAL_PMIX_REGISTER_CLEANUP "pmix.reg.cleanup" // (char*) comma-delimited list of files/directories to + // be removed upon process termination +#define OPAL_PMIX_CLEANUP_RECURSIVE "pmix.clnup.recurse" // (bool) recursively cleanup all subdirectories under the + // specified one(s) +#define OPAL_PMIX_CLEANUP_EMPTY "pmix.clnup.empty" // (bool) only remove empty subdirectories +#define OPAL_PMIX_CLEANUP_IGNORE "pmix.clnup.ignore" // (char*) comma-delimited list of filenames that are not + // to be removed +#define OPAL_PMIX_CLEANUP_LEAVE_TOPDIR "pmix.clnup.lvtop" // (bool) when recursively cleaning subdirs, do not remove + // the top-level directory (the one given in the + // cleanup request) /* monitoring attributes */ diff --git a/orte/mca/ess/base/ess_base_std_app.c b/orte/mca/ess/base/ess_base_std_app.c index 475304a8e23..b83819ce27f 100644 --- a/orte/mca/ess/base/ess_base_std_app.c +++ b/orte/mca/ess/base/ess_base_std_app.c @@ -147,6 +147,19 @@ int orte_ess_base_app_setup(bool db_restrict_local) proc-specific session directory. 
*/ opal_output_set_output_file_info(orte_process_info.proc_session_dir, "output-", NULL, NULL); + /* register the directory for cleanup */ + if (NULL != opal_pmix.register_cleanup) { + if (orte_standalone_operation) { + error = orte_process_info.top_session_dir; + } else { + error = orte_process_info.jobfam_session_dir; + } + if (OPAL_SUCCESS != (ret = opal_pmix.register_cleanup(error))) { + ORTE_ERROR_LOG(ret); + error = "register cleanup"; + goto error; + } + } } /* Setup the communication infrastructure */ /* Routed system */ @@ -357,7 +370,9 @@ int orte_ess_base_app_finalize(void) (void) mca_base_framework_close(&orte_oob_base_framework); (void) mca_base_framework_close(&orte_state_base_framework); - orte_session_dir_finalize(ORTE_PROC_MY_NAME); + if (NULL == opal_pmix.register_cleanup) { + orte_session_dir_finalize(ORTE_PROC_MY_NAME); + } /* cleanup the process info */ orte_proc_info_finalize();