Skip to content

Commit 507fcc9

Browse files
authored
Merge pull request #6806 from ggouaillardet/topic/v4.0.x/nbc_retain
v4.0.x: retain operation and datatype(s) in non blocking collectives
2 parents d3a7360 + c9e4240 commit 507fcc9

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+673
-144
lines changed

ompi/mca/coll/base/coll_base_util.c

Lines changed: 187 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2014-2017 Research Organization for Information Science
13-
* and Technology (RIST). All rights reserved.
12+
* Copyright (c) 2014-2019 Research Organization for Information Science
13+
* and Technology (RIST). All rights reserved.
1414
* $COPYRIGHT$
1515
*
1616
* Additional copyrights may follow
@@ -26,6 +26,7 @@
2626
#include "ompi/communicator/communicator.h"
2727
#include "ompi/mca/coll/base/coll_tags.h"
2828
#include "ompi/mca/coll/base/coll_base_functions.h"
29+
#include "ompi/mca/topo/base/base.h"
2930
#include "ompi/mca/pml/pml.h"
3031
#include "coll_base_util.h"
3132

@@ -103,3 +104,187 @@ int ompi_rounddown(int num, int factor)
103104
num /= factor;
104105
return num * factor; /* floor(num / factor) * factor */
105106
}
107+
108+
static void release_objs_callback(struct ompi_coll_base_nbc_request_t *request) {
109+
if (NULL != request->data.objs.objs[0]) {
110+
OBJ_RELEASE(request->data.objs.objs[0]);
111+
}
112+
if (NULL != request->data.objs.objs[1]) {
113+
OBJ_RELEASE(request->data.objs.objs[1]);
114+
}
115+
}
116+
117+
static int complete_objs_callback(struct ompi_request_t *req) {
118+
struct ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
119+
int rc = OMPI_SUCCESS;
120+
assert (NULL != request);
121+
if (NULL != request->cb.req_complete_cb) {
122+
rc = request->cb.req_complete_cb(request->req_complete_cb_data);
123+
}
124+
release_objs_callback(request);
125+
return rc;
126+
}
127+
128+
static int free_objs_callback(struct ompi_request_t **rptr) {
129+
struct ompi_coll_base_nbc_request_t *request = *(ompi_coll_base_nbc_request_t **)rptr;
130+
int rc = OMPI_SUCCESS;
131+
if (NULL != request->cb.req_free) {
132+
rc = request->cb.req_free(rptr);
133+
}
134+
release_objs_callback(request);
135+
return rc;
136+
}
137+
138+
int ompi_coll_base_retain_op( ompi_request_t *req, ompi_op_t *op,
139+
ompi_datatype_t *type) {
140+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
141+
bool retain = false;
142+
if (!ompi_op_is_intrinsic(op)) {
143+
OBJ_RETAIN(op);
144+
request->data.op.op = op;
145+
retain = true;
146+
}
147+
if (!ompi_datatype_is_predefined(type)) {
148+
OBJ_RETAIN(type);
149+
request->data.op.datatype = type;
150+
retain = true;
151+
}
152+
if (OPAL_UNLIKELY(retain)) {
153+
/* We need to consider two cases :
154+
* - non blocking collectives:
155+
* the objects can be released when MPI_Wait() completes
156+
* and we use the req_complete_cb callback
157+
* - persistent non blocking collectives:
158+
* the objects can only be released when the request is freed
159+
* (e.g. MPI_Request_free() completes) and we use req_free callback
160+
*/
161+
if (req->req_persistent) {
162+
request->cb.req_free = req->req_free;
163+
req->req_free = free_objs_callback;
164+
} else {
165+
request->cb.req_complete_cb = req->req_complete_cb;
166+
request->req_complete_cb_data = req->req_complete_cb_data;
167+
req->req_complete_cb = complete_objs_callback;
168+
req->req_complete_cb_data = request;
169+
}
170+
}
171+
return OMPI_SUCCESS;
172+
}
173+
174+
int ompi_coll_base_retain_datatypes( ompi_request_t *req, ompi_datatype_t *stype,
175+
ompi_datatype_t *rtype) {
176+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
177+
bool retain = false;
178+
if (NULL != stype && !ompi_datatype_is_predefined(stype)) {
179+
OBJ_RETAIN(stype);
180+
request->data.types.stype = stype;
181+
retain = true;
182+
}
183+
if (NULL != rtype && !ompi_datatype_is_predefined(rtype)) {
184+
OBJ_RETAIN(rtype);
185+
request->data.types.rtype = rtype;
186+
retain = true;
187+
}
188+
if (OPAL_UNLIKELY(retain)) {
189+
if (req->req_persistent) {
190+
request->cb.req_free = req->req_free;
191+
req->req_free = free_objs_callback;
192+
} else {
193+
request->cb.req_complete_cb = req->req_complete_cb;
194+
request->req_complete_cb_data = req->req_complete_cb_data;
195+
req->req_complete_cb = complete_objs_callback;
196+
req->req_complete_cb_data = request;
197+
}
198+
}
199+
return OMPI_SUCCESS;
200+
}
201+
202+
static void release_vecs_callback(ompi_coll_base_nbc_request_t *request) {
203+
ompi_communicator_t *comm = request->super.req_mpi_object.comm;
204+
int scount, rcount;
205+
if (OMPI_COMM_IS_TOPO(comm)) {
206+
(void)mca_topo_base_neighbor_count (comm, &rcount, &scount);
207+
} else {
208+
scount = rcount = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm);
209+
}
210+
for (int i=0; i<scount; i++) {
211+
if (NULL != request->data.vecs.stypes && NULL != request->data.vecs.stypes[i]) {
212+
OMPI_DATATYPE_RELEASE(request->data.vecs.stypes[i]);
213+
}
214+
}
215+
for (int i=0; i<rcount; i++) {
216+
if (NULL != request->data.vecs.rtypes && NULL != request->data.vecs.rtypes[i]) {
217+
OMPI_DATATYPE_RELEASE(request->data.vecs.rtypes[i]);
218+
}
219+
}
220+
}
221+
222+
static int complete_vecs_callback(struct ompi_request_t *req) {
223+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
224+
int rc = OMPI_SUCCESS;
225+
assert (NULL != request);
226+
if (NULL != request->cb.req_complete_cb) {
227+
rc = request->cb.req_complete_cb(request->req_complete_cb_data);
228+
}
229+
release_vecs_callback(request);
230+
return rc;
231+
}
232+
233+
static int free_vecs_callback(struct ompi_request_t **rptr) {
234+
struct ompi_coll_base_nbc_request_t *request = *(ompi_coll_base_nbc_request_t **)rptr;
235+
int rc = OMPI_SUCCESS;
236+
if (NULL != request->cb.req_free) {
237+
rc = request->cb.req_free(rptr);
238+
}
239+
release_vecs_callback(request);
240+
return rc;
241+
}
242+
243+
int ompi_coll_base_retain_datatypes_w( ompi_request_t *req,
244+
ompi_datatype_t *stypes[], ompi_datatype_t *rtypes[]) {
245+
ompi_coll_base_nbc_request_t *request = (ompi_coll_base_nbc_request_t *)req;
246+
bool retain = false;
247+
ompi_communicator_t *comm = request->super.req_mpi_object.comm;
248+
int scount, rcount;
249+
if (OMPI_COMM_IS_TOPO(comm)) {
250+
(void)mca_topo_base_neighbor_count (comm, &rcount, &scount);
251+
} else {
252+
scount = rcount = OMPI_COMM_IS_INTER(comm)?ompi_comm_remote_size(comm):ompi_comm_size(comm);
253+
}
254+
255+
for (int i=0; i<scount; i++) {
256+
if (NULL != stypes && NULL != stypes[i] && !ompi_datatype_is_predefined(stypes[i])) {
257+
OBJ_RETAIN(stypes[i]);
258+
retain = true;
259+
}
260+
}
261+
for (int i=0; i<rcount; i++) {
262+
if (NULL != rtypes && NULL != rtypes[i] && !ompi_datatype_is_predefined(rtypes[i])) {
263+
OBJ_RETAIN(rtypes[i]);
264+
retain = true;
265+
}
266+
}
267+
if (OPAL_UNLIKELY(retain)) {
268+
request->data.vecs.stypes = stypes;
269+
request->data.vecs.rtypes = rtypes;
270+
if (req->req_persistent) {
271+
request->cb.req_free = req->req_free;
272+
req->req_free = free_vecs_callback;
273+
} else {
274+
request->cb.req_complete_cb = req->req_complete_cb;
275+
request->req_complete_cb_data = req->req_complete_cb_data;
276+
req->req_complete_cb = complete_vecs_callback;
277+
req->req_complete_cb_data = request;
278+
}
279+
}
280+
return OMPI_SUCCESS;
281+
}
282+
283+
static void nbc_req_cons(ompi_coll_base_nbc_request_t *req) {
284+
req->cb.req_complete_cb = NULL;
285+
req->req_complete_cb_data = NULL;
286+
req->data.objs.objs[0] = NULL;
287+
req->data.objs.objs[1] = NULL;
288+
}
289+
290+
OBJ_CLASS_INSTANCE(ompi_coll_base_nbc_request_t, ompi_request_t, nbc_req_cons, NULL);

