From 0041ce851ec3d8cfd55c0d7794f8beba3909f4ad Mon Sep 17 00:00:00 2001 From: Gilles Gouaillardet Date: Mon, 30 Aug 2021 10:08:44 +0900 Subject: [PATCH 1/5] coll/basic: fix MPI_Alltoallw(MPI_IN_PLACE) gap handling The temporary buffer must be shifted by the true_extent on a per type basis (since the various datatypes might have different true_extent). Thanks Heiko Bauke for reporting this. Refs. open-mpi/ompi#9329 Signed-off-by: Gilles Gouaillardet --- ompi/mca/coll/basic/coll_basic_alltoallw.c | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/ompi/mca/coll/basic/coll_basic_alltoallw.c b/ompi/mca/coll/basic/coll_basic_alltoallw.c index 93fa880fc2d..9470d4ac11c 100644 --- a/ompi/mca/coll/basic/coll_basic_alltoallw.c +++ b/ompi/mca/coll/basic/coll_basic_alltoallw.c @@ -14,8 +14,8 @@ * Copyright (c) 2013 Los Alamos National Security, LLC. All rights * reserved. * Copyright (c) 2013 FUJITSU LIMITED. All rights reserved. - * Copyright (c) 2014-2016 Research Organization for Information Science - * and Technology (RIST). All rights reserved. + * Copyright (c) 2014-2021 Research Organization for Information Science + * and Technology (RIST). All rights reserved. * Copyright (c) 2014 Cisco Systems, Inc. All rights reserved. * Copyright (c) 2017 IBM Corporation. All rights reserved. * $COPYRIGHT$ @@ -44,7 +44,7 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con { int i, j, size, rank, err = MPI_SUCCESS, max_size; ompi_request_t *req; - char *tmp_buffer, *save_buffer = NULL; + char *save_buffer = NULL; ptrdiff_t ext, gap = 0; /* Initialize. */ @@ -65,11 +65,10 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con } /* Allocate a temporary buffer */ - tmp_buffer = save_buffer = calloc (max_size, 1); - if (NULL == tmp_buffer) { + save_buffer = calloc (max_size, 1); + if (NULL == save_buffer) { return OMPI_ERR_OUT_OF_RESOURCE; } - tmp_buffer -= gap; /* in-place alltoallw slow algorithm (but works) */ for (i = 0 ; i < size ; ++i) { @@ -83,6 +82,10 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con /* Initiate all send/recv to/from others. */ if (i == rank && msg_size_j != 0) { + char * tmp_buffer; + /* Shift the temporary buffer according to the current datatype */ + (void)opal_datatype_span(&rdtypes[j]->super, rcounts[j], &gap); + tmp_buffer = save_buffer - gap; /* Copy the data into the temporary buffer */ err = ompi_datatype_copy_content_same_ddt (rdtypes[j], rcounts[j], tmp_buffer, (char *) rbuf + rdisps[j]); @@ -98,6 +101,10 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con comm)); if (MPI_SUCCESS != err) { goto error_hndl; } } else if (j == rank && msg_size_i != 0) { + char * tmp_buffer; + /* Shift the temporary buffer according to the current datatype */ + (void)opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap); + tmp_buffer = save_buffer - gap; /* Copy the data into the temporary buffer */ err = ompi_datatype_copy_content_same_ddt (rdtypes[i], rcounts[i], tmp_buffer, (char *) rbuf + rdisps[i]); From 74049fcd41d69ccd440153f6cfee7cf4ac89a218 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Tue, 31 Aug 2021 01:17:50 -0400 Subject: [PATCH 2/5] Expose opal_datatype_compute_remote_size. This function can be used to compute the packed size of a datatype on a target architecture. 
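For illustration, a caller can size a per-peer pack buffer along the following lines. This is only a sketch, not part of the patch: peer_packed_size is a hypothetical helper name, and reaching the remote_sizes table through the proc convertor requires opal/datatype/opal_convertor_internal.h, as done by a later patch in this series.

    /* Hypothetical helper: number of bytes that "count" elements of "dtype"
     * occupy once packed for the architecture of rank "peer" in "comm". */
    static size_t peer_packed_size(struct ompi_communicator_t *comm, int peer,
                                   struct ompi_datatype_t *dtype, size_t count)
    {
        ompi_proc_t *proc = ompi_comm_peer_lookup(comm, peer);
        /* Per-predefined-type sizes advertised by the peer's architecture. */
        const size_t *remote_sizes = proc->super.proc_convertor->master->remote_sizes;

        return count * opal_datatype_compute_remote_size(&dtype->super, remote_sizes);
    }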
Signed-off-by: George Bosilca --- opal/datatype/opal_convertor.c | 29 +++---------------------- opal/datatype/opal_datatype.h | 9 ++++++++ opal/datatype/opal_datatype_get_count.c | 24 ++++++++++++++++++++ 3 files changed, 36 insertions(+), 26 deletions(-) diff --git a/opal/datatype/opal_convertor.c b/opal/datatype/opal_convertor.c index e08265b42bc..365ddc45a1e 100644 --- a/opal/datatype/opal_convertor.c +++ b/opal/datatype/opal_convertor.c @@ -455,29 +455,6 @@ int32_t opal_convertor_set_position_nocheck(opal_convertor_t *convertor, size_t return rc; } -static size_t opal_datatype_compute_remote_size(const opal_datatype_t *pData, const size_t *sizes) -{ - uint32_t typeMask = pData->bdt_used; - size_t length = 0; - - if (opal_datatype_is_predefined(pData)) { - return sizes[pData->desc.desc->elem.common.type]; - } - - if (OPAL_UNLIKELY(NULL == pData->ptypes)) { - /* Allocate and fill the array of types used in the datatype description */ - opal_datatype_compute_ptypes((opal_datatype_t *) pData); - } - - for (int i = OPAL_DATATYPE_FIRST_TYPE; typeMask && (i < OPAL_DATATYPE_MAX_PREDEFINED); i++) { - if (typeMask & ((uint32_t) 1 << i)) { - length += (pData->ptypes[i] * sizes[i]); - typeMask ^= ((uint32_t) 1 << i); - } - } - return length; -} - /** * Compute the remote size. If necessary remove the homogeneous flag * and redirect the convertor description toward the non-optimized @@ -496,9 +473,9 @@ size_t opal_convertor_compute_remote_size(opal_convertor_t *pConvertor) } if (0 == (pConvertor->flags & CONVERTOR_HAS_REMOTE_SIZE)) { /* This is for a single datatype, we must update it with the count */ - pConvertor->remote_size = opal_datatype_compute_remote_size(datatype, - pConvertor->master - ->remote_sizes); + pConvertor->remote_size = + opal_datatype_compute_remote_size(datatype, + pConvertor->master->remote_sizes); pConvertor->remote_size *= pConvertor->count; } } diff --git a/opal/datatype/opal_datatype.h b/opal/datatype/opal_datatype.h index 7dabd1742c0..5f7fc53fa7d 100644 --- a/opal/datatype/opal_datatype.h +++ b/opal/datatype/opal_datatype.h @@ -311,6 +311,15 @@ OPAL_DECLSPEC int32_t opal_datatype_copy_content_same_ddt(const opal_datatype_t OPAL_DECLSPEC int opal_datatype_compute_ptypes(opal_datatype_t *datatype); +/* + * Compute the size of the datatype using a specific set of predefined type sizes. + * This function allows to compute the size of a packed buffer without creating + * a fully fledged specialized convertor for the remote peer. + */ +OPAL_DECLSPEC size_t +opal_datatype_compute_remote_size(const opal_datatype_t *pData, + const size_t *sizes); + /* Compute the span in memory of count datatypes. This function help with temporary * memory allocations for receiving already typed data (such as those used for reduce * operations). 
This span is the distance between the minimum and the maximum byte diff --git a/opal/datatype/opal_datatype_get_count.c b/opal/datatype/opal_datatype_get_count.c index 202601d97a2..fed344d1bbd 100644 --- a/opal/datatype/opal_datatype_get_count.c +++ b/opal/datatype/opal_datatype_get_count.c @@ -223,3 +223,27 @@ int opal_datatype_compute_ptypes(opal_datatype_t *datatype) } } } + +size_t opal_datatype_compute_remote_size(const opal_datatype_t *pData, const size_t *sizes) +{ + uint32_t typeMask = pData->bdt_used; + size_t length = 0; + + if (opal_datatype_is_predefined(pData)) { + return sizes[pData->desc.desc->elem.common.type]; + } + + if (OPAL_UNLIKELY(NULL == pData->ptypes)) { + /* Allocate and fill the array of types used in the datatype description */ + opal_datatype_compute_ptypes((opal_datatype_t *) pData); + } + + for (int i = OPAL_DATATYPE_FIRST_TYPE; typeMask && (i < OPAL_DATATYPE_MAX_PREDEFINED); i++) { + if (typeMask & ((uint32_t) 1 << i)) { + length += (pData->ptypes[i] * sizes[i]); + typeMask ^= ((uint32_t) 1 << i); + } + } + return length; +} + From 447b28900173c749258df9868b197b62802ad6dd Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Tue, 31 Aug 2021 01:18:28 -0400 Subject: [PATCH 3/5] Reduce the amount of temporary memory needed for MPI_Alltoallw. Dont copy the datatype into a buffer with the same extent, but instead pack it and send it to the peer as packed. Signed-off-by: George Bosilca --- ompi/mca/coll/basic/coll_basic_alltoallw.c | 53 ++++++++++++---------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/ompi/mca/coll/basic/coll_basic_alltoallw.c b/ompi/mca/coll/basic/coll_basic_alltoallw.c index 9470d4ac11c..fe753b34e74 100644 --- a/ompi/mca/coll/basic/coll_basic_alltoallw.c +++ b/ompi/mca/coll/basic/coll_basic_alltoallw.c @@ -31,6 +31,7 @@ #include "mpi.h" #include "ompi/constants.h" #include "ompi/datatype/ompi_datatype.h" +#include "opal/datatype/opal_convertor_internal.h" #include "ompi/mca/coll/coll.h" #include "ompi/mca/coll/base/coll_tags.h" #include "ompi/mca/pml/pml.h" @@ -42,12 +43,11 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { - int i, j, size, rank, err = MPI_SUCCESS, max_size; + int i, j, size, rank, err = MPI_SUCCESS; ompi_request_t *req; char *save_buffer = NULL; - ptrdiff_t ext, gap = 0; - - /* Initialize. */ + size_t max_size = 0, packed_size; + opal_convertor_t convertor; size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); @@ -57,11 +57,14 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con return MPI_SUCCESS; } - /* Find the largest receive amount */ + /* Find the largest amount of packed send/recv data */ for (i = 0, max_size = 0 ; i < size ; ++i) { - ext = opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap); + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); - max_size = ext > max_size ? ext : max_size; + packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super, + ompi_proc->super.proc_convertor->master->remote_sizes); + packed_size *= rcounts[i]; + max_size = packed_size > max_size ? 
packed_size : max_size; } /* Allocate a temporary buffer */ @@ -77,45 +80,45 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con msg_size_i *= rcounts[i]; for (j = i+1 ; j < size ; ++j) { size_t msg_size_j; + struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size}; + uint32_t iov_count = 1; ompi_datatype_type_size(rdtypes[j], &msg_size_j); msg_size_j *= rcounts[j]; /* Initiate all send/recv to/from others. */ if (i == rank && msg_size_j != 0) { - char * tmp_buffer; - /* Shift the temporary buffer according to the current datatype */ - (void)opal_datatype_span(&rdtypes[j]->super, rcounts[j], &gap); - tmp_buffer = save_buffer - gap; - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtypes[j], rcounts[j], - tmp_buffer, (char *) rbuf + rdisps[j]); - if (MPI_SUCCESS != err) { goto error_hndl; } + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j); + opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); + opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j], + (char *) rbuf + rdisps[j]); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 != err) { goto error_hndl; } /* Exchange data with the peer */ err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j], j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); if (MPI_SUCCESS != err) { goto error_hndl; } - err = MCA_PML_CALL(send ((void *) tmp_buffer, rcounts[j], rdtypes[j], + err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, comm)); if (MPI_SUCCESS != err) { goto error_hndl; } } else if (j == rank && msg_size_i != 0) { - char * tmp_buffer; - /* Shift the temporary buffer according to the current datatype */ - (void)opal_datatype_span(&rdtypes[i]->super, rcounts[i], &gap); - tmp_buffer = save_buffer - gap; - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtypes[i], rcounts[i], - tmp_buffer, (char *) rbuf + rdisps[i]); - if (MPI_SUCCESS != err) { goto error_hndl; } + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); + opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); + opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i], + (char *) rbuf + rdisps[i]); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 != err) { goto error_hndl; } /* Exchange data with the peer */ err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i], i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); if (MPI_SUCCESS != err) { goto error_hndl; } - err = MCA_PML_CALL(send ((void *) tmp_buffer, rcounts[i], rdtypes[i], + err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, comm)); if (MPI_SUCCESS != err) { goto error_hndl; } From dc4e2ce26ba826a2492bf77890443d73d4aa719f Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Wed, 1 Sep 2021 15:36:19 -0400 Subject: [PATCH 4/5] A better MPI_IN_PLACE alltoallw algorithm. Provide optimized variant for the homogeneous case. 
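The pairing can be summarized with a small standalone sketch (illustration only, not part of the change): at step i every rank exchanges with the two peers at distance i on a logical ring, packing only the block destined to the right-hand peer and sending the left-hand block straight from the receive buffer; when the communicator size is even, the last step degenerates to a single peer (left == right).

    #include <stdio.h>

    /* Print one rank's exchange schedule: at step i it packs the block for
     * the right neighbour at distance i and sends directly to the left one. */
    static void print_schedule(int rank, int size)
    {
        for (int i = 1; i <= (size >> 1); ++i) {
            int right = (rank + i) % size;
            int left  = (rank + size - i) % size;
            printf("step %d: rank %d -> %d (packed), -> %d (direct)%s\n",
                   i, rank, right, left, (left == right) ? " [same peer]" : "");
        }
    }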
Signed-off-by: George Bosilca --- ompi/mca/coll/basic/coll_basic_alltoallw.c | 161 +++++++++++---------- 1 file changed, 87 insertions(+), 74 deletions(-) diff --git a/ompi/mca/coll/basic/coll_basic_alltoallw.c b/ompi/mca/coll/basic/coll_basic_alltoallw.c index fe753b34e74..50ec9c9774b 100644 --- a/ompi/mca/coll/basic/coll_basic_alltoallw.c +++ b/ompi/mca/coll/basic/coll_basic_alltoallw.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2016 The University of Tennessee and The University + * Copyright (c) 2004-2021 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -36,97 +36,110 @@ #include "ompi/mca/coll/base/coll_tags.h" #include "ompi/mca/pml/pml.h" - +/* + * We want to minimize the amount of temporary memory needed while allowing as many ranks + * to exchange data simultaneously. We use a variation of the ring algorithm, where in a + * single step a process echange the data with both neighbors at distance k (on the left + * and the right on a logical ring topology). With this approach we need to pack the data + * for a single of the two neighbors, as we can then use the original buffer (and datatype + * and count) to send the data to the other. + */ static int mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, const int *rdisps, struct ompi_datatype_t * const *rdtypes, struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { - int i, j, size, rank, err = MPI_SUCCESS; - ompi_request_t *req; - char *save_buffer = NULL; - size_t max_size = 0, packed_size; + int i, size, rank, left, right, err = MPI_SUCCESS; + ompi_request_t *req = MPI_REQUEST_NULL; + char *tmp_buffer = NULL; + size_t max_size = 0, packed_size, msg_size_left, msg_size_right; opal_convertor_t convertor; size = ompi_comm_size(comm); - rank = ompi_comm_rank(comm); - - /* If only one process, we're done. */ - if (1 == size) { + if (1 == size) { /* If only one process, we're done. */ return MPI_SUCCESS; } + rank = ompi_comm_rank(comm); - /* Find the largest amount of packed send/recv data */ - for (i = 0, max_size = 0 ; i < size ; ++i) { - ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); - - packed_size = opal_datatype_compute_remote_size(&rdtypes[i]->super, - ompi_proc->super.proc_convertor->master->remote_sizes); - packed_size *= rcounts[i]; + /* Find the largest amount of packed send/recv data among all peers where + * we need to pack before the send. + */ + for (i = 1 ; i <= (size >> 1) ; ++i) { + right = (rank + i) % size; +#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right); + + if( OPAL_LIKELY(opal_local_arch == ompi_proc->super.proc_convertor->master->remote_arch)) { + opal_datatype_type_size(&rdtypes[right]->super, &packed_size); + } else { + packed_size = opal_datatype_compute_remote_size(&rdtypes[right]->super, + ompi_proc->super.proc_convertor->master->remote_sizes); + } +#else + opal_datatype_type_size(&rdtypes[right]->super, &packed_size); +#endif /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */ + packed_size *= rcounts[right]; max_size = packed_size > max_size ? 
packed_size : max_size; } /* Allocate a temporary buffer */ - save_buffer = calloc (max_size, 1); - if (NULL == save_buffer) { + tmp_buffer = calloc (max_size, 1); + if (NULL == tmp_buffer) { return OMPI_ERR_OUT_OF_RESOURCE; } - /* in-place alltoallw slow algorithm (but works) */ - for (i = 0 ; i < size ; ++i) { - size_t msg_size_i; - ompi_datatype_type_size(rdtypes[i], &msg_size_i); - msg_size_i *= rcounts[i]; - for (j = i+1 ; j < size ; ++j) { - size_t msg_size_j; - struct iovec iov = {.iov_base = save_buffer, .iov_len = max_size}; - uint32_t iov_count = 1; - ompi_datatype_type_size(rdtypes[j], &msg_size_j); - msg_size_j *= rcounts[j]; - - /* Initiate all send/recv to/from others. */ - if (i == rank && msg_size_j != 0) { - ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, j); - opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); - opal_convertor_prepare_for_send(&convertor, &rdtypes[j]->super, rcounts[j], - (char *) rbuf + rdisps[j]); - packed_size = max_size; - err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); - if (1 != err) { goto error_hndl; } - - /* Exchange data with the peer */ - err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[j], rcounts[j], rdtypes[j], - j, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); - if (MPI_SUCCESS != err) { goto error_hndl; } - - err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, - j, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, - comm)); - if (MPI_SUCCESS != err) { goto error_hndl; } - } else if (j == rank && msg_size_i != 0) { - ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, i); - opal_convertor_clone(&convertor, ompi_proc->super.proc_convertor, 0); - opal_convertor_prepare_for_send(&convertor, &rdtypes[i]->super, rcounts[i], - (char *) rbuf + rdisps[i]); - packed_size = max_size; - err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); - if (1 != err) { goto error_hndl; } - - /* Exchange data with the peer */ - err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[i], rcounts[i], rdtypes[i], - i, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); - if (MPI_SUCCESS != err) { goto error_hndl; } - - err = MCA_PML_CALL(send ((void *) save_buffer, packed_size, MPI_PACKED, - i, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, - comm)); - if (MPI_SUCCESS != err) { goto error_hndl; } - } else { - continue; - } - - /* Wait for the requests to complete */ + for (i = 1 ; i <= (size >> 1) ; ++i) { + struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size}; + uint32_t iov_count = 1; + + right = (rank + i) % size; + left = (rank + size - i) % size; + + ompi_datatype_type_size(rdtypes[right], &msg_size_right); + msg_size_right *= rcounts[right]; + + ompi_datatype_type_size(rdtypes[left], &msg_size_left); + msg_size_left *= rcounts[left]; + + if( 0 != msg_size_right ) { /* nothing to exchange with the peer on the right */ + ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right); + opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0); + opal_convertor_prepare_for_send(&convertor, &rdtypes[right]->super, rcounts[right], + (char *) rbuf + rdisps[right]); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 != err) { goto error_hndl; } + + /* Receive data from the right */ + err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtypes[right], + right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } + } + + if( (left != right) && (0 != 
msg_size_left) ) { + /* Send data to the left */ + err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left], + left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); + if (MPI_SUCCESS != err) { goto error_hndl; } + + /* Receive data from the left */ + err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtypes[left], + left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } + } + + if( 0 != msg_size_right ) { /* nothing to exchange with the peer on the right */ + /* Send data to the right */ + err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED, + right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); if (MPI_SUCCESS != err) { goto error_hndl; } } @@ -134,7 +147,7 @@ mca_coll_basic_alltoallw_intra_inplace(const void *rbuf, const int *rcounts, con error_hndl: /* Free the temporary buffer */ - free (save_buffer); + free (tmp_buffer); /* All done */ From b9012a3ccf2ee03577845ae16cebe175fa686f06 Mon Sep 17 00:00:00 2001 From: George Bosilca Date: Fri, 3 Sep 2021 01:48:50 -0400 Subject: [PATCH 5/5] Better INPLACE algorithm for all2all and all2allv Signed-off-by: George Bosilca --- ompi/mca/coll/base/coll_base_alltoall.c | 142 ++++++++++++--------- ompi/mca/coll/base/coll_base_alltoallv.c | 150 ++++++++++++++--------- 2 files changed, 176 insertions(+), 116 deletions(-) diff --git a/ompi/mca/coll/base/coll_base_alltoall.c b/ompi/mca/coll/base/coll_base_alltoall.c index 9446b8a414d..b3c642c4398 100644 --- a/ompi/mca/coll/base/coll_base_alltoall.c +++ b/ompi/mca/coll/base/coll_base_alltoall.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2017 The University of Tennessee and The University + * Copyright (c) 2004-2021 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -35,88 +35,112 @@ #include "coll_base_topo.h" #include "coll_base_util.h" -/* MPI_IN_PLACE all to all algorithm. TODO: implement a better one. */ +/* + * We want to minimize the amount of temporary memory needed while allowing as many ranks + * to exchange data simultaneously. We use a variation of the ring algorithm, where in a + * single step a process echange the data with both neighbors at distance k (on the left + * and the right on a logical ring topology). With this approach we need to pack the data + * for a single of the two neighbors, as we can then use the original buffer (and datatype + * and count) to send the data to the other. + */ int mca_coll_base_alltoall_intra_basic_inplace(const void *rbuf, int rcount, struct ompi_datatype_t *rdtype, struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { - int i, j, size, rank, err = MPI_SUCCESS, line; - ptrdiff_t ext, gap = 0; + int i, size, rank, left, right, err = MPI_SUCCESS, line; + ptrdiff_t extent; ompi_request_t *req; - char *allocated_buffer = NULL, *tmp_buffer; - size_t max_size; + char *tmp_buffer; + size_t packed_size = 0, max_size; + opal_convertor_t convertor; /* Initialize. 
*/ size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); - /* If only one process, we're done. */ - if (1 == size) { + ompi_datatype_type_size(rdtype, &max_size); + + /* Easy way out */ + if ((1 == size) || (0 == rcount) || (0 == max_size) ) { return MPI_SUCCESS; } - /* Find the largest receive amount */ - ompi_datatype_type_extent (rdtype, &ext); - max_size = opal_datatype_span(&rdtype->super, rcount, &gap); + /* Find the largest amount of packed send/recv data among all peers where + * we need to pack before the send. + */ +#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT + for (i = 1 ; i <= (size >> 1) ; ++i) { + right = (rank + i) % size; + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right); + + if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) { + packed_size = opal_datatype_compute_remote_size(&rdtype->super, + ompi_proc->super.proc_convertor->master->remote_sizes); + max_size = packed_size > max_size ? packed_size : max_size; + } + } +#endif /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */ + max_size *= rcount; - /* Initiate all send/recv to/from others. */ + ompi_datatype_type_extent(rdtype, &extent); /* Allocate a temporary buffer */ - allocated_buffer = calloc (max_size, 1); - if( NULL == allocated_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; } - tmp_buffer = allocated_buffer - gap; - max_size = ext * rcount; - - /* in-place alltoall slow algorithm (but works) */ - for (i = 0 ; i < size ; ++i) { - for (j = i+1 ; j < size ; ++j) { - if (i == rank) { - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer, - (char *) rbuf + j * max_size); - if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; } - - /* Exchange data with the peer */ - err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * j, rcount, rdtype, - j, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req)); - if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; } - - err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype, - j, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD, - comm)); - if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; } - } else if (j == rank) { - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtype, rcount, tmp_buffer, - (char *) rbuf + i * max_size); - if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; } - - /* Exchange data with the peer */ - err = MCA_PML_CALL(irecv ((char *) rbuf + max_size * i, rcount, rdtype, - i, MCA_COLL_BASE_TAG_ALLTOALL, comm, &req)); - if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; } - - err = MCA_PML_CALL(send ((char *) tmp_buffer, rcount, rdtype, - i, MCA_COLL_BASE_TAG_ALLTOALL, MCA_PML_BASE_SEND_STANDARD, - comm)); - if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; } - } else { - continue; - } - - /* Wait for the requests to complete */ - err = ompi_request_wait ( &req, MPI_STATUSES_IGNORE); - if (MPI_SUCCESS != err) { line = __LINE__; goto error_hndl; } + tmp_buffer = calloc (max_size, 1); + if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; } + + for (i = 1 ; i <= (size >> 1) ; ++i) { + struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size}; + uint32_t iov_count = 1; + + right = (rank + i) % size; + left = (rank + size - i) % size; + + ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right); + opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0); + 
opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcount, + (char *) rbuf + right * extent); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 != err) { goto error_hndl; } + + /* Receive data from the right */ + err = MCA_PML_CALL(irecv ((char *) rbuf + right * extent, rcount, rdtype, + right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } + + if( left != right ) { + /* Send data to the left */ + err = MCA_PML_CALL(send ((char *) rbuf + left * extent, rcount, rdtype, + left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); + if (MPI_SUCCESS != err) { goto error_hndl; } + + /* Receive data from the left */ + err = MCA_PML_CALL(irecv ((char *) rbuf + left * extent, rcount, rdtype, + left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } } + + /* Send data to the right */ + err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED, + right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); + if (MPI_SUCCESS != err) { goto error_hndl; } } error_hndl: /* Free the temporary buffer */ - if( NULL != allocated_buffer ) - free (allocated_buffer); + if( NULL != tmp_buffer ) + free (tmp_buffer); if( MPI_SUCCESS != err ) { OPAL_OUTPUT((ompi_coll_base_framework.framework_output, diff --git a/ompi/mca/coll/base/coll_base_alltoallv.c b/ompi/mca/coll/base/coll_base_alltoallv.c index 5274de89a42..f51bd577fb7 100644 --- a/ompi/mca/coll/base/coll_base_alltoallv.c +++ b/ompi/mca/coll/base/coll_base_alltoallv.c @@ -3,7 +3,7 @@ * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology * Corporation. All rights reserved. - * Copyright (c) 2004-2017 The University of Tennessee and The University + * Copyright (c) 2004-2021 The University of Tennessee and The University * of Tennessee Research Foundation. All rights * reserved. * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, @@ -37,85 +37,121 @@ #include "coll_base_topo.h" #include "coll_base_util.h" +/* + * We want to minimize the amount of temporary memory needed while allowing as many ranks + * to exchange data simultaneously. We use a variation of the ring algorithm, where in a + * single step a process echange the data with both neighbors at distance k (on the left + * and the right on a logical ring topology). With this approach we need to pack the data + * for a single of the two neighbors, as we can then use the original buffer (and datatype + * and count) to send the data to the other. + */ int mca_coll_base_alltoallv_intra_basic_inplace(const void *rbuf, const int *rcounts, const int *rdisps, struct ompi_datatype_t *rdtype, struct ompi_communicator_t *comm, mca_coll_base_module_t *module) { - int i, j, size, rank, err=MPI_SUCCESS; - char *allocated_buffer, *tmp_buffer; - size_t max_size; - ptrdiff_t ext, gap = 0; + int i, size, rank, left, right, err = MPI_SUCCESS, line; + ompi_request_t *req; + char *tmp_buffer; + size_t packed_size = 0, max_size; + opal_convertor_t convertor; /* Initialize. */ size = ompi_comm_size(comm); rank = ompi_comm_rank(comm); - /* If only one process, we're done. 
*/ - if (1 == size) { + ompi_datatype_type_size(rdtype, &max_size); + max_size *= rcounts[rank]; + + /* Easy way out */ + if ((1 == size) || (0 == max_size) ) { return MPI_SUCCESS; } - /* Find the largest receive amount */ - ompi_datatype_type_extent (rdtype, &ext); - for (i = 0, max_size = 0 ; i < size ; ++i) { - if (i == rank) { - continue; - } - size_t cur_size = opal_datatype_span(&rdtype->super, rcounts[i], &gap); - max_size = cur_size > max_size ? cur_size : max_size; - } - /* The gap will always be the same as we are working on the same datatype */ - if (OPAL_UNLIKELY(0 == max_size)) { - return MPI_SUCCESS; + /* Find the largest amount of packed send/recv data among all peers where + * we need to pack before the send. + */ +#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT + for (i = 1 ; i <= (size >> 1) ; ++i) { + right = (rank + i) % size; + ompi_proc_t *ompi_proc = ompi_comm_peer_lookup(comm, right); + + if( OPAL_UNLIKELY(opal_local_arch != ompi_proc->super.proc_convertor->master->remote_arch)) { + packed_size = opal_datatype_compute_remote_size(&rdtype->super, + ompi_proc->super.proc_convertor->master->remote_sizes); + packed_size *= rcounts[right]; + max_size = packed_size > max_size ? packed_size : max_size; + } } +#endif /* OPAL_ENABLE_HETEROGENEOUS_SUPPORT */ /* Allocate a temporary buffer */ - allocated_buffer = calloc (max_size, 1); - if (NULL == allocated_buffer) { - return OMPI_ERR_OUT_OF_RESOURCE; - } - tmp_buffer = allocated_buffer - gap; - - /* Initiate all send/recv to/from others. */ - /* in-place alltoallv slow algorithm (but works) */ - for (i = 0 ; i < size ; ++i) { - for (j = i+1 ; j < size ; ++j) { - if (i == rank && 0 != rcounts[j]) { - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[j], - tmp_buffer, (char *) rbuf + rdisps[j] * ext); - if (MPI_SUCCESS != err) { goto error_hndl; } - - /* Exchange data with the peer */ - err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[j], rdtype, - j, MCA_COLL_BASE_TAG_ALLTOALLV, - (char *)rbuf + rdisps[j] * ext, rcounts[j], rdtype, - j, MCA_COLL_BASE_TAG_ALLTOALLV, - comm, MPI_STATUS_IGNORE); - if (MPI_SUCCESS != err) { goto error_hndl; } - } else if (j == rank && 0 != rcounts[i]) { - /* Copy the data into the temporary buffer */ - err = ompi_datatype_copy_content_same_ddt (rdtype, rcounts[i], - tmp_buffer, (char *) rbuf + rdisps[i] * ext); - if (MPI_SUCCESS != err) { goto error_hndl; } - - /* Exchange data with the peer */ - err = ompi_coll_base_sendrecv_actual((void *) tmp_buffer, rcounts[i], rdtype, - i, MCA_COLL_BASE_TAG_ALLTOALLV, - (char *) rbuf + rdisps[i] * ext, rcounts[i], rdtype, - i, MCA_COLL_BASE_TAG_ALLTOALLV, - comm, MPI_STATUS_IGNORE); - if (MPI_SUCCESS != err) { goto error_hndl; } - } + tmp_buffer = calloc (max_size, 1); + if( NULL == tmp_buffer) { err = OMPI_ERR_OUT_OF_RESOURCE; line = __LINE__; goto error_hndl; } + + for (i = 1 ; i <= (size >> 1) ; ++i) { + struct iovec iov = {.iov_base = tmp_buffer, .iov_len = max_size}; + uint32_t iov_count = 1; + + right = (rank + i) % size; + left = (rank + size - i) % size; + + if( 0 != rcounts[right] ) { /* nothing to exchange with the peer on the right */ + ompi_proc_t *right_proc = ompi_comm_peer_lookup(comm, right); + opal_convertor_clone(right_proc->super.proc_convertor, &convertor, 0); + opal_convertor_prepare_for_send(&convertor, &rdtype->super, rcounts[right], + (char *) rbuf + rdisps[right]); + packed_size = max_size; + err = opal_convertor_pack(&convertor, &iov, &iov_count, &packed_size); + if (1 
!= err) { goto error_hndl; } + + /* Receive data from the right */ + err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[right], rcounts[right], rdtype, + right, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } + } + + if( (left != right) && (0 != rcounts[left]) ) { + /* Send data to the left */ + err = MCA_PML_CALL(send ((char *) rbuf + rdisps[left], rcounts[left], rdtype, + left, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); + if (MPI_SUCCESS != err) { goto error_hndl; } + + /* Receive data from the left */ + err = MCA_PML_CALL(irecv ((char *) rbuf + rdisps[left], rcounts[left], rdtype, + left, MCA_COLL_BASE_TAG_ALLTOALLW, comm, &req)); + if (MPI_SUCCESS != err) { goto error_hndl; } + } + + if( 0 != rcounts[right] ) { /* nothing to exchange with the peer on the right */ + /* Send data to the right */ + err = MCA_PML_CALL(send ((char *) tmp_buffer, packed_size, MPI_PACKED, + right, MCA_COLL_BASE_TAG_ALLTOALLW, MCA_PML_BASE_SEND_STANDARD, + comm)); + if (MPI_SUCCESS != err) { goto error_hndl; } + + err = ompi_request_wait (&req, MPI_STATUSES_IGNORE); + if (MPI_SUCCESS != err) { goto error_hndl; } } } error_hndl: /* Free the temporary buffer */ - free (allocated_buffer); + if( NULL != tmp_buffer ) + free (tmp_buffer); + + if( MPI_SUCCESS != err ) { + OPAL_OUTPUT((ompi_coll_base_framework.framework_output, + "%s:%4d\tError occurred %d, rank %2d", __FILE__, line, err, + rank)); + (void)line; // silence compiler warning + } /* All done */ return err;
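For context, the MPI_IN_PLACE call pattern served by the rewritten paths above looks roughly like the following toy program. It is only an illustration (not the reproducer from the original report): contiguous MPI_INT blocks, with the byte displacements that MPI_Alltoallw expects, and no error checking.

    #include <mpi.h>
    #include <stdlib.h>

    int main(int argc, char *argv[])
    {
        int rank, size;
        const int count = 4;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);
        MPI_Comm_size(MPI_COMM_WORLD, &size);

        int *buf = malloc((size_t) size * count * sizeof(int));
        int *rcounts = malloc(size * sizeof(int));
        int *rdispls = malloc(size * sizeof(int));
        MPI_Datatype *rtypes = malloc(size * sizeof(MPI_Datatype));

        for (int i = 0; i < size; ++i) {
            for (int j = 0; j < count; ++j) {
                buf[i * count + j] = rank;              /* block destined for rank i */
            }
            rcounts[i] = count;
            rdispls[i] = i * count * (int) sizeof(int); /* displacements are in bytes */
            rtypes[i]  = MPI_INT;
        }

        /* With MPI_IN_PLACE the send arguments are ignored and the exchange
         * happens directly inside buf, i.e. the code paths modified above. */
        MPI_Alltoallw(MPI_IN_PLACE, NULL, NULL, NULL,
                      buf, rcounts, rdispls, rtypes, MPI_COMM_WORLD);

        free(buf); free(rcounts); free(rdispls); free(rtypes);
        MPI_Finalize();
        return 0;
    }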