
Commit b9012a3

Better INPLACE algorithm for all2all and all2allv
Signed-off-by: George Bosilca <[email protected]>
1 parent dc4e2ce commit b9012a3

2 files changed (+176, -116 lines)

2 files changed

+176
-116
lines changed

ompi/mca/coll/base/coll_base_alltoall.c (+83, -59)

@@ -3,7 +3,7 @@
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         Corporation. All rights reserved.
- * Copyright (c) 2004-2017 The University of Tennessee and The University
+ * Copyright (c) 2004-2021 The University of Tennessee and The University
  *                         of Tennessee Research Foundation. All rights
  *                         reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -35,88 +35,112 @@
 #include "coll_base_topo.h"
 #include "coll_base_util.h"

-/* MPI_IN_PLACE all to all algorithm. TODO: implement a better one. */
+/*
+ * We want to minimize the amount of temporary memory needed while allowing as many ranks
+ * to exchange data simultaneously. We use a variation of the ring algorithm, where in a
+ * single step a process exchanges data with both neighbors at distance k (on the left
+ * and the right of a logical ring topology). With this approach we only need to pack the
+ * data for one of the two neighbors, as we can then use the original buffer (and datatype
+ * and count) to send the data to the other.
+ */
 int
 mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount,
                                            struct ompi_datatype_t *rdtype,
                                            struct ompi_communicator_t *comm,
                                            mca_coll_base_module_t *module)
 {
-    int i, j, size, rank, err = MPI_SUCCESS, line;
-    ptrdiff_t ext, gap = 0;
+    int i, size, rank, left, right, err = MPI_SUCCESS, line;
+    ptrdiff_t extent;
     ompi_request_t *req;
-    char *allocated_buffer = NULL, *tmp_buffer;
-    size_t max_size;
+    char *tmp_buffer;
+    size_t packed_size = 0, max_size;
+    opal_convertor_t convertor;

     /* Initialize. */

     size = ompi_comm_size(comm);
     rank = ompi_comm_rank(comm);

-    /* If only one process, we're done. */
-    if (1 == size) {
+    ompi_datatype_type_size(rdtype, &max_size);
+
+    /* Easy way out */
+    if ((1 == size) || (0 == rcount) || (0 == max_size) ) {
         return MPI_SUCCESS;
     }

-    /* Find the largest receive amount */
-    ompi_datatype_type_extent (rdtype, &ext);
-    max_size = opal_datatype_span(&rdtype->super, rcount, &gap);
+    /* Find the largest amount of packed send/recv data among all peers where
+     * we need to pack before the send.
+     */
+#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        right = (rank + i) % size;
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);
+
+        if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) {
+            packed_size = opal_datatype_compute_remote_size(&rdtype->super,
+                                                            ompi_proc->super.proc_convertor->master->remote_sizes);
+            max_size = packed_size > max_size ? packed_size : max_size;
+        }
+    }
+#endif  /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */
+    max_size *= rcount;

-    /* Initiate all send/recv to/from others. */
+    ompi_datatype_type_extent(rdtype, &extent);

     /* Allocate a temporary buffer */
-    allocated_buffer = calloc (max_size, 1);
-    if( NULL == allocated_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
-    tmp_buffer = allocated_buffer - gap;
-    max_size = ext * rcount;
-
-    /* in-place alltoall slow algorithm (but works) */
-    for (i = 0 ; i < size ; ++i) {
-        for (j = i+1 ; j < size ; ++j) {
-            if (i == rank) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
-                                                           (char *) rbuf + j * max_size);
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * j, rcount, rdtype,
-                                          j, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype,
-                                         j, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-            } else if (j == rank) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer,
-                                                           (char *) rbuf + i * max_size);
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * i, rcount, rdtype,
-                                          i, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype,
-                                         i, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
-            } else {
-                continue;
-            }
-
-            /* Wait for the requests to complete */
-            err = ompi_request_wait ( &req, MPI_STATUSES_IGNORE);
-            if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; }
+    tmp_buffer = calloc (max_size, 1);
+    if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
+
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
+        uint32_t iov_count = 1;
+
+        right = (rank + i) % size;
+        left  = (rank + size - i) % size;
+
+        ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
+        opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
+        opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcount,
+                                        (char *) rbuf + right * extent);
+        packed_size = max_size;
+        err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+        if (1 != err) { goto error_hndl; }
+
+        /* Receive data from the right */
+        err = MCA_PML_CALL(irecv ((char *) rbuf + right * extent, rcount, rdtype,
+                                  right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+        if (MPI_SUCCESS != err) { goto error_hndl; }
+
+        if( left != right ) {
+            /* Send data to the left */
+            err = MCA_PML_CALL(send ((char *) rbuf + left * extent, rcount, rdtype,
+                                     left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            /* Receive data from the left */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + left * extent, rcount, rdtype,
+                                      left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
         }
+
+        /* Send data to the right */
+        err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
+                                 right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                 comm));
+        if (MPI_SUCCESS != err) { goto error_hndl; }
+
+        err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+        if (MPI_SUCCESS != err) { goto error_hndl; }
     }

  error_hndl:
     /* Free the temporary buffer */
-    if( NULL != allocated_buffer )
-        free (allocated_buffer);
+    if( NULL != tmp_buffer )
+        free (tmp_buffer);

     if( MPI_SUCCESS != err ) {
         OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
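The core of the new scheme is the peer schedule: at step i (for i from 1 to size/2) each rank pairs with the ranks at distance i to its right and to its left on the ring, so every peer is visited exactly once and only one temporary pack buffer is needed per step. The standalone sketch below is not part of the commit; it simply prints that schedule for an arbitrary example communicator size of 6 and shows why the even-size case needs the `if (left != right)` guard seen above (at the last step both neighbors coincide).

/* Standalone illustration of the pairwise ring schedule used by the new
 * in-place algorithm. Not Open MPI code; size = 6 is an arbitrary example. */
#include <stdio.h>

int main(void)
{
    const int size = 6;                      /* example communicator size */
    for (int rank = 0; rank < size; ++rank) {
        printf("rank %d:", rank);
        for (int i = 1; i <= (size >> 1); ++i) {
            int right = (rank + i) % size;        /* peer at distance i clockwise */
            int left  = (rank + size - i) % size; /* peer at distance i counter-clockwise */
            if (left != right)
                printf("  step %d -> {%d, %d}", i, right, left);
            else
                /* even size: last step pairs each rank with a single peer */
                printf("  step %d -> {%d}", i, right);
        }
        printf("\n");
    }
    return 0;
}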

ompi/mca/coll/base/coll_base_alltoallv.c (+93, -57)

@@ -3,7 +3,7 @@
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         Corporation. All rights reserved.
- * Copyright (c) 2004-2017 The University of Tennessee and The University
+ * Copyright (c) 2004-2021 The University of Tennessee and The University
  *                         of Tennessee Research Foundation. All rights
  *                         reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -37,85 +37,121 @@
 #include "coll_base_topo.h"
 #include "coll_base_util.h"

+/*
+ * We want to minimize the amount of temporary memory needed while allowing as many ranks
+ * to exchange data simultaneously. We use a variation of the ring algorithm, where in a
+ * single step a process exchanges data with both neighbors at distance k (on the left
+ * and the right of a logical ring topology). With this approach we only need to pack the
+ * data for one of the two neighbors, as we can then use the original buffer (and datatype
+ * and count) to send the data to the other.
+ */
 int
 mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
                                             struct ompi_datatype_t *rdtype,
                                             struct ompi_communicator_t *comm,
                                             mca_coll_base_module_t *module)
 {
-    int i, j, size, rank, err=MPI_SUCCESS;
-    char *allocated_buffer, *tmp_buffer;
-    size_t max_size;
-    ptrdiff_t ext, gap = 0;
+    int i, size, rank, left, right, err = MPI_SUCCESS, line;
+    ompi_request_t *req;
+    char *tmp_buffer;
+    size_t packed_size = 0, max_size;
+    opal_convertor_t convertor;

     /* Initialize. */

     size = ompi_comm_size(comm);
     rank = ompi_comm_rank(comm);

-    /* If only one process, we're done. */
-    if (1 == size) {
+    ompi_datatype_type_size(rdtype, &max_size);
+    max_size *= rcounts[rank];
+
+    /* Easy way out */
+    if ((1 == size) || (0 == max_size) ) {
         return MPI_SUCCESS;
     }
-    /* Find the largest receive amount */
-    ompi_datatype_type_extent (rdtype, &ext);
-    for (i = 0, max_size = 0 ; i < size ; ++i) {
-        if (i == rank) {
-            continue;
-        }
-        size_t cur_size = opal_datatype_span(&rdtype->super, rcounts[i], &gap);
-        max_size = cur_size > max_size ? cur_size : max_size;
-    }
-    /* The gap will always be the same as we are working on the same datatype */

-    if (OPAL_UNLIKELY(0 == max_size)) {
-        return MPI_SUCCESS;
+    /* Find the largest amount of packed send/recv data among all peers where
+     * we need to pack before the send.
+     */
+#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        right = (rank + i) % size;
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);
+
+        if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) {
+            packed_size = opal_datatype_compute_remote_size(&rdtype->super,
+                                                            ompi_proc->super.proc_convertor->master->remote_sizes);
+            packed_size *= rcounts[right];
+            max_size = packed_size > max_size ? packed_size : max_size;
+        }
     }
+#endif  /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */

     /* Allocate a temporary buffer */
-    allocated_buffer = calloc (max_size, 1);
-    if (NULL == allocated_buffer) {
-        return OMPI_ERR_OUT_OF_RESOURCE;
-    }
-    tmp_buffer = allocated_buffer - gap;
-
-    /* Initiate all send/recv to/from others. */
-    /* in-place alltoallv slow algorithm (but works) */
-    for (i = 0 ; i < size ; ++i) {
-        for (j = i+1 ; j < size ; ++j) {
-            if (i == rank && 0 != rcounts[j]) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[j],
-                                                           tmp_buffer, (char *) rbuf + rdisps[j] * ext);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[j], rdtype,
-                                                     j, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     (char *)rbuf + rdisps[j] * ext, rcounts[j], rdtype,
-                                                     j, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     comm, MPI_STATUS_IGNORE);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else if (j == rank && 0 != rcounts[i]) {
-                /* Copy the data into the temporary buffer */
-                err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[i],
-                                                           tmp_buffer, (char *) rbuf + rdisps[i] * ext);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[i], rdtype,
-                                                     i, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     (char *) rbuf + rdisps[i] * ext, rcounts[i], rdtype,
-                                                     i, MCA_COLL_BASE_TAG_ALLTOALLV,
-                                                     comm, MPI_STATUS_IGNORE);
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            }
+    tmp_buffer = calloc (max_size, 1);
+    if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; }
+
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
+        uint32_t iov_count = 1;
+
+        right = (rank + i) % size;
+        left  = (rank + size - i) % size;
+
+        if( 0 != rcounts[right] ) {  /* nothing to exchange with the peer on the right */
+            ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
+            opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
+            opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcounts[right],
+                                            (char *) rbuf + rdisps[right]);
+            packed_size = max_size;
+            err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+            if (1 != err) { goto error_hndl; }
+
+            /* Receive data from the right */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtype,
+                                      right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( (left != right) && (0 != rcounts[left]) ) {
+            /* Send data to the left */
+            err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtype,
+                                     left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            /* Receive data from the left */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtype,
+                                      left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( 0 != rcounts[right] ) {  /* nothing to exchange with the peer on the right */
+            /* Send data to the right */
+            err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
+                                     right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
         }
     }

  error_hndl:
     /* Free the temporary buffer */
-    free (allocated_buffer);
+    if( NULL != tmp_buffer )
+        free (tmp_buffer);
+
+    if( MPI_SUCCESS != err ) {
+        OPAL_OUTPUT((ompi_coll_base_framework.framework_output,
+                     "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err,
+                     rank));
+        (void)line;  // silence compiler warning
+    }

     /* All done */
     return err;