Commit 173d94f

A better MPI_IN_PLACE alltoall algorithm.
Signed-off-by: George Bosilca <[email protected]>
1 parent 447b289 commit 173d94f

1 file changed, 83 insertions(+), 74 deletions(-)
ompi/mca/coll/basic/coll_basic_alltoallw.c

@@ -3,7 +3,7 @@
  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
  *                         University Research and Technology
  *                         Corporation.  All rights reserved.
- * Copyright (c) 2004-2016 The University of Tennessee and The University
+ * Copyright (c) 2004-2021 The University of Tennessee and The University
  *                         of Tennessee Research Foundation.  All rights
  *                         reserved.
  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
@@ -36,105 +36,114 @@
 #include "ompi/mca/coll/base/coll_tags.h"
 #include "ompi/mca/pml/pml.h"
 
-
+/*
+ * We want to minimize the amount of temporary memory needed while allowing as many ranks
+ * as possible to exchange data simultaneously. We use a variation of the ring algorithm,
+ * where in a single step a process exchanges data with both neighbors at distance k (on
+ * the left and the right of a logical ring topology). With this approach we only need to
+ * pack the data for one of the two neighbors, as we can then use the original buffer (and
+ * datatype and count) to send the data to the other.
+ */
 static int
 mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, const int *rdisps,
                                        struct ompi_datatype_t * const *rdtypes,
                                        struct ompi_communicator_t *comm,
                                        mca_coll_base_module_t *module)
 {
-    int i, j, size, rank, err = MPI_SUCCESS;
-    ompi_request_t *req;
-    char *save_buffer = NULL;
-    size_t max_size = 0, packed_size;
+    int i, size, rank, left, right, err = MPI_SUCCESS;
+    ompi_request_t *req = MPI_REQUEST_NULL;
+    char *tmp_buffer = NULL;
+    size_t max_size = 0, packed_size, msg_size_left, msg_size_right;
     opal_convertor_t convertor;
 
     size = ompi_comm_size(comm);
-    rank = ompi_comm_rank(comm);
-
-    /* If only one process, we're done. */
-    if (1 == size) {
+    if (1 == size) {  /* If only one process, we're done. */
         return MPI_SUCCESS;
     }
+    rank = ompi_comm_rank(comm);
 
-    /* Find the largest amount of packed send/recv data */
-    for (i = 0, max_size = 0 ; i < size ; ++i) {
-        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
-
-        packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super,
-                                                        ompi_proc->super.proc_convertor->master->remote_sizes);
-        packed_size *= rcounts[i];
+    /* Find the largest amount of packed send/recv data among all peers where
+     * we need to pack before the send.
+     */
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        right = (rank + i) % size;
+        ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right);
+
+        if( opal_local_arch == ompi_proc->super.proc_convertor->master->remote_arch ) {
+            opal_datatype_type_size(&rdtypes[right]->super, &packed_size);
+        } else {
+            packed_size = opal_datatype_compute_remote_size(&rdtypes[right]->super,
+                                                            ompi_proc->super.proc_convertor->master->remote_sizes);
+        }
+        packed_size *= rcounts[right];
         max_size = packed_size > max_size ? packed_size : max_size;
     }
 
     /* Allocate a temporary buffer */
-    save_buffer = calloc (max_size, 1);
-    if (NULL == save_buffer) {
+    tmp_buffer = calloc (max_size, 1);
+    if (NULL == tmp_buffer) {
         return OMPI_ERR_OUT_OF_RESOURCE;
     }
 
-    /* in-place alltoallw slow algorithm (but works) */
-    for (i = 0 ; i < size ; ++i) {
-        size_t msg_size_i;
-        ompi_datatype_type_size(rdtypes[i], &msg_size_i);
-        msg_size_i *= rcounts[i];
-        for (j = i+1 ; j < size ; ++j) {
-            size_t msg_size_j;
-            struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size};
-            uint32_t iov_count = 1;
-            ompi_datatype_type_size(rdtypes[j], &msg_size_j);
-            msg_size_j *= rcounts[j];
-
-            /* Initiate all send/recv to/from others. */
-            if (i == rank && msg_size_j != 0) {
-                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j);
-                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
-                opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j],
-                                                (char *) rbuf + rdisps[j]);
-                packed_size = max_size;
-                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
-                if (1 != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j],
-                                          j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
-                                         j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else if (j == rank && msg_size_i != 0) {
-                ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i);
-                opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0);
-                opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i],
-                                                (char *) rbuf + rdisps[i]);
-                packed_size = max_size;
-                err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
-                if (1 != err) { goto error_hndl; }
-
-                /* Exchange data with the peer */
-                err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i],
-                                          i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-
-                err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED,
-                                         i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
-                                         comm));
-                if (MPI_SUCCESS != err) { goto error_hndl; }
-            } else {
-                continue;
-            }
-
-            /* Wait for the requests to complete */
+    for (i = 1 ; i <= (size >> 1) ; ++i) {
+        struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size};
+        uint32_t iov_count = 1;
+
+        right = (rank + i) % size;
+        left  = (rank + size - i) % size;
+
+        ompi_datatype_type_size(rdtypes[right], &msg_size_right);
+        msg_size_right *= rcounts[right];
+
+        ompi_datatype_type_size(rdtypes[left], &msg_size_left);
+        msg_size_left *= rcounts[left];
+
+        if( 0 != msg_size_right ) {  /* something to exchange with the peer on the right */
+            ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right);
+            opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0);
+            opal_convertor_prepare_for_send(&convertor, &rdtypes[right]->super, rcounts[right],
+                                            (char *) rbuf + rdisps[right]);
+            packed_size = max_size;
+            err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size);
+            if (1 != err) { goto error_hndl; }
+
+            /* Receive data from the right */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtypes[right],
+                                      right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( (left != right) && (0 != msg_size_left) ) {
+            /* Send data to the left */
+            err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left],
+                                     left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
+            /* Receive data from the left */
+            err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left],
+                                      left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+        }
+
+        if( 0 != msg_size_right ) {  /* something to exchange with the peer on the right */
+            /* Send data to the right */
+            err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED,
+                                     right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD,
+                                     comm));
+            if (MPI_SUCCESS != err) { goto error_hndl; }
+
             err = ompi_request_wait (&req, MPI_STATUSES_IGNORE);
             if (MPI_SUCCESS != err) { goto error_hndl; }
         }
     }
 
 error_hndl:
     /* Free the temporary buffer */
-    free (save_buffer);
+    free (tmp_buffer);
 
     /* All done */
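
The new loop relies on a simple pairing schedule: at step i each rank exchanges with the peer i positions to its right and the peer i positions to its left on a logical ring, so over steps 1..size/2 every pair of ranks meets exactly once, and only the data destined for the right-hand peer ever has to be packed into the single temporary buffer. The snippet below is a minimal standalone sketch of that schedule, not part of the commit; the value of size is an arbitrary example.

/*
 * Standalone illustration (not from the commit) of the peer schedule used by
 * the in-place loop above: at step i every rank talks to the neighbor at
 * distance i on its right and the neighbor at distance i on its left of a
 * logical ring. Over steps 1..size/2 each rank meets every other rank once.
 */
#include <stdio.h>

int main(void)
{
    const int size = 5;   /* number of ranks, chosen arbitrarily for the example */

    for (int rank = 0; rank < size; ++rank) {
        printf("rank %d:", rank);
        for (int i = 1; i <= (size >> 1); ++i) {
            int right = (rank + i) % size;         /* peer whose data gets packed into tmp_buffer */
            int left  = (rank + size - i) % size;  /* peer that can be served straight from rbuf  */
            if (left == right) {                   /* single exchange: even size, last step       */
                printf("  step %d -> peer %d", i, right);
            } else {
                printf("  step %d -> right %d, left %d", i, right, left);
            }
        }
        printf("\n");
    }
    return 0;
}

For an even communicator size the last step gives left == right, which is why the commit guards the left-hand exchange with (left != right) so that pair is exchanged only once.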