ompi/mca/coll/base/coll_base_util.h

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2014-2017 Research Organization for Information Science
13-
* and Technology (RIST). All rights reserved.
12+
* Copyright (c) 2014-2019 Research Organization for Information Science
13+
* and Technology (RIST). All rights reserved.
1414
* $COPYRIGHT$
1515
*
1616
* Additional copyrights may follow
@@ -27,10 +27,41 @@
2727
#include "ompi/mca/mca.h"
2828
#include "ompi/datatype/ompi_datatype.h"
2929
#include "ompi/request/request.h"
30+
#include "ompi/op/op.h"
3031
#include "ompi/mca/pml/pml.h"
3132

3233
BEGIN_C_DECLS
3334

35+
struct ompi_coll_base_nbc_request_t {
36+
ompi_request_t super;
37+
union {
38+
ompi_request_complete_fn_t req_complete_cb;
39+
ompi_request_free_fn_t req_free;
40+
} cb;
41+
void *req_complete_cb_data;
42+
union {
43+
struct {
44+
ompi_op_t *op;
45+
ompi_datatype_t *datatype;
46+
} op;
47+
struct {
48+
ompi_datatype_t *stype;
49+
ompi_datatype_t *rtype;
50+
} types;
51+
struct {
52+
opal_object_t *objs[2];
53+
} objs;
54+
struct {
55+
ompi_datatype_t **stypes;
56+
ompi_datatype_t **rtypes;
57+
} vecs;
58+
} data;
59+
};
60+
61+
OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_coll_base_nbc_request_t);
62+
63+
typedef struct ompi_coll_base_nbc_request_t ompi_coll_base_nbc_request_t;
64+
3465
/**
3566
* A MPI_like function doing a send and a receive simultaneously.
3667
* If one of the communications results in a zero-byte message the
@@ -84,5 +115,17 @@ unsigned int ompi_mirror_perm(unsigned int x, int nbits);
84115
*/
85116
int ompi_rounddown(int num, int factor);
86117

