@@ -135,12 +135,16 @@ int mca_pml_ucx_open(void)
                              UCP_PARAM_FIELD_REQUEST_SIZE |
                              UCP_PARAM_FIELD_REQUEST_INIT |
                              UCP_PARAM_FIELD_REQUEST_CLEANUP |
-                             UCP_PARAM_FIELD_TAG_SENDER_MASK;
+                             UCP_PARAM_FIELD_TAG_SENDER_MASK |
+                             UCP_PARAM_FIELD_MT_WORKERS_SHARED;
     params.features        = UCP_FEATURE_TAG;
     params.request_size    = sizeof(ompi_request_t);
     params.request_init    = mca_pml_ucx_request_init;
     params.request_cleanup = mca_pml_ucx_request_cleanup;
     params.tag_sender_mask = PML_UCX_SPECIFIC_SOURCE_MASK;
+    params.mt_workers_shared = 0; /* we do not need mt support for context
+                                     since it will be protected by worker */
+

     status = ucp_init(&params, config, &ompi_pml_ucx.ucp_context);
     ucp_config_release(config);
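
The hunk above relies on UCX's split between context-level and worker-level thread safety: setting mt_workers_shared to 0 declares that the ucp_context_h will not be accessed concurrently, because all communication calls are serialized through the worker. As a standalone illustration only (not part of this commit; the helper name and error handling are assumptions for the sketch), a context could be created the same way:

/* Minimal sketch, not part of this change: create a UCP context with
 * mt_workers_shared = 0, i.e. no context-level locking, assuming all
 * progress/communication calls are serialized per worker. */
#include <stdio.h>
#include <ucp/api/ucp.h>

static ucp_context_h create_context(void)
{
    ucp_params_t  params = {0};
    ucp_config_t *config;
    ucp_context_h context;
    ucs_status_t  status;

    status = ucp_config_read(NULL, NULL, &config);
    if (status != UCS_OK) {
        return NULL;
    }

    params.field_mask        = UCP_PARAM_FIELD_FEATURES |
                               UCP_PARAM_FIELD_MT_WORKERS_SHARED;
    params.features          = UCP_FEATURE_TAG;
    params.mt_workers_shared = 0; /* thread safety is delegated to the worker */

    status = ucp_init(&params, config, &context);
    ucp_config_release(config);
    if (status != UCS_OK) {
        fprintf(stderr, "ucp_init failed: %s\n", ucs_status_string(status));
        return NULL;
    }
    return context;
}
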
@@ -178,17 +182,42 @@ int mca_pml_ucx_init(void)
 {
     ucp_worker_params_t params;
     ucs_status_t status;
+    ucp_worker_attr_t attr;
     int rc;

     PML_UCX_VERBOSE(1, "mca_pml_ucx_init");

     /* TODO check MPI thread mode */
     params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
     params.thread_mode = UCS_THREAD_MODE_SINGLE;
+    if (ompi_mpi_thread_multiple) {
+        params.thread_mode = UCS_THREAD_MODE_MULTI;
+    } else {
+        params.thread_mode = UCS_THREAD_MODE_SINGLE;
+    }

     status = ucp_worker_create(ompi_pml_ucx.ucp_context, &params,
                                &ompi_pml_ucx.ucp_worker);
     if (UCS_OK != status) {
+        PML_UCX_ERROR("Failed to create UCP worker");
+        return OMPI_ERROR;
+    }
+
+    attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
+    status = ucp_worker_query(ompi_pml_ucx.ucp_worker, &attr);
+    if (UCS_OK != status) {
+        ucp_worker_destroy(ompi_pml_ucx.ucp_worker);
+        ompi_pml_ucx.ucp_worker = NULL;
+        PML_UCX_ERROR("Failed to query UCP worker thread level");
+        return OMPI_ERROR;
+    }
+
+    if (ompi_mpi_thread_multiple && attr.thread_mode != UCS_THREAD_MODE_MULTI) {
+        /* UCX does not support multithreading, disqualify current PML for now */
+        /* TODO: we should let OMPI fall back to THREAD_SINGLE mode */
+        ucp_worker_destroy(ompi_pml_ucx.ucp_worker);
+        ompi_pml_ucx.ucp_worker = NULL;
+        PML_UCX_ERROR("UCP worker does not support MPI_THREAD_MULTIPLE");
         return OMPI_ERROR;
     }

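
The second hunk requests UCS_THREAD_MODE_MULTI only when ompi_mpi_thread_multiple is set, and then queries the worker back, because UCX may grant a weaker thread mode than requested; in that case the PML disqualifies itself rather than run MPI_THREAD_MULTIPLE on an unsafe worker. Below is a minimal standalone sketch of the same request-then-verify pattern (the helper name and its context argument are assumptions for illustration, not code from this commit):

/* Minimal sketch, not part of this change: request a multi-threaded worker
 * and verify the thread mode UCX actually granted, mirroring the check added
 * above.  `context` is assumed to be an already-initialized ucp_context_h. */
#include <stdio.h>
#include <ucp/api/ucp.h>

static int create_mt_worker(ucp_context_h context, ucp_worker_h *worker_p)
{
    ucp_worker_params_t params = {0};
    ucp_worker_attr_t   attr   = {0};
    ucs_status_t        status;

    params.field_mask  = UCP_WORKER_PARAM_FIELD_THREAD_MODE;
    params.thread_mode = UCS_THREAD_MODE_MULTI;   /* a request, not a guarantee */

    status = ucp_worker_create(context, &params, worker_p);
    if (status != UCS_OK) {
        return -1;
    }

    /* The requested mode may be downgraded, so query what was granted. */
    attr.field_mask = UCP_WORKER_ATTR_FIELD_THREAD_MODE;
    status = ucp_worker_query(*worker_p, &attr);
    if ((status != UCS_OK) || (attr.thread_mode != UCS_THREAD_MODE_MULTI)) {
        fprintf(stderr, "worker does not support multi-threaded access\n");
        ucp_worker_destroy(*worker_p);
        *worker_p = NULL;
        return -1;
    }
    return 0;
}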