Skip to content

opal/sync: fix race condition #1816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 25 additions & 3 deletions ompi/request/req_wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -116,7 +117,8 @@ int ompi_request_default_wait_any(size_t count,
if (MPI_STATUS_IGNORE != status) {
*status = ompi_status_empty;
}
WAIT_SYNC_RELEASE(&sync);
/* No signal can be in flight in this case */
WAIT_SYNC_RELEASE_NOWAIT(&sync);
return rc;
}

Expand All @@ -140,6 +142,15 @@ int ompi_request_default_wait_any(size_t count,
*index = i;
}
}

if( *index == completed ){
/* Only one request has triggered. There were no
* in-flight completions.
* Drop the signaling flag so we won't block
* in WAIT_SYNC_RELEASE
*/
WAIT_SYNC_SIGNALLED(&sync);
}

request = requests[*index];
assert( REQUEST_COMPLETE(request) );
Expand Down Expand Up @@ -361,7 +372,8 @@ int ompi_request_default_wait_some(size_t count,
ompi_request_t **rptr = NULL;
ompi_request_t *request = NULL;
ompi_wait_sync_t sync;

size_t sync_sets = 0, sync_unsets = 0;

WAIT_SYNC_INIT(&sync, 1);

*outcount = 0;
Expand All @@ -386,10 +398,12 @@ int ompi_request_default_wait_some(size_t count,
num_requests_done++;
}
}
sync_sets = count - num_requests_null_inactive - num_requests_done;

if(num_requests_null_inactive == count) {
*outcount = MPI_UNDEFINED;
WAIT_SYNC_RELEASE(&sync);
/* nobody will signal us */
WAIT_SYNC_RELEASE_NOWAIT(&sync);
return rc;
}

Expand Down Expand Up @@ -420,6 +434,14 @@ int ompi_request_default_wait_some(size_t count,
num_requests_done++;
}
}
sync_unsets = count - num_requests_null_inactive - num_requests_done;

if( sync_sets == sync_unsets ){
/* nobody knows about us,
* set signal-in-progress flag to false
*/
WAIT_SYNC_SIGNALLED(&sync);
}

WAIT_SYNC_RELEASE(&sync);

Expand Down
33 changes: 30 additions & 3 deletions opal/threads/wait_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* reserved.
* Copyright (c) 2016 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2016 Mellanox Technologies. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand All @@ -24,26 +25,50 @@ typedef struct ompi_wait_sync_t {
pthread_mutex_t lock;
struct ompi_wait_sync_t *next;
struct ompi_wait_sync_t *prev;
volatile bool signaling;
} ompi_wait_sync_t;

#define REQUEST_PENDING (void*)0L
#define REQUEST_COMPLETED (void*)1L

#define SYNC_WAIT(sync) (opal_using_threads() ? sync_wait_mt (sync) : sync_wait_st (sync))

/* The loop in release handles a race condition between the signaling
 * thread and the destruction of the condition variable and mutex. The
 * `signaling` member is set to false after the final signaling thread
 * has finished operating on the sync object, so the releasing thread
 * spins on that flag before it destroys anything. This is done to avoid
 * extra atomics in the signaling function and keep it as fast as
 * possible. Note that the race window is small, so spinning here is
 * preferable to sleeping since this macro is called in the critical path.
 *
 * The condition variable and the mutex are destroyed exactly once, and
 * only after the spin loop has observed `signaling == false`: destroying
 * them before the wait would defeat the purpose of the loop, and
 * destroying an already destroyed object is undefined behavior.
 *
 * Expands to a bare `if` statement and evaluates `sync` more than once. */
#define WAIT_SYNC_RELEASE(sync)                       \
    if (opal_using_threads()) {                       \
        while ((sync)->signaling) {                   \
            continue;                                 \
        }                                             \
        pthread_cond_destroy(&(sync)->condition);     \
        pthread_mutex_destroy(&(sync)->lock);         \
    }

/* Destroy the condition variable and mutex of `sync` when
 * opal_using_threads() is true, without first spinning on the
 * `signaling` flag the way WAIT_SYNC_RELEASE does. The visible call
 * sites use it on early-return paths whose comments state that no
 * signal can be in flight. Expands to a bare `if` statement and
 * evaluates `sync` more than once. */
#define WAIT_SYNC_RELEASE_NOWAIT(sync) \
if (opal_using_threads()) { \
pthread_cond_destroy(&(sync)->condition); \
pthread_mutex_destroy(&(sync)->lock); \
}


/* Signal the condition variable of `sync`. When opal_using_threads() is
 * true, the condition variable is signaled while the sync's mutex is
 * held, and the `signaling` flag is cleared only after the mutex has
 * been unlocked. That store is the last access this macro makes to the
 * sync object; WAIT_SYNC_RELEASE spins on the same flag before it
 * destroys the condition variable and mutex.
 *
 * Every use of the argument is parenthesized so that any pointer-valued
 * expression (for example `&obj`) can be passed safely. Expands to a
 * bare `if` statement and evaluates `sync` more than once. */
#define WAIT_SYNC_SIGNAL(sync)                        \
    if (opal_using_threads()) {                       \
        pthread_mutex_lock(&(sync)->lock);            \
        pthread_cond_signal(&(sync)->condition);      \
        pthread_mutex_unlock(&(sync)->lock);          \
        (sync)->signaling = false;                    \
    }

/* Clear the `signaling` flag of `sync` directly. The visible callers do
 * this once they have determined that no signal is in flight, so that a
 * later WAIT_SYNC_RELEASE does not spin waiting for the flag to drop.
 * Expands to a braced block, not an expression. */
#define WAIT_SYNC_SIGNALLED(sync){ \
(sync)->signaling = false; \
}

OPAL_DECLSPEC int sync_wait_mt(ompi_wait_sync_t *sync);
static inline int sync_wait_st (ompi_wait_sync_t *sync)
{
Expand All @@ -61,6 +86,7 @@ static inline int sync_wait_st (ompi_wait_sync_t *sync)
(sync)->next = NULL; \
(sync)->prev = NULL; \
(sync)->status = 0; \
(sync)->signaling = true; \
if (opal_using_threads()) { \
pthread_cond_init (&(sync)->condition, NULL); \
pthread_mutex_init (&(sync)->lock, NULL); \
Expand All @@ -81,8 +107,9 @@ static inline void wait_sync_update(ompi_wait_sync_t *sync, int updates, int sta
}
} else {
/* this is an error path so just use the atomic */
opal_atomic_swap_32 (&sync->count, 0);
sync->status = OPAL_ERROR;
opal_atomic_wmb ();
opal_atomic_swap_32 (&sync->count, 0);
}
WAIT_SYNC_SIGNAL(sync);
}
Expand Down