118+
int ompi_coll_base_retain_op( ompi_request_t *request,
119+
ompi_op_t *op,
120+
ompi_datatype_t *type);
121+
122+
int ompi_coll_base_retain_datatypes( ompi_request_t *request,
123+
ompi_datatype_t *stype,
124+
ompi_datatype_t *rtype);
125+
126+
int ompi_coll_base_retain_datatypes_w( ompi_request_t *request,
127+
ompi_datatype_t *stypes[],
128+
ompi_datatype_t *rtypes[]);
129+
87130
END_C_DECLS
88131
#endif /* MCA_COLL_BASE_UTIL_EXPORT_H */

ompi/mca/coll/libnbc/coll_libnbc.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
1414
* Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
1515
* reserved.
16-
* Copyright (c) 2014-2017 Research Organization for Information Science
17-
* and Technology (RIST). All rights reserved.
16+
* Copyright (c) 2014-2019 Research Organization for Information Science
17+
* and Technology (RIST). All rights reserved.
1818
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
1919
* Copyright (c) 2018 FUJITSU LIMITED. All rights reserved.
2020
* $COPYRIGHT$
@@ -28,7 +28,7 @@
2828
#define MCA_COLL_LIBNBC_EXPORT_H
2929

3030
#include "ompi/mca/coll/coll.h"
31-
#include "ompi/request/request.h"
31+
#include "ompi/mca/coll/base/coll_base_util.h"
3232
#include "opal/sys/atomic.h"
3333

3434
BEGIN_C_DECLS
@@ -114,7 +114,7 @@ typedef struct NBC_Schedule NBC_Schedule;
114114
OBJ_CLASS_DECLARATION(NBC_Schedule);
115115

116116
struct ompi_coll_libnbc_request_t {
117-
ompi_request_t super;
117+
ompi_coll_base_nbc_request_t super;
118118
MPI_Comm comm;
119119
long row_offset;
120120
bool nbc_complete; /* status in libnbc level */
@@ -138,13 +138,13 @@ typedef ompi_coll_libnbc_request_t NBC_Handle;
138138
opal_free_list_item_t *item; \
139139
item = opal_free_list_wait (&mca_coll_libnbc_component.requests); \
140140
req = (ompi_coll_libnbc_request_t*) item; \
141-
OMPI_REQUEST_INIT(&req->super, persistent); \
142-
req->super.req_mpi_object.comm = comm; \
141+
OMPI_REQUEST_INIT(&req->super.super, persistent); \
142+
req->super.super.req_mpi_object.comm = comm; \
143143
} while (0)
144144

