Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 70 additions & 43 deletions ompi/mca/coll/basic/coll_basic_gatherv.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,75 +42,102 @@ mca_coll_basic_gatherv_intra(const void *sbuf, int scount,
void *rbuf, const int *rcounts, const int *disps,
struct ompi_datatype_t *rdtype, int root,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module)
mca_coll_base_module_t *module)
{
int i, rank, size, err;
int err, i, peer, rank, size;
char *ptmp;
ptrdiff_t lb, extent;
size_t rdsize;

size = ompi_comm_size(comm);
rank = ompi_comm_rank(comm);

/* Everyone but root sends data and returns. Don't send anything
for sendcounts of 0 (even though MPI_Gatherv has a guard for 0
counts, this routine is used elsewhere, like the implementation
of allgatherv, so it's possible to get here with a scount of
0) */
if (root == rank) {
/* Root receives from everyone else */
ompi_datatype_type_size(rdtype, &rdsize);
if (OPAL_UNLIKELY(0 == rdsize)) {
/* bozzo case */
return MPI_SUCCESS;
}

if (rank != root) {
size_t sdsize;
ompi_datatype_type_size(sdtype, &sdsize);
if (scount > 0 && sdsize > 0) {
return MCA_PML_CALL(send(sbuf, scount, sdtype, root,
MCA_COLL_BASE_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD, comm));
err = ompi_datatype_get_extent(rdtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
return MPI_SUCCESS;
}

/* I am the root, loop receiving data. */
if (MPI_IN_PLACE != sbuf && (0 < scount) && (0 < rcounts[rank])) {
/* Directly copy self sbuf to rbuf */
err = ompi_datatype_sndrcv(sbuf, scount, sdtype,
((char *) rbuf) + (extent * disps[rank]), rcounts[rank],
rdtype);
if (MPI_SUCCESS != err) {
return err;
}
}

ompi_datatype_type_size(rdtype, &rdsize);
if (OPAL_UNLIKELY(0 == rdsize)) {
/* bozzo case */
return MPI_SUCCESS;
}
ompi_request_t **reqs;
size_t nrecv = 0, recv_iter = 0;

err = ompi_datatype_get_extent(rdtype, &lb, &extent);
if (OMPI_SUCCESS != err) {
return OMPI_ERROR;
}
for (i = 0; i < size; ++i) {
/* We directly copied the data from self */
if (0 < rcounts[i] && rank != i) {
++nrecv;
}
}

for (i = 0; i < size; ++i) {
ptmp = ((char *) rbuf) + (extent * disps[i]);
if (0 == nrecv) {
/* Nothing to receive */
return MPI_SUCCESS;
}

if (i == rank) {
/* simple optimization */
if (MPI_IN_PLACE != sbuf && (0 < scount) && (0 < rcounts[i])) {
err = ompi_datatype_sndrcv(sbuf, scount, sdtype,
ptmp, rcounts[i], rdtype);
}
} else {
reqs = ompi_coll_base_comm_get_reqs(module->base_data, nrecv);

for (i = 1; i < size; ++i) {
peer = (rank + i) % size;
ptmp = ((char *) rbuf) + (extent * disps[peer]);
/* Only receive if there is something to receive */
if (rcounts[i] > 0) {
err = MCA_PML_CALL(recv(ptmp, rcounts[i], rdtype, i,
MCA_COLL_BASE_TAG_GATHERV,
comm, MPI_STATUS_IGNORE));
if (0 < rcounts[peer]) {
err = MCA_PML_CALL(irecv(ptmp, rcounts[peer], rdtype, peer,
MCA_COLL_BASE_TAG_GATHERV, comm, &reqs[recv_iter++]));
}
}

if (MPI_SUCCESS != err) {
return err;
assert(nrecv == recv_iter);

err = ompi_request_wait_all(nrecv, reqs, MPI_STATUSES_IGNORE);

if (MPI_ERR_IN_STATUS == err) {
for (int i = 0; i < nrecv; i++) {
if (MPI_REQUEST_NULL == reqs[i])
continue;
if (MPI_ERR_PENDING == reqs[i]->req_status.MPI_ERROR)
continue;
if (MPI_SUCCESS != reqs[i]->req_status.MPI_ERROR) {
err = reqs[i]->req_status.MPI_ERROR;
break;
}
}
}

ompi_coll_base_free_reqs(reqs, nrecv);
return err;
}

/* All done */
/* Everyone but root sends data and returns. Don't send anything
for sendcounts of 0 (even though MPI_Gatherv has a guard for 0
counts, this routine is used elsewhere, like the implementation
of allgatherv, so it's possible to get here with a scount of
0) */

size_t sdsize;
ompi_datatype_type_size(sdtype, &sdsize);
if (scount > 0 && sdsize > 0) {
return MCA_PML_CALL(send(sbuf, scount, sdtype, root, MCA_COLL_BASE_TAG_GATHERV,
MCA_PML_BASE_SEND_STANDARD, comm));
}
return MPI_SUCCESS;
}


/*
* gatherv_inter
*
Expand Down