Skip to content

Commit 88a560f

Browse files
authored
Merge pull request #5744 from mkurnosov/coll-iscan-recursivedoubling
coll/libnbc: add recursive doubling algorithm for MPI_Iscan
2 parents dfa8d3a + 3d43ff0 commit 88a560f

File tree

3 files changed

+225
-65
lines changed

3 files changed

+225
-65
lines changed

ompi/mca/coll/libnbc/coll_libnbc.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ BEGIN_C_DECLS
7070
#define NBC_NUM_COLL 17
7171

7272
extern bool libnbc_ibcast_skip_dt_decision;
73+
extern int libnbc_iscan_algorithm;
7374

7475
struct ompi_coll_libnbc_component_t {
7576
mca_coll_base_component_2_0_0_t super;

ompi/mca/coll/libnbc/coll_libnbc_component.c

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ static int libnbc_priority = 10;
4646
static bool libnbc_in_progress = false; /* protect from recursive calls */
4747
bool libnbc_ibcast_skip_dt_decision = true;
4848

49+
int libnbc_iscan_algorithm = 0; /* iscan user forced algorithm */
50+
/*
 * Value/name pairs accepted by the "iscan_algorithm" MCA variable.
 * libnbc_register() hands this table to mca_base_var_enum_create() and
 * registers the resulting enum for libnbc_iscan_algorithm.
 *
 * In nbc_scan_init() only the value 2 ("recursive_doubling") selects the
 * recursive doubling schedule; every other value, i.e. both 0 ("ignore")
 * and 1 ("linear"), ends up using the linear schedule.
 *
 * NOTE(review): the trailing {0, NULL} entry is presumably the
 * end-of-table sentinel expected by mca_base_var_enum_create() - confirm.
 */
static mca_base_var_enum_value_t iscan_algorithms[] = {
51+
{0, "ignore"},
52+
{1, "linear"},
53+
{2, "recursive_doubling"},
54+
{0, NULL}
55+
};
4956

5057
static int libnbc_open(void);
5158
static int libnbc_close(void);
@@ -128,6 +135,8 @@ libnbc_close(void)
128135
static int
129136
libnbc_register(void)
130137
{
138+
mca_base_var_enum_t *new_enum = NULL;
139+
131140
/* Use a low priority, but allow other components to be lower */
132141
libnbc_priority = 10;
133142
(void) mca_base_component_var_register(&mca_coll_libnbc_component.super.collm_version,
@@ -158,6 +167,16 @@ libnbc_register(void)
158167
MCA_BASE_VAR_SCOPE_READONLY,
159168
&libnbc_ibcast_skip_dt_decision);
160169

170+
libnbc_iscan_algorithm = 0;
171+
(void) mca_base_var_enum_create("coll_libnbc_iscan_algorithms", iscan_algorithms, &new_enum);
172+
mca_base_component_var_register(&mca_coll_libnbc_component.super.collm_version,
173+
"iscan_algorithm",
174+
"Which iscan algorithm is used: 0 ignore, 1 linear, 2 recursive_doubling",
175+
MCA_BASE_VAR_TYPE_INT, new_enum, 0, MCA_BASE_VAR_FLAG_SETTABLE,
176+
OPAL_INFO_LVL_5, MCA_BASE_VAR_SCOPE_ALL,
177+
&libnbc_iscan_algorithm);
178+
OBJ_RELEASE(new_enum);
179+
161180
return OMPI_SUCCESS;
162181
}
163182

ompi/mca/coll/libnbc/nbc_iscan.c

Lines changed: 205 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,20 @@
1818
* Author(s): Torsten Hoefler <[email protected]>
1919
*
2020
*/
21+
#include "opal/include/opal/align.h"
22+
#include "ompi/op/op.h"
23+
2124
#include "nbc_internal.h"
2225

26+
static inline int scan_sched_linear(
27+
int rank, int comm_size, const void *sendbuf, void *recvbuf, int count,
28+
MPI_Datatype datatype, MPI_Op op, char inplace, NBC_Schedule *schedule,
29+
void *tmpbuf);
30+
static inline int scan_sched_recursivedoubling(
31+
int rank, int comm_size, const void *sendbuf, void *recvbuf,
32+
int count, MPI_Datatype datatype, MPI_Op op, char inplace,
33+
NBC_Schedule *schedule, void *tmpbuf1, void *tmpbuf2);
34+
2335
#ifdef NBC_CACHE_SCHEDULE
2436
/* tree comparison function for schedule cache */
2537
int NBC_Scan_args_compare(NBC_Scan_args *a, NBC_Scan_args *b, void *param) {
@@ -39,27 +51,41 @@ int NBC_Scan_args_compare(NBC_Scan_args *a, NBC_Scan_args *b, void *param) {
3951
}
4052
#endif
4153

42-
/* linear iscan
43-
* working principle:
44-
* 1. each node (but node 0) receives from left neighbor
45-
* 2. performs op
46-
* 3. all but rank p-1 do sends to it's right neighbor and exits
47-
*
48-
*/
4954
static int nbc_scan_init(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op,
5055
struct ompi_communicator_t *comm, ompi_request_t ** request,
5156
struct mca_coll_base_module_2_3_0_t *module, bool persistent) {
52-
int rank, p, res;
53-
ptrdiff_t gap, span;
54-
NBC_Schedule *schedule;
55-
void *tmpbuf = NULL;
56-
char inplace;
57-
ompi_coll_libnbc_module_t *libnbc_module = (ompi_coll_libnbc_module_t*) module;
57+
int rank, p, res;
58+
ptrdiff_t gap, span;
59+
NBC_Schedule *schedule;
60+
void *tmpbuf = NULL, *tmpbuf1 = NULL, *tmpbuf2 = NULL;
61+
enum { NBC_SCAN_LINEAR, NBC_SCAN_RDBL } alg;
62+
char inplace;
63+
ompi_coll_libnbc_module_t *libnbc_module = (ompi_coll_libnbc_module_t*) module;
64+
65+
NBC_IN_PLACE(sendbuf, recvbuf, inplace);
5866

59-
NBC_IN_PLACE(sendbuf, recvbuf, inplace);
67+
rank = ompi_comm_rank (comm);
68+
p = ompi_comm_size (comm);
6069

61-
rank = ompi_comm_rank (comm);
62-
p = ompi_comm_size (comm);
70+
if (count == 0) {
71+
return nbc_get_noop_request(persistent, request);
72+
}
73+
74+
span = opal_datatype_span(&datatype->super, count, &gap);
75+
if (libnbc_iscan_algorithm == 2) {
76+
alg = NBC_SCAN_RDBL;
77+
ptrdiff_t span_align = OPAL_ALIGN(span, datatype->super.align, ptrdiff_t);
78+
tmpbuf = malloc(span_align + span);
79+
if (NULL == tmpbuf) { return OMPI_ERR_OUT_OF_RESOURCE; }
80+
tmpbuf1 = (void *)(-gap);
81+
tmpbuf2 = (char *)(span_align) - gap;
82+
} else {
83+
alg = NBC_SCAN_LINEAR;
84+
if (rank > 0) {
85+
tmpbuf = malloc(span);
86+
if (NULL == tmpbuf) { return OMPI_ERR_OUT_OF_RESOURCE; }
87+
}
88+
}
6389

6490
#ifdef NBC_CACHE_SCHEDULE
6591
NBC_Scan_args *args, *found, search;
@@ -75,60 +101,28 @@ static int nbc_scan_init(const void* sendbuf, void* recvbuf, int count, MPI_Data
75101
#endif
76102
schedule = OBJ_NEW(NBC_Schedule);
77103
if (OPAL_UNLIKELY(NULL == schedule)) {
78-
return OMPI_ERR_OUT_OF_RESOURCE;
104+
free(tmpbuf);
105+
return OMPI_ERR_OUT_OF_RESOURCE;
79106
}
80107

81-
if (!inplace) {
82-
/* copy data to receivebuf */
83-
res = NBC_Sched_copy ((void *)sendbuf, false, count, datatype,
84-
recvbuf, false, count, datatype, schedule, false);
85-
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
86-
OBJ_RELEASE(schedule);
87-
return res;
88-
}
108+
if (alg == NBC_SCAN_LINEAR) {
109+
res = scan_sched_linear(rank, p, sendbuf, recvbuf, count, datatype,
110+
op, inplace, schedule, tmpbuf);
111+
} else {
112+
res = scan_sched_recursivedoubling(rank, p, sendbuf, recvbuf, count,
113+
datatype, op, inplace, schedule, tmpbuf1, tmpbuf2);
89114
}
90-
91-
if(rank != 0) {
92-
span = opal_datatype_span(&datatype->super, count, &gap);
93-
tmpbuf = malloc (span);
94-
if (NULL == tmpbuf) {
95-
OBJ_RELEASE(schedule);
96-
return OMPI_ERR_OUT_OF_RESOURCE;
97-
}
98-
99-
/* we have to wait until we have the data */
100-
res = NBC_Sched_recv ((void *)(-gap), true, count, datatype, rank-1, schedule, true);
101-
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
102-
OBJ_RELEASE(schedule);
103-
free(tmpbuf);
104-
return res;
105-
}
106-
107-
/* perform the reduce in my local buffer */
108-
/* this cannot be done until tmpbuf is unused :-( so barrier after the op */
109-
res = NBC_Sched_op ((void *)(-gap), true, recvbuf, false, count, datatype, op, schedule,
110-
true);
111-
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
115+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
112116
OBJ_RELEASE(schedule);
113117
free(tmpbuf);
114118
return res;
115-
}
116119
}
117120

118-
if (rank != p-1) {
119-
res = NBC_Sched_send (recvbuf, false, count, datatype, rank+1, schedule, false);
120-
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
121+
res = NBC_Sched_commit(schedule);
122+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
121123
OBJ_RELEASE(schedule);
122124
free(tmpbuf);
123125
return res;
124-
}
125-
}
126-
127-
res = NBC_Sched_commit (schedule);
128-
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
129-
OBJ_RELEASE(schedule);
130-
free(tmpbuf);
131-
return res;
132126
}
133127

