Skip to content

Try adding local spawn threads by default to parallelize the fork/exec process. #4532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion orte/mca/errmgr/default_orted/errmgr_default_orted.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "orte/mca/ess/ess.h"
#include "orte/mca/state/state.h"

#include "orte/runtime/orte_wait.h"
#include "orte/runtime/orte_quit.h"
#include "orte/runtime/orte_globals.h"
#include "orte/runtime/data_type_support/orte_dt_support.h"
Expand Down Expand Up @@ -327,6 +328,7 @@ static void proc_errors(int fd, short args, void *cbdata)
orte_plm_cmd_flag_t cmd;
int rc=ORTE_SUCCESS;
int i;
orte_wait_tracker_t *t2;

ORTE_ACQUIRE_OBJECT(caddy);

Expand Down Expand Up @@ -412,7 +414,14 @@ static void proc_errors(int fd, short args, void *cbdata)
goto cleanup;
}
/* leave the exit code alone - process this as a waitpid */
ompi_odls_base_default_wait_local_proc(child, NULL);
t2 = OBJ_NEW(orte_wait_tracker_t);
OBJ_RETAIN(child); // protect against race conditions
t2->child = child;
t2->evb = orte_event_base;
opal_event_set(t2->evb, &t2->ev, -1,
OPAL_EV_WRITE, orte_odls_base_default_wait_local_proc, t2);
opal_event_set_priority(&t2->ev, ORTE_MSG_PRI);
opal_event_active(&t2->ev, OPAL_EV_WRITE, 1);
goto cleanup;
}
OPAL_OUTPUT_VERBOSE((2, orte_errmgr_base_framework.framework_output,
Expand Down
5 changes: 5 additions & 0 deletions orte/mca/odls/base/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* All rights reserved.
* Copyright (c) 2011 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights reserved.
* Copyright (c) 2017 Intel, Inc. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -44,5 +45,9 @@ ORTE_DECLSPEC extern mca_base_framework_t orte_odls_base_framework;
*/
ORTE_DECLSPEC int orte_odls_base_select(void);

/*
 * Spin up the spawn threads for the given job.
 *
 * jdata - the job object passed in by the caller.
 *         NOTE(review): how the job data influences the threads that get
 *         started is not visible from this header - confirm against the
 *         implementation.
 */
ORTE_DECLSPEC void orte_odls_base_start_threads(orte_job_t *jdata);

/*
 * NOTE(review): presumably the counterpart of
 * orte_odls_base_start_threads() that stops and reclaims the spawn
 * threads - the implementation is not visible from this header, so
 * confirm the exact semantics before relying on this description.
 */
ORTE_DECLSPEC void orte_odls_base_harvest_threads(void);

END_C_DECLS
#endif
131 changes: 67 additions & 64 deletions orte/mca/odls/base/odls_base_default_fns.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* All rights reserved.
* Copyright (c) 2011-2017 Cisco Systems, Inc. All rights reserved
* Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
* Copyright (c) 2014 Research Organization for Information Science
* Copyright (c) 2014-2017 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2017 Mellanox Technologies Ltd. All rights reserved.
* Copyright (c) 2017 IBM Corporation. All rights reserved.
Expand Down Expand Up @@ -614,6 +614,9 @@ int orte_odls_base_default_construct_child_list(opal_buffer_t *buffer,
goto REPORT_ERROR;
}

/* spin up the spawn threads */
orte_odls_base_start_threads(jdata);

/* to save memory, purge the job map of all procs other than
* our own - for daemons, this will completely release the
* proc structures. For the HNP, the proc structs will
Expand Down Expand Up @@ -727,9 +730,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
int rc, i;
bool found;
orte_proc_state_t state;
char **argvptr;
char *pathenv = NULL, *mpiexec_pathenv = NULL;
char *full_search;

ORTE_ACQUIRE_OBJECT(cd);

Expand Down Expand Up @@ -772,44 +772,6 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
goto errorout;
}

/* Search for the OMPI_exec_path and PATH settings in the environment. */
for (argvptr = app->env; *argvptr != NULL; argvptr++) {
if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) {
mpiexec_pathenv = *argvptr + 15;
}
if (0 == strncmp("PATH=", *argvptr, 5)) {
pathenv = *argvptr + 5;
}
}

/* If OMPI_exec_path is set (meaning --path was used), then create a
temporary environment to be used in the search for the executable.
The PATH setting in this temporary environment is a combination of
the OMPI_exec_path and PATH values. If OMPI_exec_path is not set,
then just use existing environment with PATH in it. */
if (NULL != mpiexec_pathenv) {
argvptr = NULL;
if (pathenv != NULL) {
asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv);
} else {
asprintf(&full_search, "%s", mpiexec_pathenv);
}
opal_setenv("PATH", full_search, true, &argvptr);
free(full_search);
} else {
argvptr = app->env;
}

rc = orte_util_check_context_app(app, argvptr);
/* do not ERROR_LOG - it will be reported elsewhere */
if (NULL != mpiexec_pathenv) {
opal_argv_free(argvptr);
}
if (ORTE_SUCCESS != rc) {
state = ORTE_PROC_STATE_FAILED_TO_LAUNCH;
goto errorout;
}

/* did the user request we display output in xterms? */
if (NULL != orte_xterm && !ORTE_FLAG_TEST(jobdat, ORTE_JOB_FLAG_DEBUGGER_DAEMON)) {
opal_list_item_t *nmitem;
Expand Down Expand Up @@ -878,15 +840,14 @@ void orte_odls_base_spawn_proc(int fd, short sd, void *cbdata)
cd->argv[0] = param;
}

