diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt.h b/ompi/mca/osc/pt2pt/osc_pt2pt.h
index f31f429746e..f9fd2d93a5d 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt.h
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt.h
@@ -46,6 +46,19 @@ BEGIN_C_DECLS
 
+/*
+ * JJH: This is a hack, but it'll help us make forward progress for now.
+ * It is possible, in a multithreaded scenario, that the 'outgoing_frag_{signal_}count'
+ * is incremented in such a way that those waiting in opal_condition_wait for it
+ * to change never receive a signal. This is because the functions incrementing this
+ * value do not grab the 'module' lock, and doing so can lead to deadlock...
+ *
+ * Ideally we would grab the module lock in mark_outgoing_completion() to
+ * prevent that race, but this can result in a deadlock by double locking that
+ * mutex. So that code needs to be audited a bit more.
+ */
+#define OSC_PT2PT_HARD_SPIN_NO_CV_WAIT 1
+
 struct ompi_osc_pt2pt_frag_t;
 struct ompi_osc_pt2pt_receive_t;
 
@@ -913,11 +926,14 @@ static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc
         return &module->all_sync;
     case OMPI_OSC_PT2PT_SYNC_TYPE_PSCW:
+        OPAL_THREAD_LOCK(&module->all_sync.lock);
         if (ompi_osc_pt2pt_sync_pscw_peer (module, target, peer)) {
+            OPAL_THREAD_UNLOCK(&module->all_sync.lock);
             OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                                  "osc/pt2pt: found PSCW access epoch target for %d", target));
             return &module->all_sync;
         }
+        OPAL_THREAD_UNLOCK(&module->all_sync.lock);
     }
 
     return NULL;
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
index 3c086a42f26..81c07fabb83 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c
@@ -186,7 +186,18 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
     /* wait for completion */
     while (module->outgoing_frag_count != module->outgoing_frag_signal_count ||
            module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
+#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
+        /* It is possible that mark_outgoing_completion() is called just after the
+         * while loop condition is evaluated, and before we go into the _wait() below.
+         * That would mean we miss the signal and block forever in the _wait().
+         */
+        OPAL_THREAD_UNLOCK(&module->lock);
+        usleep(100);
+        opal_progress();
+        OPAL_THREAD_LOCK(&module->lock);
+#else
         opal_condition_wait(&module->cond, &module->lock);
+#endif
     }
 
     if (assert & MPI_MODE_NOSUCCEED) {
@@ -213,9 +224,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
     ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
 
     OPAL_THREAD_LOCK(&module->lock);
+    OPAL_THREAD_LOCK(&sync->lock);
 
     /* check if we are already in an access epoch */
     if (ompi_osc_pt2pt_access_epoch_active (module)) {
+        OPAL_THREAD_UNLOCK(&sync->lock);
         OPAL_THREAD_UNLOCK(&module->lock);
         return OMPI_ERR_RMA_SYNC;
     }
@@ -227,6 +240,12 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
     /* haven't processed any post messages yet */
     sync->sync_expected = sync->num_peers;
 
+    /* If the previous epoch was from Fence, then eager_send_active is still
+     * set to true at this time, but it shouldn't be true until we get our
+     * incoming Posts. So reset to 'false' for this new epoch.
+     */
+    sync->eager_send_active = false;
+
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
                          "ompi_osc_pt2pt_start entering with group size %d...",
                          sync->num_peers));
@@ -243,6 +262,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
     if (0 == ompi_group_size (group)) {
         /* nothing more to do. this is an empty start epoch */
         sync->eager_send_active = true;
+        OPAL_THREAD_UNLOCK(&sync->lock);
         OPAL_THREAD_UNLOCK(&module->lock);
         return OMPI_SUCCESS;
     }
@@ -252,12 +272,12 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
     /* translate the group ranks into the communicator */
     sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
     if (NULL == sync->peer_list.peers) {
+        OPAL_THREAD_UNLOCK(&sync->lock);
         OPAL_THREAD_UNLOCK(&module->lock);
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
     if (!(assert & MPI_MODE_NOCHECK)) {
-        OPAL_THREAD_LOCK(&sync->lock);
 
         for (int i = 0 ; i < sync->num_peers ; ++i) {
             ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
@@ -270,7 +290,6 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
                 ompi_osc_pt2pt_peer_set_unex (peer, false);
             }
         }
-        OPAL_THREAD_UNLOCK(&sync->lock);
     } else {
         sync->sync_expected = 0;
     }
@@ -289,6 +308,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
                          "ompi_osc_pt2pt_start complete. eager sends active: %d",
                          sync->eager_send_active));
 
+    OPAL_THREAD_UNLOCK(&sync->lock);
     OPAL_THREAD_UNLOCK(&module->lock);
     return OMPI_SUCCESS;
 }
@@ -398,7 +418,18 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
     /* wait for outgoing requests to complete.  Don't wait for incoming, as
        we're only completing the access epoch, not the exposure epoch */
     while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
+#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
+        /* It is possible that mark_outgoing_completion() is called just after the
+         * while loop condition is evaluated, and before we go into the _wait() below.
+         * That would mean we miss the signal and block forever in the _wait().
+         */
+        OPAL_THREAD_UNLOCK(&module->lock);
+        usleep(100);
+        opal_progress();
+        OPAL_THREAD_LOCK(&module->lock);
+#else
         opal_condition_wait(&module->cond, &module->lock);
+#endif
     }
 
     /* unlock here, as group cleanup can take a while... */
@@ -516,7 +547,15 @@ int ompi_osc_pt2pt_wait (ompi_win_t *win)
                              "active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
                              module->num_complete_msgs, module->active_incoming_frag_count,
                              module->active_incoming_frag_signal_count));
+#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
+        /* Possible to not receive the signal to release based upon num_complete_msgs in osc_pt2pt_incoming_complete() */
+        OPAL_THREAD_UNLOCK(&module->lock);
+        usleep(100);
+        opal_progress();
+        OPAL_THREAD_LOCK(&module->lock);
+#else
         opal_condition_wait(&module->cond, &module->lock);
+#endif
     }
 
     group = module->pw_group;
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
index 4d16f1a8ce9..2194801f92b 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_comm.c
@@ -336,7 +336,16 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
 
     if (is_long_msg) {
         /* wait for eager sends to be active before starting a long put */
-        ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
+        if (pt2pt_sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK) {
+            OPAL_THREAD_LOCK(&pt2pt_sync->lock);
+            ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
+            while (!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER)) {
+                opal_condition_wait(&pt2pt_sync->cond, &pt2pt_sync->lock);
+            }
+            OPAL_THREAD_UNLOCK(&pt2pt_sync->lock);
+        } else {
+            ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
+        }
     }
 
     OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -495,7 +504,16 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
 
     if (is_long_msg) {
         /* wait for synchronization before posting a long message */
-        ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
+        if (pt2pt_sync->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK) {
+            OPAL_THREAD_LOCK(&pt2pt_sync->lock);
+            ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
+            while (!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER)) {
+                opal_condition_wait(&pt2pt_sync->cond, &pt2pt_sync->lock);
+            }
+            OPAL_THREAD_UNLOCK(&pt2pt_sync->lock);
+        } else {
+            ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
+        }
     }
 
     header = (ompi_osc_pt2pt_header_acc_t*) ptr;
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c
index e41a8306b7d..f155f2e8df3 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_component.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_component.c
@@ -153,8 +153,12 @@ static int component_register (void)
 
 static int component_progress (void)
 {
+    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
     int pending_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
+    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
+    OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
     int recv_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_receives);
+    OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
     ompi_osc_pt2pt_pending_t *pending, *next;
 
     if (recv_count) {
@@ -336,6 +340,8 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
         goto cleanup;
     }
 
+    module->num_complete_msgs = 0;
+
     /* options */
     /* FIX ME: should actually check this value... */
 #if 1
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
index 059c83be450..d5e158bb8fe 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c
@@ -747,7 +747,9 @@ static int ompi_osc_pt2pt_acc_op_queue (ompi_osc_pt2pt_module_t *module, ompi_os
     }
 
     /* add to the pending acc queue */
-    OPAL_THREAD_SCOPED_LOCK(&module->lock, opal_list_append (&module->pending_acc, &pending_acc->super));
+    ompi_osc_pt2pt_accumulate_lock(module);
+    opal_list_append (&module->pending_acc, &pending_acc->super);
+    ompi_osc_pt2pt_accumulate_unlock(module);
 
     return OMPI_SUCCESS;
 }
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
index 4721f82d004..006839687ae 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c
@@ -419,6 +419,27 @@ static int ompi_osc_pt2pt_unlock_internal (int target, ompi_win_t *win)
         /* wait for unlock acks. this signals remote completion of fragments */
         ompi_osc_pt2pt_sync_wait_expected (lock);
 
+        /* Mark - It is possible for the unlock to finish too early, before
+         * the data is actually present in the recv buf (for non-contiguous datatypes).
+         * So make sure to wait for all of the fragments to arrive.
+         */
+        OPAL_THREAD_LOCK(&module->lock);
+        while (module->outgoing_frag_count < module->outgoing_frag_signal_count) {
+#ifdef OSC_PT2PT_HARD_SPIN_NO_CV_WAIT
+            /* It is possible that mark_outgoing_completion() is called just after the
+             * while loop condition is evaluated, and before we go into the _wait() below.
+             * That would mean we miss the signal and block forever in the _wait().
+             */
+            OPAL_THREAD_UNLOCK(&module->lock);
+            usleep(100);
+            opal_progress();
+            OPAL_THREAD_LOCK(&module->lock);
+#else
+            opal_condition_wait(&module->cond, &module->lock);
+#endif
+        }
+        OPAL_THREAD_UNLOCK(&module->lock);
+
         OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
                              "ompi_osc_pt2pt_unlock: unlock of %d complete", target));
     } else {
diff --git a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h
index dee5c86892d..de8c1cc9a8c 100644
--- a/ompi/mca/osc/pt2pt/osc_pt2pt_request.h
+++ b/ompi/mca/osc/pt2pt/osc_pt2pt_request.h
@@ -40,7 +40,8 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t);
     do {                                                                \
         opal_free_list_item_t *item;                                    \
         do {                                                            \
-            item = opal_free_list_get (&mca_osc_pt2pt_component.requests); \
+            OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.lock,      \
+                item = opal_free_list_get (&mca_osc_pt2pt_component.requests)); \
            if (NULL == item) {                                          \
                opal_progress();                                         \
            }                                                            \
@@ -58,8 +59,9 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_request_t);
    do {                                                                 \
        OMPI_REQUEST_FINI(&(req)->super);                                \
        (req)->outstanding_requests = 0;                                 \
-        opal_free_list_return (&mca_osc_pt2pt_component.requests,       \
-                               (opal_free_list_item_t *) (req));        \
+        OPAL_THREAD_SCOPED_LOCK(&mca_osc_pt2pt_component.lock,          \
+            opal_free_list_return (&mca_osc_pt2pt_component.requests,   \
+                                   (opal_free_list_item_t *) (req)));   \
    } while (0)
 
 static inline void ompi_osc_pt2pt_request_complete (ompi_osc_pt2pt_request_t *request, int mpi_error)
diff --git a/ompi/request/request.h b/ompi/request/request.h
index aaac0df912b..f67d76aaaf3 100644
--- a/ompi/request/request.h
+++ b/ompi/request/request.h
@@ -428,7 +428,7 @@ static inline int ompi_request_complete(ompi_request_t* request, bool with_signa
             ompi_wait_sync_t *tmp_sync = (ompi_wait_sync_t *) OPAL_ATOMIC_SWAP_PTR(&request->req_complete,
                                                                                     REQUEST_COMPLETED);
             /* In the case where another thread concurrently changed the request to REQUEST_PENDING */
-            if( REQUEST_PENDING != tmp_sync )
+            if( REQUEST_PENDING != tmp_sync && REQUEST_COMPLETED != tmp_sync )
                 wait_sync_update(tmp_sync, 1, request->req_status.MPI_ERROR);
         }
     } else
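The hard-spin loops above all work around the same lost-wakeup race described in the OSC_PT2PT_HARD_SPIN_NO_CV_WAIT comment. Below is a minimal stand-alone sketch of that race and of the spin workaround, using plain pthreads in place of OMPI's opal_mutex_t/opal_condition_t wrappers; all names and the structure are illustrative only and are not part of the patch.

/*
 * Sketch of the lost-wakeup race: the completion path bumps a counter and
 * signals a condition variable without holding the waiter's mutex, so the
 * signal can fire after the waiter has checked the predicate but before it
 * has blocked -- the wakeup is then lost.  Build: cc -pthread race_sketch.c
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static volatile int outgoing_frag_count        = 0;
static volatile int outgoing_frag_signal_count = 1;

/* Analogous to mark_outgoing_completion(): increments the counter and signals
 * WITHOUT taking 'lock' (taking it there can self-deadlock in the real code). */
static void *completer_thread(void *arg)
{
    (void) arg;
    usleep(1000);
    __sync_fetch_and_add(&outgoing_frag_count, 1);
    pthread_cond_signal(&cond);   /* a no-op if nobody is blocked in cond_wait yet */
    return NULL;
}

/* Waiter shaped like the un-patched fence/complete/unlock paths: if the
 * completion and signal land between the predicate check and cond_wait(),
 * this can block forever. */
static void wait_racy(void)
{
    pthread_mutex_lock(&lock);
    while (outgoing_frag_count != outgoing_frag_signal_count) {
        /* <-- completion + signal landing here is never observed */
        pthread_cond_wait(&cond, &lock);
    }
    pthread_mutex_unlock(&lock);
}

/* The workaround the patch adopts: never rely on the signal.  Drop the lock,
 * sleep briefly, poll (opal_progress() in the real code), and re-check. */
static void wait_spin(void)
{
    pthread_mutex_lock(&lock);
    while (outgoing_frag_count != outgoing_frag_signal_count) {
        pthread_mutex_unlock(&lock);
        usleep(100);
        pthread_mutex_lock(&lock);
    }
    pthread_mutex_unlock(&lock);
}

int main(void)
{
    pthread_t t;
    pthread_create(&t, NULL, completer_thread, NULL);
    wait_spin();                  /* wait_racy() may hang here instead */
    pthread_join(t, NULL);
    puts("outgoing fragments complete");
    return 0;
}

The race disappears if the completer holds the mutex while updating the counter and signaling, which is the "ideal" fix the JJH comment mentions; the patch avoids that because taking the module lock on the completion path can currently double-lock and deadlock.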