134128
#ifdef NBC_CACHE_SCHEDULE
@@ -162,14 +156,160 @@ static int nbc_scan_init(const void* sendbuf, void* recvbuf, int count, MPI_Data
162156
}
163157
#endif
164158

165-
res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, tmpbuf);
166-
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
167-
OBJ_RELEASE(schedule);
168-
free(tmpbuf);
159+
res = NBC_Schedule_request(schedule, comm, libnbc_module, persistent, request, tmpbuf);
160+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) {
161+
OBJ_RELEASE(schedule);
162+
free(tmpbuf);
163+
return res;
164+
}
165+
166+
return OMPI_SUCCESS;
167+
}
168+
169+
/*
170+
* scan_sched_linear:
171+
*
172+
* Function: Linear algorithm for inclusive scan.
173+
* Accepts: Same as MPI_Iscan
174+
* Returns: MPI_SUCCESS or error code
175+
*
176+
* Working principle:
177+
* 1. Each process (but process 0) receives from left neighbor
178+
* 2. Performs op
179+
* 3. All but rank p-1 do sends to it's right neighbor and exits
180+
*
181+
* Schedule length: O(1)
182+
*/
183+
static inline int scan_sched_linear(
184+
int rank, int comm_size, const void *sendbuf, void *recvbuf, int count,
185+
MPI_Datatype datatype, MPI_Op op, char inplace, NBC_Schedule *schedule,
186+
void *tmpbuf)
187+
{
188+
int res = OMPI_SUCCESS;
189+
190+
if (!inplace) {
191+
/* Copy data to recvbuf */
192+
res = NBC_Sched_copy((void *)sendbuf, false, count, datatype,
193+
recvbuf, false, count, datatype, schedule, false);
194+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
195+
}
196+
197+
if (rank > 0) {
198+
ptrdiff_t gap;
199+
opal_datatype_span(&datatype->super, count, &gap);
200+
/* We have to wait until we have the data */
201+
res = NBC_Sched_recv((void *)(-gap), true, count, datatype, rank - 1, schedule, true);
202+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
203+
204+
/* Perform the reduce in my local buffer */
205+
/* this cannot be done until tmpbuf is unused :-( so barrier after the op */
206+
res = NBC_Sched_op((void *)(-gap), true, recvbuf, false, count, datatype, op, schedule,
207+
true);
208+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
209+
}
210+
211+
if (rank != comm_size - 1) {
212+
res = NBC_Sched_send(recvbuf, false, count, datatype, rank + 1, schedule, false);
213+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
214+
}
215+
216+
cleanup_and_return:
169217
return res;
170-
}
218+
}
171219