145145
#define OMPI_COLL_LIBNBC_REQUEST_RETURN(req) \
146146
do { \
147-
OMPI_REQUEST_FINI(&(req)->super); \
147+
OMPI_REQUEST_FINI(&(req)->super.super); \
148148
opal_free_list_return (&mca_coll_libnbc_component.requests, \
149149
(opal_free_list_item_t*) (req)); \
150150
} while (0)

ompi/mca/coll/libnbc/coll_libnbc_component.c

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
* Copyright (c) 2008 Cisco Systems, Inc. All rights reserved.
1414
* Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
1515
* reserved.
16-
* Copyright (c) 2016-2017 Research Organization for Information Science
17-
* and Technology (RIST). All rights reserved.
16+
* Copyright (c) 2016-2019 Research Organization for Information Science
17+
* and Technology (RIST). All rights reserved.
1818
* Copyright (c) 2016 IBM Corporation. All rights reserved.
1919
* Copyright (c) 2017 Ian Bradley Morgan and Anthony Skjellum. All
2020
* rights reserved.
@@ -328,21 +328,21 @@ ompi_coll_libnbc_progress(void)
328328
/* done, remove and complete */
329329
OPAL_THREAD_LOCK(&mca_coll_libnbc_component.lock);
330330
opal_list_remove_item(&mca_coll_libnbc_component.active_requests,
331-
&request->super.super.super);
331+
&request->super.super.super.super);
332332
OPAL_THREAD_UNLOCK(&mca_coll_libnbc_component.lock);
333333

334334
if( OMPI_SUCCESS == res || NBC_OK == res || NBC_SUCCESS == res ) {
335-
request->super.req_status.MPI_ERROR = OMPI_SUCCESS;
335+
request->super.super.req_status.MPI_ERROR = OMPI_SUCCESS;
336336
}
337337
else {
338-
request->super.req_status.MPI_ERROR = res;
338+
request->super.super.req_status.MPI_ERROR = res;
339339
}
340-
if(request->super.req_persistent) {
340+
if(request->super.super.req_persistent) {
341341
/* reset for the next communication */
342342
request->row_offset = 0;
343343
}
344-
if(!request->super.req_persistent || !REQUEST_COMPLETE(&request->super)) {
345-
ompi_request_complete(&request->super, true);
344+
if(!request->super.super.req_persistent || !REQUEST_COMPLETE(&request->super.super)) {
345+
ompi_request_complete(&request->super.super, true);
346346
}
347347
}
348348
OPAL_THREAD_LOCK(&mca_coll_libnbc_component.lock);
@@ -407,7 +407,7 @@ request_start(size_t count, ompi_request_t ** requests)
407407
NBC_DEBUG(5, "tmpbuf address=%p size=%u\n", handle->tmpbuf, sizeof(handle->tmpbuf));
408408
NBC_DEBUG(5, "--------------------------------\n");
409409

410-
handle->super.req_complete = REQUEST_PENDING;
410+
handle->super.super.req_complete = REQUEST_PENDING;
411411
handle->nbc_complete = false;
412412

413413
res = NBC_Start(handle);
@@ -437,7 +437,7 @@ request_free(struct ompi_request_t **ompi_req)
437437
ompi_coll_libnbc_request_t *request =
438438
(ompi_coll_libnbc_request_t*) *ompi_req;
439439

440-
if( !REQUEST_COMPLETE(&request->super) ) {
440+
if( !REQUEST_COMPLETE(&request->super.super) ) {
441441
return MPI_ERR_REQUEST;
442442
}
443443

@@ -451,11 +451,11 @@ request_free(struct ompi_request_t **ompi_req)
451451
static void
452452
request_construct(ompi_coll_libnbc_request_t *request)
453453
{
454-
request->super.req_type = OMPI_REQUEST_COLL;
455-
request->super.req_status._cancelled = 0;
456-
request->super.req_start = request_start;
457-
request->super.req_free = request_free;
458-
request->super.req_cancel = request_cancel;
454+
request->super.super.req_type = OMPI_REQUEST_COLL;
455+
request->super.super.req_status._cancelled = 0;
456+
request->super.super.req_start = request_start;
457+
request->super.super.req_free = request_free;
458+
request->super.super.req_cancel = request_cancel;
459459
}
460460

461461

0 commit comments

Comments
 (0)