Topic/osc pt2pt fixes #2581

Closed
wants to merge 11 commits into from
16 changes: 16 additions & 0 deletions ompi/mca/osc/pt2pt/osc_pt2pt.h
@@ -46,6 +46,19 @@

BEGIN_C_DECLS

/*
* JJH: This is a hack, but it'll help us make forward progress for now.
* It is possible, in a multithreaded scenario, that the 'outgoing_frag_{signal_}count'
* is incremented in such a way that those waiting in opal_condition_wait for it
* to change never receive a signal. This is because the functions incrementing this
* value do not grab the 'module' lock, and doing so can lead to deadlock...
*
* Ideally we would grab the module lock in the mark_outgoing_completion() to
* prevent that race, but this can result in a deadlock by double locking that
* mutex. So that code needs to be audited a bit more.
*/
#define OSC_PT2PT_HARD_SPIN_NO_CV_WAIT 1

struct ompi_osc_pt2pt_frag_t;
struct ompi_osc_pt2pt_receive_t;

@@ -913,11 +926,14 @@ static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc

return &module->all_sync;
case OMPI_OSC_PT2PT_SYNC_TYPE_PSCW:
OPAL_THREAD_LOCK(&module->all_sync.lock);
if (ompi_osc_pt2pt_sync_pscw_peer (module, target, peer)) {
OPAL_THREAD_UNLOCK(&module->all_sync.lock);
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"osc/pt2pt: found PSCW access epoch target for %d", target));
return &module->all_sync;
}
OPAL_THREAD_UNLOCK(&module->all_sync.lock);
}

return NULL;
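The OSC_PT2PT_HARD_SPIN_NO_CV_WAIT comment above describes a classic lost-wakeup race: a waiter checks the fragment counters and then blocks on the condition variable, while the completion path increments the counters and signals without holding the waiter's mutex. Below is a toy, self-contained reproduction of that interleaving using plain pthreads rather than the OPAL wrappers; all names are illustrative and are not the actual module fields.

/* Toy reproduction of the lost-wakeup race described above.  The
 * completion thread bumps the counter and broadcasts WITHOUT taking
 * the waiter's mutex; if the broadcast lands between the waiter's
 * check of the counter and its call to pthread_cond_wait(), the
 * signal is lost and the waiter blocks forever. */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static volatile int frag_count        = 0;  /* completions observed so far    */
static volatile int frag_signal_count = 1;  /* completions we are waiting for */

static void *completion_thread (void *arg)
{
    (void) arg;
    /* The bug being illustrated: no pthread_mutex_lock(&lock) here. */
    frag_count++;
    pthread_cond_broadcast (&cond);   /* may fire while nobody is waiting yet */
    return NULL;
}

int main (void)
{
    pthread_t tid;
    pthread_create (&tid, NULL, completion_thread, NULL);

    pthread_mutex_lock (&lock);
    while (frag_count != frag_signal_count) {
        /* If the broadcast happens in this window, after the check and
         * before the wait, this thread sleeps forever.  That is the
         * hang the hard-spin workaround avoids. */
        pthread_cond_wait (&cond, &lock);
    }
    pthread_mutex_unlock (&lock);

    pthread_join (tid, NULL);
    puts ("completed");
    return 0;
}

Whether the toy program actually hangs depends on scheduling; the workaround in this patch sidesteps the window entirely by never blocking on the condition variable while the macro is defined.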
43 changes: 41 additions & 2 deletions ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
@@ -186,7 +186,18 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
/* wait for completion */
while (module->outgoing_frag_count != module->outgoing_frag_signal_count ||
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
/* It is possible that mark_outgoing_completion() is called just after the
* while loop condition, and before we go into the _wait() below. This will mean
* that we miss the signal, and block forever in the _wait().
*/
OPAL_THREAD_UNLOCK(&module->lock);
usleep(100);
opal_progress();
OPAL_THREAD_LOCK(&module->lock);
#else
opal_condition_wait(&module->cond, &module->lock);
#endif
}

if (assert & MPI_MODE_NOSUCCEED) {
@@ -213,9 +224,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
ompi_osc_pt2pt_sync_t *sync = &module->all_sync;

OPAL_THREAD_LOCK(&module->lock);
OPAL_THREAD_LOCK(&sync->lock);

/* check if we are already in an access epoch */
if (ompi_osc_pt2pt_access_epoch_active (module)) {
OPAL_THREAD_UNLOCK(&sync->lock);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_RMA_SYNC;
}
@@ -227,6 +240,12 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
/* haven't processed any post messages yet */
sync->sync_expected = sync->num_peers;

/* If the previous epoch was from Fence, then eager_send_active is still
* set to true at this time, but it shouldn't be true until we get our
* incoming Posts. So reset to 'false' for this new epoch.
*/
sync->eager_send_active = false;

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_start entering with group size %d...",
sync->num_peers));
@@ -243,6 +262,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
if (0 == ompi_group_size (group)) {
/* nothing more to do. this is an empty start epoch */
sync->eager_send_active = true;
OPAL_THREAD_UNLOCK(&sync->lock);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
@@ -252,12 +272,12 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
/* translate the group ranks into the communicator */
sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
if (NULL == sync->peer_list.peers) {
OPAL_THREAD_UNLOCK(&sync->lock);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_ERR_OUT_OF_RESOURCE;
}

if (!(assert & MPI_MODE_NOCHECK)) {
OPAL_THREAD_LOCK(&sync->lock);
for (int i = 0 ; i < sync->num_peers ; ++i) {
ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];

@@ -270,7 +290,6 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
ompi_osc_pt2pt_peer_set_unex (peer, false);
}
}
OPAL_THREAD_UNLOCK(&sync->lock);
} else {
sync->sync_expected = 0;
}
@@ -289,6 +308,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
"ompi_osc_pt2pt_start complete. eager sends active: %d",
sync->eager_send_active));

OPAL_THREAD_UNLOCK(&sync->lock);
OPAL_THREAD_UNLOCK(&module->lock);
return OMPI_SUCCESS;
}
@@ -398,7 +418,18 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
/* wait for outgoing requests to complete. Don't wait for incoming, as
we're only completing the access epoch, not the exposure epoch */
while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
/* It is possible that mark_outgoing_completion() is called just after the
* while loop condition, and before we go into the _wait() below. This will mean
* that we miss the signal, and block forever in the _wait().
*/
OPAL_THREAD_UNLOCK(&module->lock);
usleep(100);
opal_progress();
OPAL_THREAD_LOCK(&module->lock);
#else
opal_condition_wait(&module->cond, &module->lock);
#endif
}

/* unlock here, as group cleanup can take a while... */
@@ -516,7 +547,15 @@ int ompi_osc_pt2pt_wait (ompi_win_t *win)
"active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
module->num_complete_msgs, module->active_incoming_frag_count,
module->active_incoming_frag_signal_count));
#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
/* It is possible to miss the release signal driven by num_complete_msgs in osc_pt2pt_incoming_complete() */
OPAL_THREAD_UNLOCK(&module->lock);
usleep(100);
opal_progress();
OPAL_THREAD_LOCK(&module->lock);
#else
opal_condition_wait(&module->cond, &module->lock);
#endif
}

group = module->pw_group;
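The same unlock / usleep / opal_progress / relock sequence now appears in fence, complete, and wait above, and again in the unlock path further down. A hedged sketch of that workaround factored into a single helper follows; the helper and predicate names are invented for illustration and assume the osc/pt2pt headers, while the fields, locks, and progress call are the ones used in the diff.

/* Hypothetical helper capturing the OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
 * pattern: instead of blocking on module->cond (and possibly missing
 * the signal), repeatedly drop the module lock, let the progress
 * engine run, back off briefly, and re-check the predicate. */
static inline void osc_pt2pt_spin_until (ompi_osc_pt2pt_module_t *module,
                                         bool (*done) (ompi_osc_pt2pt_module_t *))
{
    /* the caller is assumed to hold module->lock, as in the loops above */
    while (!done (module)) {
        OPAL_THREAD_UNLOCK(&module->lock);
        usleep (100);       /* back off so the spin does not burn a full core */
        opal_progress ();   /* drive pending fragments toward completion */
        OPAL_THREAD_LOCK(&module->lock);
    }
}

/* Example predicate matching the fence wait condition above. */
static bool fence_done (ompi_osc_pt2pt_module_t *module)
{
    return module->outgoing_frag_count == module->outgoing_frag_signal_count &&
           module->active_incoming_frag_count >= module->active_incoming_frag_signal_count;
}

The trade-off versus opal_condition_wait() is latency and CPU time: the spin wakes at most every 100 microseconds and keeps calling opal_progress(), which is why the header comment treats it as a stopgap rather than a final fix.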
22 changes: 20 additions & 2 deletions ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
@@ -336,7 +336,16 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_

if (is_long_msg) {
/* wait for eager sends to be active before starting a long put */
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
if (pt2pt_sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK) {
OPAL_THREAD_LOCK(&pt2pt_sync->lock);
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
while (!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER)) {
opal_condition_wait(&pt2pt_sync->cond, &pt2pt_sync->lock);
}
OPAL_THREAD_UNLOCK(&pt2pt_sync->lock);
} else {
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
}
}

OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -495,7 +504,16 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,

if (is_long_msg) {
/* wait for synchronization before posting a long message */
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
if (pt2pt_sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK) {
OPAL_THREAD_LOCK(&pt2pt_sync->lock);
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
while (!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER)) {
opal_condition_wait(&pt2pt_sync->cond, &pt2pt_sync->lock);
}
OPAL_THREAD_UNLOCK(&pt2pt_sync->lock);
} else {
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
}
}

header = (ompi_osc_pt2pt_header_acc_t*) ptr;
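ompi_osc_pt2pt_put_w_req() and ompi_osc_pt2pt_accumulate_w_req() now carry the same wait-for-eager loop for the passive-target case. A sketch of that duplicated logic pulled into one helper is shown below; the helper name is hypothetical, while the types, flags, and calls are the ones used in the diff.

/* Hypothetical helper: before posting a long message, wait until eager
 * sends are usable for this epoch.  For a passive-target (lock) sync
 * this means waiting until the per-peer EAGER flag is set; for
 * active-target syncs the existing expected-count wait is sufficient. */
static inline void osc_pt2pt_wait_long_msg_ready (ompi_osc_pt2pt_module_t *module,
                                                  ompi_osc_pt2pt_sync_t *pt2pt_sync,
                                                  int target)
{
    if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == pt2pt_sync->type) {
        ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);

        OPAL_THREAD_LOCK(&pt2pt_sync->lock);
        while (!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER)) {
            /* the sync lock is released and re-acquired inside the wait */
            opal_condition_wait (&pt2pt_sync->cond, &pt2pt_sync->lock);
        }
        OPAL_THREAD_UNLOCK(&pt2pt_sync->lock);
    } else {
        ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
    }
}

Keeping the branch in a helper like this would avoid the copy-paste between the put and accumulate paths without touching ompi_osc_pt2pt_sync_wait_expected() itself.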
6 changes: 6 additions & 0 deletions ompi/mca/osc/pt2pt/osc_pt2pt_component.c
@@ -153,8 +153,12 @@ static int component_register (void)

static int component_progress (void)
{
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
int pending_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
int recv_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_receives);
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
ompi_osc_pt2pt_pending_t *pending, *next;

if (recv_count) {
@@ -336,6 +340,8 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
goto cleanup;
}

module->num_complete_msgs = 0;

/* options */
/* FIX ME: should actually check this value... */
#if 1
4 changes: 3 additions & 1 deletion ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
@@ -747,7 +747,9 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
}

/* add to the pending acc queue */
OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super));
ompi_osc_pt2pt_accumulate_lock(module);
opal_list_append (&module->pending_acc, &pending_acc->super);
ompi_osc_pt2pt_accumulate_unlock(module);

return OMPI_SUCCESS;
}
21 changes: 21 additions & 0 deletions ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
@@ -419,6 +419,27 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
/* wait for unlock acks. this signals remote completion of fragments */
ompi_osc_pt2pt_sync_wait_expected (lock);

/* Mark - It is possible for the unlock to finish too early, before
* the data is actually present in the recv buf (for non-contiguous datatypes).
* So make sure to wait for all of the fragments to arrive.
*/
OPAL_THREAD_LOCK(&module->lock);
while (module->outgoing_frag_count < module->outgoing_frag_signal_count) {
#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
/* It is possible that mark_outgoing_completion() is called just after the
* while loop condition, and before we go into the _wait() below. This will mean
* that we miss the signal, and block forever in the _wait().
*/
OPAL_THREAD_UNLOCK(&module->lock);
usleep(100);
opal_progress();
OPAL_THREAD_LOCK(&module->lock);
#else
opal_condition_wait(&module->cond, &module->lock);
#endif
}
OPAL_THREAD_UNLOCK(&module->lock);

OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
"ompi_osc_pt2pt_unlock: unlock of %d complete", target));
} else {
8 changes: 5 additions & 3 deletions ompi/mca/osc/pt2pt/osc_pt2pt_request.h
@@ -40,7 +40,8 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t);
do { \
opal_free_list_item_t *item; \
do { \
item = opal_free_list_get (&mca_osc_pt2pt_component.requests); \
OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.lock, \
item = opal_free_list_get (&mca_osc_pt2pt_component.requests)); \
if (NULL == item) { \
opal_progress(); \
} \
@@ -58,8 +59,9 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t);
do { \
OMPI_REQUEST_FINI(&(req)->super); \
(req)->outstanding_requests = 0; \
opal_free_list_return (&mca_osc_pt2pt_component.requests, \
(opal_free_list_item_t *) (req)); \
OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.lock, \
opal_free_list_return (&mca_osc_pt2pt_component.requests, \
(opal_free_list_item_t *) (req))); \
} while (0)

static inline void ompi_osc_pt2pt_request_complete (ompi_osc_pt2pt_request_t *request, int mpi_error)
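Both request macros now route the free-list calls through OPAL_THREAD_SCOPED_LOCK on the component lock. As a rough sketch of what that buys, the allocation step effectively becomes the function below when thread support is enabled; the function name is hypothetical and the expansion is an approximation of the OPAL macro, not a verbatim copy.

/* Approximate effect of OMPI_OSC_PT2PT_REQUEST_ALLOC's free-list step
 * after this change: the get (and, symmetrically, the return) runs
 * with the component mutex held, so concurrent allocations from
 * different threads no longer touch the shared free list at the same
 * time. */
static inline opal_free_list_item_t *request_freelist_get_locked (void)
{
    opal_free_list_item_t *item;

    if (opal_using_threads ()) {
        opal_mutex_lock (&mca_osc_pt2pt_component.lock);
        item = opal_free_list_get (&mca_osc_pt2pt_component.requests);
        opal_mutex_unlock (&mca_osc_pt2pt_component.lock);
    } else {
        item = opal_free_list_get (&mca_osc_pt2pt_component.requests);
    }
    return item;
}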
2 changes: 1 addition & 1 deletion ompi/request/request.h
@@ -428,7 +428,7 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa
ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
REQUEST_COMPLETED);
/* In the case where another thread concurrently changed the request to REQUEST_PENDING */
if( REQUEST_PENDING != tmp_sync )
if( REQUEST_PENDING != tmp_sync && REQUEST_COMPLETED != tmp_sync )
wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
}
} else
Expand Down