172-
return OMPI_SUCCESS;
220+
/*
221+
* scan_sched_recursivedoubling:
222+
*
223+
* Function: Recursive doubling algorithm for inclusive scan.
224+
* Accepts: Same as MPI_Iscan
225+
* Returns: MPI_SUCCESS or error code
226+
*
227+
* Description: Implements recursive doubling algorithm for MPI_Iscan.
228+
* The algorithm preserves order of operations so it can
229+
* be used both by commutative and non-commutative operations.
230+
*
231+
* Example for 5 processes and commutative operation MPI_SUM:
232+
* Process: 0 1 2 3 4
233+
* recvbuf: [0] [1] [2] [3] [4]
234+
* psend: [0] [1] [2] [3] [4]
235+
*
236+
* Step 1:
237+
* recvbuf: [0] [0+1] [2] [2+3] [4]
238+
* psend: [1+0] [0+1] [3+2] [2+3] [4]
239+
*
240+
* Step 2:
241+
* recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [4]
242+
* psend: [(3+2)+(1+0)] [(2+3)+(0+1)] [(1+0)+(3+2)] [(1+0)+(2+3)] [4]
243+
*
244+
* Step 3:
245+
* recvbuf: [0] [0+1] [(1+0)+2] [(1+0)+(2+3)] [((3+2)+(1+0))+4]
246+
* psend: [4+((3+2)+(1+0))] [((3+2)+(1+0))+4]
247+
*
248+
* Time complexity (worst case): \ceil(\log_2(p))(2\alpha + 2m\beta + 2m\gamma)
249+
* Memory requirements (per process): 2 * count * typesize = O(count)
250+
* Limitations: intra-communicators only
251+
* Schedule length: O(log(p))
252+
*/
253+
static inline int scan_sched_recursivedoubling(
254+
int rank, int comm_size, const void *sendbuf, void *recvbuf, int count,
255+
MPI_Datatype datatype, MPI_Op op, char inplace,
256+
NBC_Schedule *schedule, void *tmpbuf1, void *tmpbuf2)
257+
{
258+
int res = OMPI_SUCCESS;
259+
260+
if (!inplace) {
261+
res = NBC_Sched_copy((void *)sendbuf, false, count, datatype,
262+
recvbuf, false, count, datatype, schedule, true);
263+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
264+
}
265+
if (comm_size < 2)
266+
goto cleanup_and_return;
267+
268+
char *psend = (char *)tmpbuf1;
269+
char *precv = (char *)tmpbuf2;
270+
res = NBC_Sched_copy(recvbuf, false, count, datatype,
271+
psend, true, count, datatype, schedule, true);
272+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
273+
274+
int is_commute = ompi_op_is_commute(op);
275+
for (int mask = 1; mask < comm_size; mask <<= 1) {
276+
int remote = rank ^ mask;
277+
if (remote < comm_size) {
278+
res = NBC_Sched_send(psend, true, count, datatype, remote, schedule, false);
279+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
280+
res = NBC_Sched_recv(precv, true, count, datatype, remote, schedule, true);
281+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
282+
283+
if (rank > remote) {
284+
/* Accumulate prefix reduction: recvbuf = precv <op> recvbuf */
285+
res = NBC_Sched_op(precv, true, recvbuf, false, count,
286+
datatype, op, schedule, false);
287+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
288+
/* Partial result: psend = precv <op> psend */
289+
res = NBC_Sched_op(precv, true, psend, true, count,
290+
datatype, op, schedule, true);
291+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
292+
} else {
293+
if (is_commute) {
294+
/* psend = precv <op> psend */
295+
res = NBC_Sched_op(precv, true, psend, true, count,
296+
datatype, op, schedule, true);
297+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
298+
} else {
299+
/* precv = psend <op> precv */
300+
res = NBC_Sched_op(psend, true, precv, true, count,
301+
datatype, op, schedule, true);
302+
if (OPAL_UNLIKELY(OMPI_SUCCESS != res)) { goto cleanup_and_return; }
303+
char *tmp = psend;
304+
psend = precv;
305+
precv = tmp;
306+
}
307+
}
308+
}
309+
}
310+
311+
cleanup_and_return:
312+
return res;
173313
}
174314

175315
int ompi_coll_libnbc_iscan(const void* sendbuf, void* recvbuf, int count, MPI_Datatype datatype, MPI_Op op,

0 commit comments

Comments
 (0)