
Commit dc4e2ce

A better MPI_IN_PLACE alltoallw algorithm.
Provide optimized variant for the homogeneous case.

Signed-off-by: George Bosilca <[email protected]>
1 parent 447b289 commit dc4e2ce
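
For context only (not part of the commit): the routine patched below implements the MPI_IN_PLACE case of MPI_Alltoallw, which from the application side looks roughly like the hypothetical sketch below, where MPI_IN_PLACE is passed as the send buffer and every rank both sends from and receives into the receive buffer. Buffer layout, counts and types here are illustrative assumptions, not taken from the commit.

#include <mpi.h>
#include <stdlib.h>

int main(int argc, char *argv[])
{
    int rank, size;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    /* One int per peer, stored contiguously; block i is both the data we
     * send to rank i and the place where we receive rank i's data. */
    int *buf = malloc(size * sizeof(int));
    int *counts = malloc(size * sizeof(int));
    int *disps = malloc(size * sizeof(int));   /* alltoallw displacements are in bytes */
    MPI_Datatype *types = malloc(size * sizeof(MPI_Datatype));
    for (int i = 0; i < size; ++i) {
        buf[i] = rank;
        counts[i] = 1;
        disps[i] = i * (int) sizeof(int);
        types[i] = MPI_INT;
    }

    /* MPI_IN_PLACE as sendbuf selects the in-place code path changed below;
     * the send count/displacement/type arguments are ignored. */
    MPI_Alltoallw(MPI_IN_PLACE, NULL, NULL, NULL,
                  buf, counts, disps, types, MPI_COMM_WORLD);

    free(buf); free(counts); free(disps); free(types);
    MPI_Finalize();
    return 0;
}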

1 file changed: +87 -74 lines changed

ompi/mca/coll/basic/coll_basic_alltoallw.c

Lines changed: 87 additions & 74 deletions
@@ -3,7 +3,7 @@
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         Corporation.  All rights reserved.
- * Copyright (c) 2004-2016 The University of Tennessee and The University
+ * Copyright (c) 2004-2021 The University of Tennessee and The University
  *                         of Tennessee Research Foundation.  All rights
  *                         reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -36,105 +36,118 @@
 #include "ompi/mca/coll/base/coll_tags.h"
 #include "ompi/mca/pml/pml.h"
 
-
+/*
+ * We want to minimize the amount of temporary memory needed while allowing as many ranks
+ * to exchange data simultaneously. We use a variation of the ring algorithm, where in a
+ * single step a process exchanges the data with both neighbors at distance k (on the left
+ * and the right of a logical ring topology). With this approach we only need to pack the
+ * data for one of the two neighbors, as we can then use the original buffer (and datatype
+ * and count) to send the data to the other.
+ */
 static int
 mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
                                        struct ompi_datatype_t * const *rdtypes,
                                        struct ompi_communicator_t *comm,
                                        mca_coll_base_module_t *module)
 {
-    int i, j, size, rank, err = MPI_SUCCESS;
-    ompi_request_t *req;
-    char *save_buffer = NULL;
-    size_t max_size = 0, packed_size;
+    int i, size, rank, left, right, err = MPI_SUCCESS;
+    ompi_request_t *req = MPI_REQUEST_NULL;
+    char *tmp_buffer = NULL;
+    size_t max_size = 0, packed_size, msg_size_left, msg_size_right;
     opal_convertor_t convertor;
 
     size = ompi_comm_size(comm);
-    rank = ompi_comm_rank(comm);
-
-    /* If only one process, we're done. */
-    if (1 == size) {
+    if (1 == size) {  /* If only one process, we're done. */
         return MPI_SUCCESS;
     }
+    rank = ompi_comm_rank(comm);
 
-    /* Find the largest amount of packed send/recv data */
-    for (i = 0, max_size = 0 ; i < size ; ++i) {
-        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
-
-        packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super,
-                                                        ompi_proc->super.proc_convertor->master->remote_sizes);
-        packed_size *= rcounts[i];
+    /* Find the largest amount of packed send/recv data among all peers where
+     * we need to pack before the send.
+     */
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        right = (rank + i) % size;
+#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);
+
+        if( OPAL_LIKELY(opal_local_arch == ompi_proc->super.proc_convertor->master->remote_arch)) {
+            opal_datatype_type_size(&rdtypes[right]->super, &packed_size);
+        } else {
+            packed_size = opal_datatype_compute_remote_size(&rdtypes[right]->super,
+                                                            ompi_proc->super.proc_convertor->master->remote_sizes);
+        }
+#else
+        opal_datatype_type_size(&rdtypes[right]->super, &packed_size);
+#endif  /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */
+        packed_size *= rcounts[right];
         max_size = packed_size > max_size ? packed_size : max_size;
     }
 
     /* Allocate a temporary buffer */
-    save_buffer = calloc (max_size, 1);
-    if (NULL == save_buffer) {
+    tmp_buffer = calloc (max_size, 1);
+    if (NULL == tmp_buffer) {
        return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    /* in-place alltoallw slow algorithm (but works) */
-    for (i = 0 ; i < size ; ++i) {
-        size_t msg_size_i;
-        ompi_datatype_type_size(rdtypes[i], &msg_size_i);
-        msg_size_i *= rcounts[i];
-        for (j = i+1 ; j < size ; ++j) {
-            size_t msg_size_j;
-            struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size};
-            uint32_t iov_count = 1;
-            ompi_datatype_type_size(rdtypes[j], &msg_size_j);
-            msg_size_j *= rcounts[j];
-
-            /* Initiate all send/recv to/from others. */
-            if (i == rank && msg_size_j != 0) {
-                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j);
-                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
-                opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j],
-                                                (char *) rbuf + rdisps[j]);
-                packed_size = max_size;
-                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
-                if (1 != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j],
-                                          j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
-                                         j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else if (j == rank && msg_size_i != 0) {
-                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
-                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
-                opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i],
-                                                (char *) rbuf + rdisps[i]);
-                packed_size = max_size;
-                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
-                if (1 != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i],
-                                          i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
-                                         i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else {
-                continue;
-            }
-
-            /* Wait for the requests to complete */
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
+        uint32_t iov_count = 1;
+
+        right = (rank + i) % size;
+        left  = (rank + size - i) % size;
+
+        ompi_datatype_type_size(rdtypes[right], &msg_size_right);
+        msg_size_right *= rcounts[right];
+
+        ompi_datatype_type_size(rdtypes[left], &msg_size_left);
+        msg_size_left *= rcounts[left];
+
+        if( 0 != msg_size_right ) {  /* something to exchange with the peer on the right */
+            ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
+            opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
+            opal_convertor_prepare_for_send(&convertor, &rdtypes[right]->super, rcounts[right],
+                                            (char *) rbuf + rdisps[right]);
+            packed_size = max_size;
+            err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+            if (1 != err) { goto error_hndl; }
+
+            /* Receive data from the right */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtypes[right],
+                                      right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( (left != right) && (0 != msg_size_left) ) {
+            /* Send data to the left */
+            err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left],
+                                     left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            /* Receive data from the left */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left],
+                                      left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( 0 != msg_size_right ) {  /* something to send to the peer on the right */
+            /* Send data to the right */
+            err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
+                                     right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
 
             err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
             if (MPI_SUCCESS != err) { goto error_hndl; }
         }
     }
 
 error_hndl:
     /* Free the temporary buffer */
-    free (save_buffer);
+    free (tmp_buffer);
 
     /* All done */
 
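As a reading aid (not part of the patch, and not Open MPI code): the loop below distills the peer schedule the new algorithm follows, assuming the ring pairing described in the comment the commit adds. At step i each rank exchanges with the neighbor at distance i on its right and on its left; only the block headed right needs to be packed into the temporary buffer, since the receive from the right overwrites that block of rbuf, while the block headed left is sent directly from rbuf before its receive is posted. print_schedule is a hypothetical helper used only for illustration.

#include <stdio.h>

/* Print, for one rank, the pairing used at every step of the ring-based
 * in-place exchange (a sketch of the schedule, not the actual implementation). */
static void print_schedule(int rank, int size)
{
    for (int i = 1; i <= size / 2; ++i) {      /* same bound as the patch: size >> 1 */
        int right = (rank + i) % size;
        int left  = (rank + size - i) % size;
        printf("step %d: pack block for %d, recv from %d", i, right, right);
        if (left != right) {                   /* last step of an even-sized ring is a single pair */
            printf(", send block to %d straight from rbuf, recv from %d", left, left);
        }
        printf("\n");
    }
}

int main(void)
{
    print_schedule(0, 5);   /* e.g. the schedule followed by rank 0 of a 5-process communicator */
    return 0;
}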