if (5 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_output(orte_odls_base_framework.framework_output, "%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));
opal_output_verbose(5, orte_odls_base_framework.framework_output,
"%s odls:launch spawning child %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name));

if (15 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
/* dump what is going to be exec'd */
if (7 < opal_output_get_verbosity(orte_odls_base_framework.framework_output)) {
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}
opal_dss.dump(orte_odls_base_framework.framework_output, app, ORTE_APP_CONTEXT);
}

if (ORTE_SUCCESS != (rc = cd->fork_local(cd))) {
Expand Down Expand Up @@ -923,6 +884,9 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
orte_odls_spawn_caddy_t *cd;
opal_event_base_t *evb;
char *effective_dir = NULL;
char **argvptr;
char *pathenv = NULL, *mpiexec_pathenv = NULL;
char *full_search;

ORTE_ACQUIRE_OBJECT(caddy);

Expand Down Expand Up @@ -1105,6 +1069,44 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}

/* Search for the OMPI_exec_path and PATH settings in the environment. */
for (argvptr = app->env; *argvptr != NULL; argvptr++) {
if (0 == strncmp("OMPI_exec_path=", *argvptr, 15)) {
mpiexec_pathenv = *argvptr + 15;
}
if (0 == strncmp("PATH=", *argvptr, 5)) {
pathenv = *argvptr + 5;
}
}

/* If OMPI_exec_path is set (meaning --path was used), then create a
temporary environment to be used in the search for the executable.
The PATH setting in this temporary environment is a combination of
the OMPI_exec_path and PATH values. If OMPI_exec_path is not set,
then just use existing environment with PATH in it. */
if (NULL != mpiexec_pathenv) {
argvptr = NULL;
if (pathenv != NULL) {
asprintf(&full_search, "%s:%s", mpiexec_pathenv, pathenv);
} else {
asprintf(&full_search, "%s", mpiexec_pathenv);
}
opal_setenv("PATH", full_search, true, &argvptr);
free(full_search);
} else {
argvptr = app->env;
}

rc = orte_util_check_context_app(app, argvptr);
/* do not ERROR_LOG - it will be reported elsewhere */
if (NULL != mpiexec_pathenv) {
opal_argv_free(argvptr);
}
if (ORTE_SUCCESS != rc) {
goto GETOUT;
}


/* tell all children that they are being launched via ORTE */
opal_setenv(OPAL_MCA_PREFIX"orte_launch", "1", true, &app->env);

Expand Down Expand Up @@ -1186,10 +1188,17 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name)));

/* determine the thread that will handle this child */
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];

/* set the waitpid callback here for thread protection and
* to ensure we can capture the callback on short-lived apps */
ORTE_FLAG_SET(child, ORTE_PROC_FLAG_ALIVE);
orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL);
orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL);

/* dispatch this child to the next available launch thread */
cd = OBJ_NEW(orte_odls_spawn_caddy_t);
Expand Down Expand Up @@ -1228,16 +1237,11 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
goto GETOUT;
}
}
++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
opal_output_verbose(1, orte_odls_base_framework.framework_output,
"%s odls:dispatch %s to thread %d",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
ORTE_NAME_PRINT(&child->name),
orte_odls_globals.next_base);
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
opal_event_set(evb, &cd->ev, -1,
OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd);
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
Expand All @@ -1255,11 +1259,6 @@ void orte_odls_base_default_launch_local(int fd, short sd, void *cbdata)
free(effective_dir);
effective_dir = NULL;
}
/* tell the state machine that all local procs for this job
* were launched so that it can do whatever it needs to do,
* like send a state update message for all procs to the HNP
*/
ORTE_ACTIVATE_JOB_STATE(jobdat, ORTE_JOB_STATE_LOCAL_LAUNCH_COMPLETE);

ERROR_OUT:
/* ensure we reset our working directory back to our default location */
Expand Down Expand Up @@ -1323,8 +1322,10 @@ int orte_odls_base_default_signal_local_procs(const orte_process_name_t *proc, i
* Wait for a callback indicating the child has completed.
*/

void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata)
void orte_odls_base_default_wait_local_proc(int fd, short sd, void *cbdata)
{
orte_wait_tracker_t *t2 = (orte_wait_tracker_t*)cbdata;
orte_proc_t *proc = t2->child;
int i;
orte_job_t *jobdat;
orte_proc_state_t state=ORTE_PROC_STATE_WAITPID_FIRED;
Expand Down Expand Up @@ -1528,6 +1529,8 @@ void ompi_odls_base_default_wait_local_proc(orte_proc_t *proc, void* cbdata)
/* cancel the wait as this proc has already terminated */
orte_wait_cb_cancel(proc);
ORTE_ACTIVATE_PROC_STATE(&proc->name, state);
/* cleanup the tracker */
OBJ_RELEASE(t2);
}

typedef struct {
Expand Down Expand Up @@ -1903,17 +1906,17 @@ int orte_odls_base_default_restart_proc(orte_proc_t *child,
goto CLEANUP;
}
}
orte_wait_cb(child, ompi_odls_base_default_wait_local_proc, NULL);

++orte_odls_globals.next_base;
if (orte_odls_globals.num_threads <= orte_odls_globals.next_base) {
orte_odls_globals.next_base = 0;
}
evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
orte_wait_cb(child, orte_odls_base_default_wait_local_proc, evb, NULL);

OPAL_OUTPUT_VERBOSE((5, orte_odls_base_framework.framework_output,
"%s restarting app %s",
ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app->app));

evb = orte_odls_globals.ev_bases[orte_odls_globals.next_base];
opal_event_set(evb, &cd->ev, -1,
OPAL_EV_WRITE, orte_odls_base_spawn_proc, cd);
opal_event_set_priority(&cd->ev, ORTE_MSG_PRI);
Expand Down
Loading