Skip to content

coll/libnbc: fix MPI_I{reduce,allreduce,reduce_scatter,reduce_scatter_block} #1760

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions ompi/mca/coll/base/coll_base_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* Copyright (c) 2009 University of Houston. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All Rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -134,7 +134,7 @@ ompi_coll_base_allreduce_intra_recursivedoubling(const void *sbuf, void *rbuf,
{
int ret, line, rank, size, adjsize, remote, distance;
int newrank, newremote, extra_ranks;
char *tmpsend = NULL, *tmprecv = NULL, *tmpswap = NULL, *inplacebuf = NULL;
char *tmpsend = NULL, *tmprecv = NULL, *tmpswap = NULL, *inplacebuf_free = NULL, *inplacebuf;
ompi_request_t *reqs[2] = {NULL, NULL};
OPAL_PTRDIFF_TYPE span, gap;

Expand All @@ -155,8 +155,9 @@ ompi_coll_base_allreduce_intra_recursivedoubling(const void *sbuf, void *rbuf,

/* Allocate and initialize temporary send buffer */
span = opal_datatype_span(&dtype->super, count, &gap);
inplacebuf = (char*) malloc(span);
if (NULL == inplacebuf) { ret = -1; line = __LINE__; goto error_hndl; }
inplacebuf_free = (char*) malloc(span);
if (NULL == inplacebuf_free) { ret = -1; line = __LINE__; goto error_hndl; }
inplacebuf = inplacebuf_free - gap;

if (MPI_IN_PLACE == sbuf) {
ret = ompi_datatype_copy_content_same_ddt(dtype, count, inplacebuf, (char*)rbuf);
Expand Down Expand Up @@ -263,7 +264,7 @@ ompi_coll_base_allreduce_intra_recursivedoubling(const void *sbuf, void *rbuf,
if (ret < 0) { line = __LINE__; goto error_hndl; }
}

if (NULL != inplacebuf) free(inplacebuf);
if (NULL != inplacebuf_free) free(inplacebuf_free);
return MPI_SUCCESS;

error_hndl:
Expand Down
40 changes: 21 additions & 19 deletions ompi/mca/coll/base/coll_base_reduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All Rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* Copyright (c) 2015-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -485,6 +485,7 @@ int ompi_coll_base_reduce_intra_in_order_binary( const void *sendbuf, void *recv
int ret, rank, size, io_root, segcount = count;
void *use_this_sendbuf = NULL;
void *use_this_recvbuf = NULL;
char *tmpbuf_free = NULL;
size_t typelng;
mca_coll_base_module_t *base_module = (mca_coll_base_module_t*) module;
mca_coll_base_comm_t *data = base_module->base_data;
Expand Down Expand Up @@ -515,24 +516,26 @@ int ompi_coll_base_reduce_intra_in_order_binary( const void *sendbuf, void *recv
use_this_recvbuf = recvbuf;
if (io_root != root) {
ptrdiff_t dsize, gap;
char *tmpbuf = NULL;
char *tmpbuf;

dsize = opal_datatype_span(&datatype->super, count, &gap);

if ((root == rank) && (MPI_IN_PLACE == sendbuf)) {
tmpbuf = (char *) malloc(dsize);
if (NULL == tmpbuf) {
tmpbuf_free = (char *) malloc(dsize);
if (NULL == tmpbuf_free) {
return MPI_ERR_INTERN;
}
tmpbuf = tmpbuf_free - gap;
ompi_datatype_copy_content_same_ddt(datatype, count,
(char*)tmpbuf,
(char*)recvbuf);
use_this_sendbuf = tmpbuf;
} else if (io_root == rank) {
tmpbuf = (char *) malloc(dsize);
if (NULL == tmpbuf) {
tmpbuf_free = (char *) malloc(dsize);
if (NULL == tmpbuf_free) {
return MPI_ERR_INTERN;
}
tmpbuf = tmpbuf_free - gap;
use_this_recvbuf = tmpbuf;
}
}
Expand All @@ -552,19 +555,18 @@ int ompi_coll_base_reduce_intra_in_order_binary( const void *sendbuf, void *recv
MCA_COLL_BASE_TAG_REDUCE, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != ret) { return ret; }
if (MPI_IN_PLACE == sendbuf) {
free(use_this_sendbuf);
}

} else if (io_root == rank) {
/* Send result from use_this_recvbuf to root */
ret = MCA_PML_CALL(send(use_this_recvbuf, count, datatype, root,
MCA_COLL_BASE_TAG_REDUCE,
MCA_PML_BASE_SEND_STANDARD, comm));
if (MPI_SUCCESS != ret) { return ret; }
free(use_this_recvbuf);
}
}
if (NULL != tmpbuf_free) {
free(tmpbuf_free);
}

return MPI_SUCCESS;
}
Expand Down Expand Up @@ -600,7 +602,7 @@ ompi_coll_base_reduce_intra_basic_linear(const void *sbuf, void *rbuf, int count
ptrdiff_t extent, dsize, gap;
char *free_buffer = NULL;
char *pml_buffer = NULL;
char *inplace_temp = NULL;
char *inplace_temp_free = NULL;
char *inbuf;

/* Initialize */
Expand All @@ -622,18 +624,18 @@ ompi_coll_base_reduce_intra_basic_linear(const void *sbuf, void *rbuf, int count

if (MPI_IN_PLACE == sbuf) {
sbuf = rbuf;
inplace_temp = (char*)malloc(dsize);
if (NULL == inplace_temp) {
inplace_temp_free = (char*)malloc(dsize);
if (NULL == inplace_temp_free) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
rbuf = inplace_temp - gap;
rbuf = inplace_temp_free - gap;
}

if (size > 1) {
free_buffer = (char*)malloc(dsize);
if (NULL == free_buffer) {
if (NULL != inplace_temp) {
free(inplace_temp);
if (NULL != inplace_temp_free) {
free(inplace_temp_free);
}
return OMPI_ERR_OUT_OF_RESOURCE;
}
Expand Down Expand Up @@ -680,9 +682,9 @@ ompi_coll_base_reduce_intra_basic_linear(const void *sbuf, void *rbuf, int count
ompi_op_reduce(op, inbuf, rbuf, count, dtype);
}

if (NULL != inplace_temp) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)sbuf, inplace_temp);
free(inplace_temp);
if (NULL != inplace_temp_free) {
err = ompi_datatype_copy_content_same_ddt(dtype, count, (char*)sbuf, rbuf);
free(inplace_temp_free);
}
if (NULL != free_buffer) {
free(free_buffer);
Expand Down
29 changes: 16 additions & 13 deletions ompi/mca/coll/basic/coll_basic_reduce_scatter.c
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2013 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -367,8 +367,9 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
{
int err, i, rank, root = 0, rsize, lsize;
int totalcounts;
ptrdiff_t lb, extent;
ptrdiff_t gap, span;
char *tmpbuf = NULL, *tmpbuf2 = NULL;
char *lbuf, *buf;
ompi_request_t *req;
int *disps = NULL;

Expand Down Expand Up @@ -399,10 +400,7 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
* its size is the same as the local communicator size.
*/
if (rank == root) {
err = ompi_datatype_get_extent(dtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
span = opal_datatype_span(&dtype->super, totalcounts, &gap);

/* Generate displacements for the scatterv part */
disps = (int*) malloc(sizeof(int) * lsize);
Expand All @@ -414,12 +412,14 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
disps[i + 1] = disps[i] + rcounts[i];
}

tmpbuf = (char *) malloc(totalcounts * extent);
tmpbuf2 = (char *) malloc(totalcounts * extent);
tmpbuf = (char *) malloc(span);
tmpbuf2 = (char *) malloc(span);
if (NULL == tmpbuf || NULL == tmpbuf2) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto exit;
}
lbuf = tmpbuf - gap;
buf = tmpbuf2 - gap;

/* Do a send-recv between the two root procs. to avoid deadlock */
err = MCA_PML_CALL(isend(sbuf, totalcounts, dtype, 0,
Expand All @@ -429,7 +429,7 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
goto exit;
}

err = MCA_PML_CALL(recv(tmpbuf2, totalcounts, dtype, 0,
err = MCA_PML_CALL(recv(lbuf, totalcounts, dtype, 0,
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
MPI_STATUS_IGNORE));
if (OMPI_SUCCESS != err) {
Expand All @@ -444,18 +444,21 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco

/* Loop receiving and calling reduction function (C or Fortran)
* The result of this reduction operations is then in
* tmpbuf2.
* lbuf.
*/
for (i = 1; i < rsize; i++) {
err = MCA_PML_CALL(recv(tmpbuf, totalcounts, dtype, i,
char *tbuf;
err = MCA_PML_CALL(recv(buf, totalcounts, dtype, i,
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) {
goto exit;
}

/* Perform the reduction */
ompi_op_reduce(op, tmpbuf, tmpbuf2, totalcounts, dtype);
ompi_op_reduce(op, lbuf, buf, totalcounts, dtype);
/* swap the buffers */
tbuf = lbuf; lbuf = buf; buf = tbuf;
}
} else {
/* If not root, send data to the root. */
Expand All @@ -468,7 +471,7 @@ mca_coll_basic_reduce_scatter_inter(const void *sbuf, void *rbuf, const int *rco
}

/* Now do a scatterv on the local communicator */
err = comm->c_local_comm->c_coll.coll_scatterv(tmpbuf2, rcounts, disps, dtype,
err = comm->c_local_comm->c_coll.coll_scatterv(lbuf, rcounts, disps, dtype,
rbuf, rcounts[rank], dtype, 0,
comm->c_local_comm,
comm->c_local_comm->c_coll.coll_scatterv_module);
Expand Down
36 changes: 19 additions & 17 deletions ompi/mca/coll/basic/coll_basic_reduce_scatter_block.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
* Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved.
* Copyright (c) 2012 Oak Ridge National Labs. All rights reserved.
* Copyright (c) 2012 Sandia National Laboratories. All rights reserved.
* Copyright (c) 2014-2015 Research Organization for Information Science
* Copyright (c) 2014-2016 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* $COPYRIGHT$
*
Expand Down Expand Up @@ -58,7 +58,7 @@ mca_coll_basic_reduce_scatter_block_intra(const void *sbuf, void *rbuf, int rcou
mca_coll_base_module_t *module)
{
int rank, size, count, err = OMPI_SUCCESS;
ptrdiff_t extent, buf_size, gap;
ptrdiff_t gap, span;
char *recv_buf = NULL, *recv_buf_free = NULL;

/* Initialize */
Expand All @@ -72,8 +72,7 @@ mca_coll_basic_reduce_scatter_block_intra(const void *sbuf, void *rbuf, int rcou
}

/* get datatype information */
ompi_datatype_type_extent(dtype, &extent);
buf_size = opal_datatype_span(&dtype->super, count, &gap);
span = opal_datatype_span(&dtype->super, count, &gap);

/* Handle MPI_IN_PLACE */
if (MPI_IN_PLACE == sbuf) {
Expand All @@ -83,12 +82,12 @@ mca_coll_basic_reduce_scatter_block_intra(const void *sbuf, void *rbuf, int rcou
if (0 == rank) {
/* temporary receive buffer. See coll_basic_reduce.c for
details on sizing */
recv_buf_free = (char*) malloc(buf_size);
recv_buf = recv_buf_free - gap;
recv_buf_free = (char*) malloc(span);
if (NULL == recv_buf_free) {
err = OMPI_ERR_OUT_OF_RESOURCE;
goto cleanup;
}
recv_buf = recv_buf_free - gap;
}

/* reduction */
Expand Down Expand Up @@ -126,8 +125,9 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
{
int err, i, rank, root = 0, rsize, lsize;
int totalcounts;
ptrdiff_t lb, extent;
ptrdiff_t gap, span;
char *tmpbuf = NULL, *tmpbuf2 = NULL;
char *lbuf, *buf;
ompi_request_t *req;

rank = ompi_comm_rank(comm);
Expand All @@ -151,16 +151,15 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
*
*/
if (rank == root) {
err = ompi_datatype_get_extent(dtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
span = opal_datatype_span(&dtype->super, totalcounts, &gap);

tmpbuf = (char *) malloc(totalcounts * extent);
tmpbuf2 = (char *) malloc(totalcounts * extent);
tmpbuf = (char *) malloc(span);
tmpbuf2 = (char *) malloc(span);
if (NULL == tmpbuf || NULL == tmpbuf2) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
lbuf = tmpbuf - gap;
buf = tmpbuf2 - gap;

/* Do a send-recv between the two root procs. to avoid deadlock */
err = MCA_PML_CALL(isend(sbuf, totalcounts, dtype, 0,
Expand All @@ -170,7 +169,7 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
goto exit;
}

err = MCA_PML_CALL(recv(tmpbuf2, totalcounts, dtype, 0,
err = MCA_PML_CALL(recv(lbuf, totalcounts, dtype, 0,
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
MPI_STATUS_IGNORE));
if (OMPI_SUCCESS != err) {
Expand All @@ -188,15 +187,18 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
* tmpbuf2.
*/
for (i = 1; i < rsize; i++) {
err = MCA_PML_CALL(recv(tmpbuf, totalcounts, dtype, i,
char *tbuf;
err = MCA_PML_CALL(recv(buf, totalcounts, dtype, i,
MCA_COLL_BASE_TAG_REDUCE_SCATTER, comm,
MPI_STATUS_IGNORE));
if (MPI_SUCCESS != err) {
goto exit;
}

/* Perform the reduction */
ompi_op_reduce(op, tmpbuf, tmpbuf2, totalcounts, dtype);
ompi_op_reduce(op, lbuf, buf, totalcounts, dtype);
/* swap the buffers */
tbuf = lbuf; lbuf = buf; buf = tbuf;
}
} else {
/* If not root, send data to the root. */
Expand All @@ -209,7 +211,7 @@ mca_coll_basic_reduce_scatter_block_inter(const void *sbuf, void *rbuf, int rcou
}

/* Now do a scatterv on the local communicator */
err = comm->c_local_comm->c_coll.coll_scatter(tmpbuf2, rcount, dtype,
err = comm->c_local_comm->c_coll.coll_scatter(lbuf, rcount, dtype,
rbuf, rcount, dtype, 0,
comm->c_local_comm,
comm->c_local_comm->c_coll.coll_scatter_module);
Expand Down
Loading