diff --git a/ompi/mca/common/ompio/common_ompio.h b/ompi/mca/common/ompio/common_ompio.h index 70860a4238e..4be90e27ec3 100644 --- a/ompi/mca/common/ompio/common_ompio.h +++ b/ompi/mca/common/ompio/common_ompio.h @@ -14,6 +14,7 @@ * Copyright (c) 2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2018 DataDirect Networks. All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -101,6 +102,8 @@ #define OMPIO_PERM_NULL -1 #define OMPIO_IOVEC_INITIAL_SIZE 100 +extern opal_mutex_t mca_common_ompio_mutex; + enum ompio_fs_type { NONE = 0, @@ -274,7 +277,7 @@ OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, OMPI_MP OMPI_DECLSPEC int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles, size_t bytes_per_cycle, size_t max_data, uint32_t iov_count, - struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw, + struct iovec *decoded_iov, int *ii, size_t *tbw, size_t *spc, mca_common_ompio_io_array_t **io_array, int *num_io_entries ); diff --git a/ompi/mca/common/ompio/common_ompio_buffer.h b/ompi/mca/common/ompio/common_ompio_buffer.h index ba94c6ca80b..1d1d89eb42e 100644 --- a/ompi/mca/common/ompio/common_ompio_buffer.h +++ b/ompi/mca/common/ompio/common_ompio_buffer.h @@ -31,14 +31,10 @@ opal_output(1, "common_ompio: error allocating memory\n"); \ return OMPI_ERR_OUT_OF_RESOURCE; \ } \ - _decoded_iov = (struct iovec *) malloc ( sizeof ( struct iovec )); \ - if ( NULL == _decoded_iov ) { \ - opal_output(1, "common_ompio: could not allocate memory.\n"); \ - return OMPI_ERR_OUT_OF_RESOURCE; \ - } \ - _decoded_iov->iov_base = _tbuf; \ - _decoded_iov->iov_len = _max_data; \ - _iov_count=1;} + if (NULL != _decoded_iov) { \ + ((struct iovec*)_decoded_iov)->iov_base = _tbuf; \ + ((struct iovec*)_decoded_iov)->iov_len = _max_data; \ + _iov_count=1;}} #define OMPIO_PREPARE_READ_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_tmp_buf_size,_decoded_iov,_iov_count){ \ OBJ_CONSTRUCT( _convertor, opal_convertor_t); \ @@ -49,14 +45,10 @@ opal_output(1, "common_ompio: error allocating memory\n"); \ return OMPI_ERR_OUT_OF_RESOURCE; \ } \ - _decoded_iov = (struct iovec *) malloc ( sizeof ( struct iovec )); \ - if ( NULL == _decoded_iov ) { \ - opal_output(1, "common_ompio: could not allocate memory.\n"); \ - return OMPI_ERR_OUT_OF_RESOURCE; \ - } \ - _decoded_iov->iov_base = _tbuf; \ - _decoded_iov->iov_len = _max_data; \ - _iov_count=1;} + if (NULL != _decoded_iov) { \ + ((struct iovec*)_decoded_iov)->iov_base = _tbuf; \ + ((struct iovec*)_decoded_iov)->iov_len = _max_data; \ + _iov_count=1;}} void mca_common_ompio_check_gpu_buf ( ompio_file_t *fh, const void *buf, int *is_gpu, int *is_managed); diff --git a/ompi/mca/common/ompio/common_ompio_file_open.c b/ompi/mca/common/ompio/common_ompio_file_open.c index 528c597d191..924994b34e3 100644 --- a/ompi/mca/common/ompio/common_ompio_file_open.c +++ b/ompi/mca/common/ompio/common_ompio_file_open.c @@ -48,6 +48,12 @@ static mca_common_ompio_generate_current_file_view_fn_t generate_current_file_view_fn; static mca_common_ompio_get_mca_parameter_value_fn_t get_mca_parameter_value_fn; +/* + * Global, component-wide OMPIO mutex + */ +opal_mutex_t mca_common_ompio_mutex = {{0}}; + + int mca_common_ompio_file_open (ompi_communicator_t *comm, const char *filename, int amode, diff --git a/ompi/mca/common/ompio/common_ompio_file_read.c 
b/ompi/mca/common/ompio/common_ompio_file_read.c index 479e98223e2..83bb38bf062 100644 --- a/ompi/mca/common/ompio/common_ompio_file_read.c +++ b/ompi/mca/common/ompio/common_ompio_file_read.c @@ -124,7 +124,6 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf, size_t spc=0; ssize_t ret_code=0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ mca_common_ompio_decode_datatype (fh, datatype, count, buf, &max_data, fh->f_mem_convertor, @@ -138,11 +137,10 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf, cycles, max_data); #endif - j = fh->f_index_in_file_view; for (index = 0; index < cycles; index++) { mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle, max_data, iov_count, decoded_iov, - &i, &j, &total_bytes_read, &spc, + &i, &total_bytes_read, &spc, &fh->f_io_array, &fh->f_num_of_io_entries); if (fh->f_num_of_io_entries == 0) { ret_code = 0; @@ -182,13 +180,12 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, int index = 0; int cycles = 0; uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; size_t max_data=0, real_bytes_read=0; size_t spc=0; ssize_t ret_code=0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ char *tbuf1=NULL, *tbuf2=NULL; char *unpackbuf=NULL, *readbuf=NULL; @@ -198,7 +195,7 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, bytes_per_cycle = OMPIO_MCA_GET(fh, pipeline_buffer_size); OMPIO_PREPARE_READ_BUF (fh, buf, count, datatype, tbuf1, &convertor, - max_data, bytes_per_cycle, decoded_iov, iov_count); + max_data, bytes_per_cycle, &decoded_iov, iov_count); cycles = ceil((double)max_data/bytes_per_cycle); readbuf = unpackbuf = tbuf1; @@ -206,7 +203,6 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, tbuf2 = mca_common_ompio_alloc_buf (fh, bytes_per_cycle); if (NULL == tbuf2) { opal_output(1, "common_ompio: error allocating memory\n"); - free (decoded_iov); return OMPI_ERR_OUT_OF_RESOURCE; } unpackbuf = tbuf2; @@ -236,15 +232,14 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, ** - unpack buffer i */ - j = fh->f_index_in_file_view; if (can_overlap) { mca_common_ompio_register_progress (); } for (index = 0; index < cycles+1; index++) { if (index < cycles) { - decoded_iov->iov_base = readbuf; - decoded_iov->iov_len = bytes_per_cycle; + decoded_iov.iov_base = readbuf; + decoded_iov.iov_len = bytes_per_cycle; bytes_this_cycle = (index == cycles-1) ? 
(max_data - (index * bytes_per_cycle)) : bytes_per_cycle; @@ -255,7 +250,7 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle, bytes_this_cycle, iov_count, - decoded_iov, &i, &j, &tbr, &spc, + &decoded_iov, &i, &tbr, &spc, &fh->f_io_array, &fh->f_num_of_io_entries); if (fh->f_num_of_io_entries == 0) { ret_code = 0; @@ -263,7 +258,7 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, } if (can_overlap) { - mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ); + mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_READ); fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *)ompio_req); } else { ret_code = fh->f_fbtl->fbtl_preadv (fh); @@ -293,9 +288,9 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, if ((can_overlap && index != 0) || (!can_overlap && index < cycles)) { size_t pos = 0; - decoded_iov->iov_base = unpackbuf; - decoded_iov->iov_len = can_overlap ? bytes_prev_cycle : bytes_this_cycle; - opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos); + decoded_iov.iov_base = unpackbuf; + decoded_iov.iov_len = can_overlap ? bytes_prev_cycle : bytes_this_cycle; + opal_convertor_unpack (&convertor, &decoded_iov, &iov_count, &pos); } fh->f_num_of_io_entries = 0; @@ -317,7 +312,6 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, mca_common_ompio_release_buf (fh, tbuf2); } - free (decoded_iov); if ( MPI_STATUS_IGNORE != status ) { status->_ucount = real_bytes_read; } @@ -352,53 +346,95 @@ int mca_common_ompio_file_read_at (ompio_file_t *fh, return ret; } +static void mca_common_ompio_post_next_read_subreq(struct mca_ompio_request_t *req, int index) +{ + uint32_t iov_count = 1; + size_t pos = 0, spc = 0, tbw = 0; + int i = 0; + mca_ompio_request_t *ompio_subreq=NULL; + size_t bytes_per_cycle = OMPIO_MCA_GET(req->req_fh, pipeline_buffer_size); + struct iovec decoded_iov; + + /* Step 1: finish index-1 unpack operation */ + if (index - 1 >= 0) { + size_t num_bytes = bytes_per_cycle; + /** + * should really be 'req_num_subreqs -1 == index -1' + * which is the same as below. + */ + if (req->req_num_subreqs == index) { + num_bytes = req->req_max_data - (index-1)* bytes_per_cycle; + } + decoded_iov.iov_base = req->req_tbuf; + decoded_iov.iov_len = num_bytes; + opal_convertor_unpack (&req->req_convertor, &decoded_iov, &iov_count, &pos); + } + + /* Step 2: post next iread subrequest */ + if (req->req_num_subreqs == index) { + /* all done */ + return; + } + + decoded_iov.iov_base = req->req_tbuf; + decoded_iov.iov_len = (req->req_num_subreqs-1 == index) ? 
+ req->req_max_data - (index* bytes_per_cycle) : req->req_size; + mca_common_ompio_build_io_array (req->req_fh, index, req->req_num_subreqs, + bytes_per_cycle, decoded_iov.iov_len, + iov_count, &decoded_iov, + &i, &tbw, &spc, + &req->req_fh->f_io_array, + &req->req_fh->f_num_of_io_entries); + + mca_common_ompio_request_alloc ( &ompio_subreq, MCA_OMPIO_REQUEST_READ); + ompio_subreq->req_parent = req; + req->req_fh->f_fbtl->fbtl_ipreadv (req->req_fh, (ompi_request_t *)ompio_subreq); + + free(req->req_fh->f_io_array); + req->req_fh->f_io_array = NULL; + req->req_fh->f_num_of_io_entries = 0; +} int mca_common_ompio_file_iread (ompio_file_t *fh, - void *buf, - int count, - struct ompi_datatype_t *datatype, - ompi_request_t **request) + void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_request_t **request) { int ret = OMPI_SUCCESS; mca_ompio_request_t *ompio_req=NULL; - size_t spc=0; + struct iovec *decoded_iov = NULL; if (fh->f_amode & MPI_MODE_WRONLY){ -// opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n"); ret = MPI_ERR_ACCESS; - return ret; + return ret; } - mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ); + mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_READ); - if ( 0 == count ) { + if (0 == count || 0 == fh->f_iov_count) { ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; ompio_req->req_ompi.req_status._ucount = 0; ompi_request_complete (&ompio_req->req_ompi, false); *request = (ompi_request_t *) ompio_req; - + return OMPI_SUCCESS; } - if ( NULL != fh->f_fbtl->fbtl_ipreadv ) { + if (NULL != fh->f_fbtl->fbtl_ipreadv) { // This fbtl has support for non-blocking operations - - size_t total_bytes_read = 0; /* total bytes that have been read*/ uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; - size_t max_data = 0; - int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ - bool need_to_copy = false; - int is_gpu, is_managed; - mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed); - if ( is_gpu && !is_managed ) { + + mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); + if (is_gpu && !is_managed) { need_to_copy = true; } + mca_common_ompio_register_progress (); + if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) && !(datatype == &ompi_mpi_byte.dt || datatype == &ompi_mpi_char.dt )) { @@ -411,88 +447,61 @@ int mca_common_ompio_file_iread (ompio_file_t *fh, need_to_copy = true; } - if ( need_to_copy ) { - char *tbuf=NULL; + if (need_to_copy) { + size_t pipeline_buf_size = OMPIO_MCA_GET(fh, pipeline_buffer_size); - OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, tbuf, &ompio_req->req_convertor, - max_data, 0, decoded_iov, iov_count); - - ompio_req->req_tbuf = tbuf; - ompio_req->req_size = max_data; + OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, ompio_req->req_tbuf, + &ompio_req->req_convertor, max_data, + pipeline_buf_size, NULL, iov_count); + + ompio_req->req_num_subreqs = ceil((double)max_data/pipeline_buf_size); + ompio_req->req_size = pipeline_buf_size; + ompio_req->req_max_data = max_data; + ompio_req->req_post_next_subreq = mca_common_ompio_post_next_read_subreq; + ompio_req->req_fh = fh; + ompio_req->req_ompi.req_status.MPI_ERROR = MPI_SUCCESS; + + mca_common_ompio_post_next_read_subreq (ompio_req, 0); } else { - mca_common_ompio_decode_datatype (fh, - datatype, - count, - buf, - &max_data, - fh->f_mem_convertor, - &decoded_iov, - &iov_count); - } - - if ( 0 < max_data && 0 == fh->f_iov_count ) { - 
ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; - ompio_req->req_ompi.req_status._ucount = 0; - ompi_request_complete (&ompio_req->req_ompi, false); - *request = (ompi_request_t *) ompio_req; - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - - return OMPI_SUCCESS; - } - - // Non-blocking operations have to occur in a single cycle - j = fh->f_index_in_file_view; - - mca_common_ompio_build_io_array ( fh, - 0, // index - 1, // no. of cyces - max_data, // setting bytes per cycle to match data - max_data, - iov_count, - decoded_iov, - &i, - &j, - &total_bytes_read, - &spc, - &fh->f_io_array, - &fh->f_num_of_io_entries); - - if (fh->f_num_of_io_entries) { - fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req); - } - - mca_common_ompio_register_progress (); - - fh->f_num_of_io_entries = 0; - if (NULL != fh->f_io_array) { - free (fh->f_io_array); - fh->f_io_array = NULL; - } + int i = 0; + size_t spc = 0, tbr = 0; + mca_common_ompio_decode_datatype (fh, datatype, count, buf, + &max_data, fh->f_mem_convertor, + &decoded_iov, &iov_count); + + /** + * Non-blocking operations have to occur in a single cycle + * If the f_io_array is too long, the fbtl will chunk it up + * internally. + */ + mca_common_ompio_build_io_array (fh, 0, 1, max_data, max_data, + iov_count, decoded_iov, &i, + &tbr, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } + fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req); + } } else { - // This fbtl does not support non-blocking operations - ompi_status_public_t status; - ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status); + // This fbtl does not support non-blocking operations + ompi_status_public_t status; + ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status); - ompio_req->req_ompi.req_status.MPI_ERROR = ret; - ompio_req->req_ompi.req_status._ucount = status._ucount; - ompi_request_complete (&ompio_req->req_ompi, false); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = status._ucount; + ompi_request_complete (&ompio_req->req_ompi, false); } + fh->f_num_of_io_entries = 0; + free (fh->f_io_array); + fh->f_io_array = NULL; + free (decoded_iov); + *request = (ompi_request_t *) ompio_req; return ret; } - int mca_common_ompio_file_iread_at (ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset, void *buf, @@ -554,24 +563,20 @@ int mca_common_ompio_file_read_all (ompio_file_t *fh, size_t pos=0, max_data=0; char *tbuf=NULL; opal_convertor_t convertor; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; uint32_t iov_count = 0; OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, tbuf, &convertor, - max_data, 0, decoded_iov, iov_count); + max_data, 0, &decoded_iov, iov_count); ret = fh->f_fcoll->fcoll_file_read_all (fh, - decoded_iov->iov_base, - decoded_iov->iov_len, + decoded_iov.iov_base, + decoded_iov.iov_len, MPI_BYTE, status); - opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos ); + opal_convertor_unpack (&convertor, &decoded_iov, &iov_count, &pos ); opal_convertor_cleanup (&convertor); - mca_common_ompio_release_buf (fh, decoded_iov->iov_base); - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } + mca_common_ompio_release_buf (fh, decoded_iov.iov_base); } else { ret = fh->f_fcoll->fcoll_file_read_all (fh, diff --git a/ompi/mca/common/ompio/common_ompio_file_write.c b/ompi/mca/common/ompio/common_ompio_file_write.c index 
e2ceb8187c9..32848c09e3e 100644 --- a/ompi/mca/common/ompio/common_ompio_file_write.c +++ b/ompi/mca/common/ompio/common_ompio_file_write.c @@ -107,7 +107,6 @@ int mca_common_ompio_file_write_default (ompio_file_t *fh, ssize_t ret_code = 0; size_t spc = 0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file view iovec */ mca_common_ompio_decode_datatype (fh, datatype, count, buf, &max_data, @@ -117,12 +116,11 @@ int mca_common_ompio_file_write_default (ompio_file_t *fh, bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size); cycles = ceil((double)max_data/bytes_per_cycle); - j = fh->f_index_in_file_view; for (index = 0; index < cycles; index++) { mca_common_ompio_build_io_array ( fh, index, cycles, bytes_per_cycle, max_data, iov_count, decoded_iov, - &i, &j, &total_bytes_written, &spc, + &i, &total_bytes_written, &spc, &fh->f_io_array, &fh->f_num_of_io_entries); if (fh->f_num_of_io_entries == 0) { ret_code = 0; @@ -162,13 +160,12 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, int cycles = 0; uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; size_t bytes_per_cycle=0, tbw = 0; size_t max_data=0, real_bytes_written=0; ssize_t ret_code=0; size_t spc=0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file view iovec */ size_t pos=0; char *tbuf1=NULL, *tbuf2=NULL; @@ -179,7 +176,7 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, bytes_per_cycle = OMPIO_MCA_GET(fh, pipeline_buffer_size); OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf1, &convertor, - max_data, bytes_per_cycle, decoded_iov, iov_count); + max_data, bytes_per_cycle, &decoded_iov, iov_count); cycles = ceil((double)max_data/bytes_per_cycle); packbuf = tbuf1; @@ -188,7 +185,6 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, tbuf2 = mca_common_ompio_alloc_buf (fh, bytes_per_cycle); if (NULL == tbuf2) { opal_output(1, "common_ompio: error allocating memory\n"); - free (decoded_iov); return OMPI_ERR_OUT_OF_RESOURCE; } writebuf = tbuf2; @@ -213,25 +209,24 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, ** - post pwrite for iter i */ - j = fh->f_index_in_file_view; if (can_overlap) { mca_common_ompio_register_progress (); } for (index = 0; index <= cycles; index++) { if (index < cycles) { - decoded_iov->iov_base = packbuf; - decoded_iov->iov_len = bytes_per_cycle; + decoded_iov.iov_base = packbuf; + decoded_iov.iov_len = bytes_per_cycle; iov_count = 1; - opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos); + opal_convertor_pack (&convertor, &decoded_iov, &iov_count, &pos); spc = 0; tbw = 0; i = 0; mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle, pos, - iov_count, decoded_iov, - &i, &j, &tbw, &spc, + iov_count, &decoded_iov, + &i, &tbw, &spc, &fh->f_io_array, &fh->f_num_of_io_entries); if (fh->f_num_of_io_entries== 0) { ret_code = 0; @@ -283,7 +278,6 @@ int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, } opal_convertor_cleanup (&convertor); - free (decoded_iov); if ( MPI_STATUS_IGNORE != status ) { status->_ucount = real_bytes_written; @@ -316,42 +310,72 @@ int mca_common_ompio_file_write_at (ompio_file_t *fh, return ret; } +static void mca_common_ompio_post_next_write_subreq(struct mca_ompio_request_t *req, int index) +{ + uint32_t iov_count = 1; + size_t bytes_per_cycle = OMPIO_MCA_GET(req->req_fh, pipeline_buffer_size); + size_t pos=0, spc = 0, tbw = 0; + int i = 0; + mca_ompio_request_t *ompio_subreq=NULL; + 
struct iovec decoded_iov; + + if (req->req_num_subreqs == index) { + /* all done */ + return; + } + + decoded_iov.iov_base = req->req_tbuf; + decoded_iov.iov_len = req->req_size; + opal_convertor_pack (&req->req_convertor, &decoded_iov, &iov_count, &pos); + mca_common_ompio_build_io_array (req->req_fh, index, req->req_num_subreqs, + bytes_per_cycle, pos, + iov_count, &decoded_iov, + &i, &tbw, &spc, + &req->req_fh->f_io_array, + &req->req_fh->f_num_of_io_entries); + + mca_common_ompio_request_alloc (&ompio_subreq, MCA_OMPIO_REQUEST_WRITE); + ompio_subreq->req_parent = req; + req->req_fh->f_fbtl->fbtl_ipwritev (req->req_fh, (ompi_request_t *)ompio_subreq); + + free(req->req_fh->f_io_array); + req->req_fh->f_io_array = NULL; + req->req_fh->f_num_of_io_entries = 0; +} + int mca_common_ompio_file_iwrite (ompio_file_t *fh, - const void *buf, - int count, - struct ompi_datatype_t *datatype, - ompi_request_t **request) + const void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_request_t **request) { int ret = OMPI_SUCCESS; mca_ompio_request_t *ompio_req=NULL; + struct iovec *decoded_iov = NULL; size_t spc=0; if (fh->f_amode & MPI_MODE_RDONLY){ ret = MPI_ERR_READ_ONLY; - return ret; + return ret; } - - mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE); - if ( 0 == count ) { + mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_WRITE); + + if (0 == count || 0 == fh->f_iov_count) { ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; ompio_req->req_ompi.req_status._ucount = 0; ompi_request_complete (&ompio_req->req_ompi, false); *request = (ompi_request_t *) ompio_req; - + return OMPI_SUCCESS; } - if ( NULL != fh->f_fbtl->fbtl_ipwritev ) { + if (NULL != fh->f_fbtl->fbtl_ipwritev) { /* This fbtl has support for non-blocking operations */ - uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; size_t max_data = 0; size_t total_bytes_written =0; int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ - bool need_to_copy = false; int is_gpu, is_managed; @@ -359,10 +383,11 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, if ( is_gpu && !is_managed ) { need_to_copy = true; } + mca_common_ompio_register_progress (); - if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) && + if ( !(fh->f_flags & OMPIO_DATAREP_NATIVE) && !(datatype == &ompi_mpi_byte.dt || - datatype == &ompi_mpi_char.dt )) { + datatype == &ompi_mpi_char.dt) ) { /* only need to copy if any of these conditions are given: 1. buffer is an unmanaged device buffer (checked above). 2. 
Datarepresentation is anything other than 'native' and @@ -372,77 +397,40 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, need_to_copy = true; } - if ( need_to_copy ) { - size_t pos=0; - char *tbuf=NULL; - opal_convertor_t convertor; - - OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf, &convertor, - max_data, 0, decoded_iov, iov_count); - opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos ); - opal_convertor_cleanup (&convertor); - - ompio_req->req_tbuf = tbuf; - ompio_req->req_size = max_data; + if (need_to_copy) { + size_t pipeline_buf_size = OMPIO_MCA_GET(fh, pipeline_buffer_size); + OMPIO_PREPARE_BUF (fh, buf, count, datatype, ompio_req->req_tbuf, + &ompio_req->req_convertor, max_data, + pipeline_buf_size, NULL, iov_count); + + ompio_req->req_num_subreqs = ceil((double)max_data/pipeline_buf_size); + ompio_req->req_size = pipeline_buf_size; + ompio_req->req_max_data = max_data; + ompio_req->req_post_next_subreq = mca_common_ompio_post_next_write_subreq; + ompio_req->req_fh = fh; + ompio_req->req_ompi.req_status.MPI_ERROR = MPI_SUCCESS; + + mca_common_ompio_post_next_write_subreq (ompio_req, 0); } else { - mca_common_ompio_decode_datatype (fh, - datatype, - count, - buf, - &max_data, + mca_common_ompio_decode_datatype (fh, datatype, count, + buf, &max_data, fh->f_mem_convertor, - &decoded_iov, - &iov_count); - } + &decoded_iov, &iov_count); + + /** + * Non blocking operations have to occur in a single cycle + * If the f_io_array is too long, the fbtl will chunk it up + * internally. + */ + mca_common_ompio_build_io_array ( fh, 0, 1, max_data, max_data, + iov_count, decoded_iov, + &i, &total_bytes_written, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); - if ( 0 < max_data && 0 == fh->f_iov_count ) { - ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; - ompio_req->req_ompi.req_status._ucount = 0; - ompi_request_complete (&ompio_req->req_ompi, false); - *request = (ompi_request_t *) ompio_req; - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - - return OMPI_SUCCESS; - } - - j = fh->f_index_in_file_view; - - /* Non blocking operations have to occur in a single cycle */ - mca_common_ompio_build_io_array ( fh, - 0, // index of current cycle iteration - 1, // number of cycles - max_data, // setting bytes_per_cycle to max_data - max_data, - iov_count, - decoded_iov, - &i, - &j, - &total_bytes_written, - &spc, - &fh->f_io_array, - &fh->f_num_of_io_entries); - - if (fh->f_num_of_io_entries) { fh->f_fbtl->fbtl_ipwritev (fh, (ompi_request_t *) ompio_req); } - - mca_common_ompio_register_progress (); - - fh->f_num_of_io_entries = 0; - if (NULL != fh->f_io_array) { - free (fh->f_io_array); - fh->f_io_array = NULL; - } - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - } - else { + } else { // This fbtl does not support non-blocking write operations ompi_status_public_t status; ret = mca_common_ompio_file_write(fh,buf,count,datatype, &status); @@ -452,6 +440,11 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, ompi_request_complete (&ompio_req->req_ompi, false); } + fh->f_num_of_io_entries = 0; + free (fh->f_io_array); + fh->f_io_array = NULL; + free (decoded_iov); + *request = (ompi_request_t *) ompio_req; return ret; } @@ -516,26 +509,22 @@ int mca_common_ompio_file_write_all (ompio_file_t *fh, size_t pos=0, max_data=0; char *tbuf=NULL; opal_convertor_t convertor; - struct iovec *decoded_iov = NULL; + struct iovec decoded_iov; uint32_t iov_count = 0; OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf, 
&convertor, - max_data, 0, decoded_iov, iov_count); - opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos ); - opal_convertor_cleanup ( &convertor); + max_data, 0, &decoded_iov, iov_count); + opal_convertor_pack (&convertor, &decoded_iov, &iov_count, &pos ); + opal_convertor_cleanup (&convertor); ret = fh->f_fcoll->fcoll_file_write_all (fh, - decoded_iov->iov_base, - decoded_iov->iov_len, + decoded_iov.iov_base, + decoded_iov.iov_len, MPI_BYTE, status); - mca_common_ompio_release_buf (fh, decoded_iov->iov_base); - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } + mca_common_ompio_release_buf (fh, decoded_iov.iov_base); } else { ret = fh->f_fcoll->fcoll_file_write_all (fh, @@ -622,7 +611,7 @@ int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles, size_t bytes_per_cycle, size_t max_data, uint32_t iov_count, - struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw, + struct iovec *decoded_iov, int *ii, size_t *tbw, size_t *spc, mca_common_ompio_io_array_t **io_array, int *num_io_entries) { @@ -636,7 +625,7 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles, size_t sum_previous_length = 0; int k = 0; /* index into the io_array */ int i = *ii; - int j = *jj; + int j = fh->f_index_in_file_view; mca_common_ompio_io_array_t *f_io_array=NULL; int f_num_io_entries=0; @@ -742,7 +731,6 @@ int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles, } #endif *ii = i; - *jj = j; *tbw = total_bytes_written; *spc = sum_previous_counts; *io_array = f_io_array; diff --git a/ompi/mca/common/ompio/common_ompio_request.c b/ompi/mca/common/ompio/common_ompio_request.c index 40b99043da9..09cafb787ee 100644 --- a/ompi/mca/common/ompio/common_ompio_request.c +++ b/ompi/mca/common/ompio/common_ompio_request.c @@ -11,6 +11,7 @@ * Copyright (c) 2004-2005 The Regents of the University of California. * All rights reserved. * Copyright (c) 2008-2019 University of Houston. All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -30,8 +31,6 @@ bool mca_common_ompio_progress_is_registered=false; */ opal_list_t mca_common_ompio_pending_requests = {{0}}; - - static int mca_common_ompio_request_free ( struct ompi_request_t **req) { mca_ompio_request_t *ompio_req = ( mca_ompio_request_t *)*req; @@ -69,19 +68,27 @@ OBJ_CLASS_INSTANCE(mca_ompio_request_t, ompi_request_t, void mca_common_ompio_request_construct(mca_ompio_request_t* req) { OMPI_REQUEST_INIT (&(req->req_ompi), false ); - req->req_ompi.req_free = mca_common_ompio_request_free; - req->req_ompi.req_cancel = mca_common_ompio_request_cancel; - req->req_ompi.req_type = OMPI_REQUEST_IO; - req->req_data = NULL; - req->req_tbuf = NULL; - req->req_size = 0; - req->req_progress_fn = NULL; - req->req_free_fn = NULL; + req->req_ompi.req_free = mca_common_ompio_request_free; + req->req_ompi.req_cancel = mca_common_ompio_request_cancel; + req->req_ompi.req_type = OMPI_REQUEST_IO; + req->req_data = NULL; + req->req_tbuf = NULL; + req->req_size = 0; + req->req_max_data = 0; + req->req_progress_fn = NULL; + req->req_free_fn = NULL; + req->req_parent = NULL; + req->req_post_next_subreq = NULL; + req->req_num_subreqs = 0; + req->req_subreqs_completed = 0; + req->req_fh = NULL; + req->req_post_followup = false; OBJ_CONSTRUCT(&req->req_item, opal_list_item_t); opal_list_append (&mca_common_ompio_pending_requests, &req->req_item); return; } + void mca_common_ompio_request_destruct(mca_ompio_request_t* req) { OMPI_REQUEST_FINI ( &(req->req_ompi)); @@ -107,6 +114,15 @@ void mca_common_ompio_request_fini ( void ) were not destroyed / completed upon MPI_FINALIZE */ OBJ_DESTRUCT(&mca_common_ompio_pending_requests); + if (mca_common_ompio_progress_is_registered) { + OPAL_THREAD_LOCK (&mca_common_ompio_mutex); + if (mca_common_ompio_progress_is_registered) { + opal_progress_unregister(mca_common_ompio_progress); + mca_common_ompio_progress_is_registered=false; + } + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); + } + return; } @@ -125,33 +141,97 @@ void mca_common_ompio_request_alloc ( mca_ompio_request_t **req, mca_ompio_reque void mca_common_ompio_register_progress ( void ) { - if ( false == mca_common_ompio_progress_is_registered) { + if (false == mca_common_ompio_progress_is_registered) { + OPAL_THREAD_LOCK (&mca_common_ompio_mutex); + if (mca_common_ompio_progress_is_registered) { + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); + return; + } opal_progress_register (mca_common_ompio_progress); mca_common_ompio_progress_is_registered=true; + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); } return; } + int mca_common_ompio_progress ( void ) { mca_ompio_request_t *req=NULL; opal_list_item_t *litem=NULL; int completed=0; - OPAL_LIST_FOREACH(litem, &mca_common_ompio_pending_requests, opal_list_item_t) { - req = GET_OMPIO_REQ_FROM_ITEM(litem); - if( REQUEST_COMPLETE(&req->req_ompi) ) { - continue; - } - if ( NULL != req->req_progress_fn ) { - if ( req->req_progress_fn(req) ) { - completed++; - ompi_request_complete (&req->req_ompi, true); - /* The fbtl progress function is expected to set the - * status elements + if (!OPAL_THREAD_TRYLOCK(&mca_common_ompio_mutex)) { + OPAL_LIST_FOREACH(litem, &mca_common_ompio_pending_requests, opal_list_item_t) { + req = GET_OMPIO_REQ_FROM_ITEM(litem); + if (REQUEST_COMPLETE(&req->req_ompi) ) { + continue; + } + if (NULL != req->req_progress_fn) { + if (req->req_progress_fn(req)) { + /** + * To support pipelined read/write operations, a user level request + * can contain multiple internal requests. 
These sub-requests + * contain a pointer to the parent request. + */ + mca_ompio_request_t *parent = req->req_parent; + if (NULL != parent) { + /* This is a subrequest */ + if (OMPI_SUCCESS != req->req_ompi.req_status.MPI_ERROR) { + parent->req_ompi.req_status.MPI_ERROR = req->req_ompi.req_status.MPI_ERROR; + ompi_request_complete (&parent->req_ompi, true); + continue; + } + parent->req_subreqs_completed++; + parent->req_ompi.req_status._ucount += req->req_ompi.req_status._ucount; + req->req_post_followup = true; + } else { + /* This is a request without subrequests */ + completed++; + ompi_request_complete (&req->req_ompi, true); + } + /* The fbtl progress function is expected to set the + * status elements + */ + } + } else { + /* This is a request without a lower-level progress function, e.g. + * a parent request */ + if (req->req_num_subreqs == req->req_subreqs_completed) { + completed++; + ompi_request_complete (&req->req_ompi, true); + } } } + /** + * Splitting the ompio progress loop is necessary to prevent a pending operation + * consisting of multiple subrequests from being executed in a single invocation of the progress + * function. + * + * Otherwise it can happen that the next subrequest is posted, which ends up at the tail + * of the ompio_pending_requests_list, and would be processed in the same loop execution; + * which then posts the next subrequest, which is also potentially processed right away, + * and so on. This would make the ompio_progress function block for a long time, and prevent + * overlapping operations. + * + * Splitting the loop into two parts, one checking for completion and one posting + * the next subrequest if necessary, avoids the problem. + */ + OPAL_LIST_FOREACH(litem, &mca_common_ompio_pending_requests, opal_list_item_t) { + req = GET_OMPIO_REQ_FROM_ITEM(litem); + if (true == req->req_post_followup) { + if (OPAL_THREAD_TRYLOCK(&req->req_fh->f_fh->f_lock)) { + continue; + } + mca_ompio_request_t *parent = req->req_parent; + parent->req_post_next_subreq(parent, parent->req_subreqs_completed); + OPAL_THREAD_UNLOCK(&req->req_fh->f_fh->f_lock); + ompi_request_complete (&req->req_ompi, false); + ompi_request_free ((ompi_request_t**)&req); + } + } + OPAL_THREAD_UNLOCK(&mca_common_ompio_mutex); } return completed; diff --git a/ompi/mca/common/ompio/common_ompio_request.h b/ompi/mca/common/ompio/common_ompio_request.h index 18083862df9..173f8468242 100644 --- a/ompi/mca/common/ompio/common_ompio_request.h +++ b/ompi/mca/common/ompio/common_ompio_request.h @@ -13,6 +13,7 @@ * Copyright (c) 2008-2019 University of Houston. All rights reserved. * Copyright (c) 2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved.
* $COPYRIGHT$ * * Additional copyrights may follow @@ -43,6 +44,8 @@ typedef enum { MCA_OMPIO_REQUEST_READ_ALL, } mca_ompio_request_type_t; +struct mca_ompio_request_t; +typedef void(*mca_ompio_post_next_subreq_t)(struct mca_ompio_request_t *req, int val); /** * Main structure for OMPIO requests @@ -54,9 +57,16 @@ struct mca_ompio_request_t { opal_list_item_t req_item; void *req_tbuf; size_t req_size; + size_t req_max_data; opal_convertor_t req_convertor; mca_fbtl_base_module_progress_fn_t req_progress_fn; mca_fbtl_base_module_request_free_fn_t req_free_fn; + mca_ompio_post_next_subreq_t req_post_next_subreq; + struct mca_ompio_request_t *req_parent; + int req_num_subreqs; + int req_subreqs_completed; + ompio_file_t *req_fh; + bool req_post_followup; }; typedef struct mca_ompio_request_t mca_ompio_request_t; OBJ_CLASS_DECLARATION(mca_ompio_request_t); diff --git a/ompi/mca/io/ompio/io_ompio_component.c b/ompi/mca/io/ompio/io_ompio_component.c index 99b030196a5..838d327e585 100644 --- a/ompi/mca/io/ompio/io_ompio_component.c +++ b/ompi/mca/io/ompio/io_ompio_component.c @@ -88,14 +88,6 @@ static int io_progress(void); static int priority_param = 30; static int delete_priority_param = 30; - -/* - * Global, component-wide OMPIO mutex because OMPIO is not thread safe - */ -opal_mutex_t mca_io_ompio_mutex = {{0}}; - - - /* * Public string showing this component's version number */ @@ -273,7 +265,7 @@ static int register_component(void) static int open_component(void) { /* Create the mutex */ - OBJ_CONSTRUCT(&mca_io_ompio_mutex, opal_mutex_t); + OBJ_CONSTRUCT(&mca_common_ompio_mutex, opal_mutex_t); mca_common_ompio_request_init (); @@ -286,7 +278,7 @@ static int close_component(void) { mca_common_ompio_request_fini (); mca_common_ompio_buffer_alloc_fini(); - OBJ_DESTRUCT(&mca_io_ompio_mutex); + OBJ_DESTRUCT(&mca_common_ompio_mutex); return OMPI_SUCCESS; } @@ -352,9 +344,9 @@ static int delete_select(const char *filename, struct opal_info_t *info, { int ret; - OPAL_THREAD_LOCK (&mca_io_ompio_mutex); + OPAL_THREAD_LOCK (&mca_common_ompio_mutex); ret = mca_common_ompio_file_delete (filename, info); - OPAL_THREAD_UNLOCK (&mca_io_ompio_mutex); + OPAL_THREAD_UNLOCK (&mca_common_ompio_mutex); return ret; }
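
The changes above are exercised through the standard non-blocking MPI I/O entry points; no user-visible API is added. The caller below is a minimal sketch of the kind of program that benefits from the new pipelining. The file name, transfer size, and compile command are illustrative assumptions, not part of this patch. Per the diff, the sub-request pipeline is only taken when OMPIO has to stage the data through a temporary buffer (an unmanaged device buffer, or a data representation other than 'native'); for a plain host buffer in 'native' representation the iwrite is still posted as a single request.

/* Illustrative caller (not part of this patch): overlap a non-blocking
 * OMPIO write with computation.  Build with something like
 * "mpicc -o iwrite_overlap iwrite_overlap.c" (names assumed). */
#include <mpi.h>
#include <stdlib.h>

int main(int argc, char **argv)
{
    const int count = 1 << 22;          /* 4 Mi ints per rank, arbitrary */
    MPI_File fh;
    MPI_Request req;
    MPI_Status status;
    int rank;
    int *buf;

    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    buf = malloc((size_t)count * sizeof(int));
    for (int i = 0; i < count; i++) {
        buf[i] = rank;
    }

    MPI_File_open(MPI_COMM_WORLD, "pipeline_test.out",
                  MPI_MODE_CREATE | MPI_MODE_WRONLY, MPI_INFO_NULL, &fh);

    /* Post the non-blocking write.  When OMPIO must stage through a
     * temporary buffer, this patch splits the request into sub-requests
     * of pipeline_buffer_size bytes that are driven by opal_progress(). */
    MPI_File_iwrite_at(fh, (MPI_Offset)rank * count * sizeof(int),
                       buf, count, MPI_INT, &req);

    /* ... independent computation can run here and overlap with the
     * staged sub-requests of the write ... */

    MPI_Wait(&req, &status);

    MPI_File_close(&fh);
    free(buf);
    MPI_Finalize();
    return 0;
}

The same pattern applies to MPI_File_iread_at, which is served by mca_common_ompio_post_next_read_subreq in the diff.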
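
The split of mca_common_ompio_progress into a completion pass and a posting pass can be summarized independently of the OMPIO data structures. The program below is a self-contained model, not OMPIO code: parent_req_t, post_next_subreq and progress are simplified stand-ins for mca_ompio_request_t (req_num_subreqs, req_subreqs_completed, req_post_followup) and req_post_next_subreq, and the locking and pending-request list handling of the real progress engine are omitted.

/* Self-contained model of the parent/sub-request pipeline (illustrative only). */
#include <stdio.h>
#include <stdbool.h>

typedef struct parent_req {
    size_t max_data;          /* total bytes requested               */
    size_t chunk;             /* bytes per sub-request (pipeline)    */
    int    num_subreqs;       /* ceil(max_data / chunk)              */
    int    subreqs_completed;
    bool   post_followup;     /* a sub-request finished; post the next one */
    bool   done;
} parent_req_t;

/* Stand-in for req_post_next_subreq: start I/O for chunk 'index'. */
static void post_next_subreq(parent_req_t *p, int index)
{
    if (index == p->num_subreqs) {
        return;                            /* all sub-requests already posted */
    }
    size_t len = (index == p->num_subreqs - 1)
                     ? p->max_data - (size_t)index * p->chunk
                     : p->chunk;
    printf("posting sub-request %d (%zu bytes)\n", index, len);
}

/* One invocation of the (simplified) progress engine. */
static void progress(parent_req_t *p)
{
    /* Pass 1: record completions only; pretend the outstanding
     * sub-request always completes in this call. */
    if (!p->done && p->subreqs_completed < p->num_subreqs) {
        p->subreqs_completed++;
        p->post_followup = true;
    }
    if (p->subreqs_completed == p->num_subreqs) {
        p->done = true;                    /* parent request completes */
    }
    /* Pass 2: post the follow-up sub-request separately, so the newly
     * posted request is not progressed within this same call. */
    if (p->post_followup && !p->done) {
        p->post_followup = false;
        post_next_subreq(p, p->subreqs_completed);
    }
}

int main(void)
{
    parent_req_t p = { .max_data = 10 * 1024 * 1024, .chunk = 4 * 1024 * 1024 };
    p.num_subreqs = (int)((p.max_data + p.chunk - 1) / p.chunk);   /* 3 */

    post_next_subreq(&p, 0);     /* initial sub-request, as in file_iread/iwrite */
    while (!p.done) {
        progress(&p);            /* normally driven by opal_progress() */
    }
    printf("parent request complete after %d sub-requests\n", p.subreqs_completed);
    return 0;
}

Posting the follow-up in a second pass mirrors the comment in the diff: a newly posted sub-request lands at the tail of the pending-request list and would otherwise be progressed within the same invocation, serializing the whole pipeline inside one progress call.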