diff --git a/ompi/mca/common/ompio/common_ompio_buffer.h b/ompi/mca/common/ompio/common_ompio_buffer.h index 90490158344..ba94c6ca80b 100644 --- a/ompi/mca/common/ompio/common_ompio_buffer.h +++ b/ompi/mca/common/ompio/common_ompio_buffer.h @@ -22,11 +22,11 @@ #define MCA_COMMON_OMPIO_CUDA_H -#define OMPIO_PREPARE_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_decoded_iov,_iov_count){ \ +#define OMPIO_PREPARE_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_tmp_buf_size,_decoded_iov,_iov_count){ \ OBJ_CONSTRUCT( _convertor, opal_convertor_t); \ opal_convertor_copy_and_prepare_for_send ( _fh->f_file_convertor, &(_datatype->super), _count, _buf, CONVERTOR_SEND_CONVERSION, _convertor ); \ opal_convertor_get_packed_size( _convertor, &_max_data ); \ - _tbuf = mca_common_ompio_alloc_buf (_fh, _max_data); \ + _tbuf = mca_common_ompio_alloc_buf (_fh, _tmp_buf_size==0 ? _max_data : _tmp_buf_size); \ if ( NULL == _tbuf ) { \ opal_output(1, "common_ompio: error allocating memory\n"); \ return OMPI_ERR_OUT_OF_RESOURCE; \ @@ -40,11 +40,11 @@ _decoded_iov->iov_len = _max_data; \ _iov_count=1;} -#define OMPIO_PREPARE_READ_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_decoded_iov,_iov_count){ \ +#define OMPIO_PREPARE_READ_BUF(_fh,_buf,_count,_datatype,_tbuf,_convertor,_max_data,_tmp_buf_size,_decoded_iov,_iov_count){ \ OBJ_CONSTRUCT( _convertor, opal_convertor_t); \ opal_convertor_copy_and_prepare_for_recv ( _fh->f_file_convertor, &(_datatype->super), _count, _buf, 0, _convertor ); \ opal_convertor_get_packed_size( _convertor, &_max_data ); \ - _tbuf = mca_common_ompio_alloc_buf (_fh, _max_data); \ + _tbuf = mca_common_ompio_alloc_buf (_fh, _tmp_buf_size==0 ? _max_data : _tmp_buf_size); \ if ( NULL == _tbuf ) { \ opal_output(1, "common_ompio: error allocating memory\n"); \ return OMPI_ERR_OUT_OF_RESOURCE; \ diff --git a/ompi/mca/common/ompio/common_ompio_file_read.c b/ompi/mca/common/ompio/common_ompio_file_read.c index da768f55050..479e98223e2 100644 --- a/ompi/mca/common/ompio/common_ompio_file_read.c +++ b/ompi/mca/common/ompio/common_ompio_file_read.c @@ -12,6 +12,7 @@ * Copyright (c) 2008-2019 University of Houston. All rights reserved. * Copyright (c) 2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -43,56 +44,46 @@ ** mca_io_ompio_file_read/write[_at] ** ** routines are the ones registered with the ompio modules. -** The +** They call the ** ** mca_common_ompio_file_read/write[_at] ** -** routesin are used e.g. from the shared file pointer modules. +** routines, which are however also used from the shared file pointer modules. ** The main difference is, that the first one takes an ompi_file_t ** as a file pointer argument, while the second uses the ompio internal ** ompio_file_t structure. 
*/ -int mca_common_ompio_file_read (ompio_file_t *fh, - void *buf, - int count, - struct ompi_datatype_t *datatype, - ompi_status_public_t *status) -{ - int ret = OMPI_SUCCESS; +static int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf, + int count, struct ompi_datatype_t *datatype, + ompi_status_public_t *status); - size_t total_bytes_read = 0; /* total bytes that have been read*/ - size_t bytes_per_cycle = 0; /* total read in each cycle by each process*/ - int index = 0; - int cycles = 0; +static int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, + int count, struct ompi_datatype_t *datatype, + ompi_status_public_t *status); - uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; - - size_t max_data=0, real_bytes_read=0; - size_t spc=0; - ssize_t ret_code=0; - int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file via iovec */ +int mca_common_ompio_file_read (ompio_file_t *fh, + void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_status_public_t *status) +{ + bool need_to_copy = false; + int is_gpu, is_managed; - if (fh->f_amode & MPI_MODE_WRONLY){ -// opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n"); - ret = MPI_ERR_ACCESS; - return ret; + if (fh->f_amode & MPI_MODE_WRONLY) { + return MPI_ERR_ACCESS; } - if ( 0 == count ) { - if ( MPI_STATUS_IGNORE != status ) { + if (0 == count || 0 == fh->f_iov_count) { + if (MPI_STATUS_IGNORE != status) { status->_ucount = 0; } - return ret; + return OMPI_SUCCESS; } - bool need_to_copy = false; - opal_convertor_t convertor; - int is_gpu, is_managed; mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed); - if ( is_gpu && !is_managed ) { + if (is_gpu && !is_managed) { need_to_copy = true; } @@ -107,96 +98,231 @@ int mca_common_ompio_file_read (ompio_file_t *fh, */ need_to_copy = true; } - - if ( need_to_copy ) { - char *tbuf=NULL; - OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count); - } - else { - mca_common_ompio_decode_datatype (fh, - datatype, - count, - buf, - &max_data, - fh->f_mem_convertor, - &decoded_iov, - &iov_count); + if (need_to_copy) { + return mca_common_ompio_file_read_pipelined (fh, buf, count, datatype, status); + } else { + return mca_common_ompio_file_read_default (fh, buf, count, datatype, status); } - if ( 0 < max_data && 0 == fh->f_iov_count ) { - if ( MPI_STATUS_IGNORE != status ) { - status->_ucount = 0; - } - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - return OMPI_SUCCESS; - } + return OMPI_SUCCESS; //silence compiler +} - if ( -1 == OMPIO_MCA_GET(fh, cycle_buffer_size )) { - bytes_per_cycle = max_data; - } - else { - bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size); - } +int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf, + int count, struct ompi_datatype_t *datatype, + ompi_status_public_t *status) +{ + size_t total_bytes_read = 0; /* total bytes that have been read*/ + size_t bytes_per_cycle = 0; /* total read in each cycle by each process*/ + int index = 0; + int cycles = 0; + + uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; + + size_t max_data=0, real_bytes_read=0; + size_t spc=0; + ssize_t ret_code=0; + int i = 0; /* index into the decoded iovec of the buffer */ + int j = 0; /* index into the file via iovec */ + + mca_common_ompio_decode_datatype (fh, datatype, count, buf, + &max_data, fh->f_mem_convertor, + &decoded_iov, &iov_count); + + bytes_per_cycle = OMPIO_MCA_GET(fh, 
cycle_buffer_size); cycles = ceil((double)max_data/bytes_per_cycle); #if 0 - printf ("Bytes per Cycle: %d Cycles: %d max_data:%d \n",bytes_per_cycle, cycles, max_data); + printf ("Bytes per Cycle: %d Cycles: %d max_data:%d \n", bytes_per_cycle, + cycles, max_data); #endif j = fh->f_index_in_file_view; - for (index = 0; index < cycles; index++) { + mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle, + max_data, iov_count, decoded_iov, + &i, &j, &total_bytes_read, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); + if (fh->f_num_of_io_entries == 0) { + ret_code = 0; + goto exit; + } - mca_common_ompio_build_io_array ( fh, - index, - cycles, - bytes_per_cycle, - max_data, - iov_count, - decoded_iov, - &i, - &j, - &total_bytes_read, - &spc, - &fh->f_io_array, - &fh->f_num_of_io_entries); - - if (fh->f_num_of_io_entries) { - ret_code = fh->f_fbtl->fbtl_preadv (fh); - if ( 0<= ret_code ) { - real_bytes_read+=(size_t)ret_code; - } - } + ret_code = fh->f_fbtl->fbtl_preadv (fh); + if (0 <= ret_code) { + real_bytes_read += (size_t)ret_code; + // Reset ret_code since it is also used to return an error + ret_code = 0; + } else { + goto exit; + } fh->f_num_of_io_entries = 0; - if (NULL != fh->f_io_array) { - free (fh->f_io_array); - fh->f_io_array = NULL; + free (fh->f_io_array); + fh->f_io_array = NULL; + } + + exit: + free (decoded_iov); + if (MPI_STATUS_IGNORE != status) { + status->_ucount = real_bytes_read; + } + + return ret_code; +} + +int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf, + int count, struct ompi_datatype_t *datatype, + ompi_status_public_t *status) +{ + size_t tbr = 0; /* total bytes that have been read*/ + size_t bytes_per_cycle = 0; /* total read in each cycle by each process*/ + size_t bytes_this_cycle = 0, bytes_prev_cycle = 0; + int index = 0; + int cycles = 0; + uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; + + size_t max_data=0, real_bytes_read=0; + size_t spc=0; + ssize_t ret_code=0; + int i = 0; /* index into the decoded iovec of the buffer */ + int j = 0; /* index into the file via iovec */ + + char *tbuf1=NULL, *tbuf2=NULL; + char *unpackbuf=NULL, *readbuf=NULL; + mca_ompio_request_t *ompio_req=NULL, *prev_ompio_req=NULL; + opal_convertor_t convertor; + bool can_overlap = (NULL != fh->f_fbtl->fbtl_ipreadv); + + bytes_per_cycle = OMPIO_MCA_GET(fh, pipeline_buffer_size); + OMPIO_PREPARE_READ_BUF (fh, buf, count, datatype, tbuf1, &convertor, + max_data, bytes_per_cycle, decoded_iov, iov_count); + cycles = ceil((double)max_data/bytes_per_cycle); + + readbuf = unpackbuf = tbuf1; + if (can_overlap) { + tbuf2 = mca_common_ompio_alloc_buf (fh, bytes_per_cycle); + if (NULL == tbuf2) { + opal_output(1, "common_ompio: error allocating memory\n"); + free (decoded_iov); + return OMPI_ERR_OUT_OF_RESOURCE; } + unpackbuf = tbuf2; } - if ( need_to_copy ) { - size_t pos=0; +#if 0 + printf ("Bytes per Cycle: %d Cycles: %d max_data:%d \n", bytes_per_cycle, + cycles, max_data); +#endif - opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos ); - opal_convertor_cleanup (&convertor); - mca_common_ompio_release_buf (fh, decoded_iov->iov_base); + /* + ** The code combines two scenarios: + ** 1. having async read (i.e. ipreadv) which allows to overlap two + ** iterations. + ** 2. not having async read, which doesn't allow for overlap. 
+ ** + ** In the first case we use a double buffering technique, the sequence is + ** - construct io-array for iter i + ** - post ipreadv for iter i + ** - wait for iter i-1 + ** - unpack buffer i-1 + ** - swap buffers + ** + ** In the second case, the sequence is + ** - construct io-array for iter i + ** - post preadv for iter i + ** - unpack buffer i + */ + + j = fh->f_index_in_file_view; + if (can_overlap) { + mca_common_ompio_register_progress (); + } + + for (index = 0; index < cycles+1; index++) { + if (index < cycles) { + decoded_iov->iov_base = readbuf; + decoded_iov->iov_len = bytes_per_cycle; + bytes_this_cycle = (index == cycles-1) ? + (max_data - (index * bytes_per_cycle)) : + bytes_per_cycle; + + i = 0; + spc = 0; + tbr = 0; + + mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle, + bytes_this_cycle, iov_count, + decoded_iov, &i, &j, &tbr, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); + if (fh->f_num_of_io_entries == 0) { + ret_code = 0; + goto exit; + } + + if (can_overlap) { + mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ); + fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *)ompio_req); + } else { + ret_code = fh->f_fbtl->fbtl_preadv (fh); + if (0 <= ret_code) { + real_bytes_read+=(size_t)ret_code; + // Reset ret_code since it is also used to return an error + ret_code = 0; + } + else { + goto exit; + } + } + } + + if (can_overlap) { + if (index != 0) { + ompi_status_public_t stat; + ret_code = ompi_request_wait ((ompi_request_t **)&prev_ompio_req, &stat); + if (OMPI_SUCCESS != ret_code) { + goto exit; + } + real_bytes_read += stat._ucount; + } + prev_ompio_req = ompio_req; + } + + if ((can_overlap && index != 0) || + (!can_overlap && index < cycles)) { + size_t pos = 0; + decoded_iov->iov_base = unpackbuf; + decoded_iov->iov_len = can_overlap ? 
bytes_prev_cycle : bytes_this_cycle; + opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos); + } + + fh->f_num_of_io_entries = 0; + free (fh->f_io_array); + fh->f_io_array = NULL; + bytes_prev_cycle = bytes_this_cycle; + + if (can_overlap) { + char *tmp = unpackbuf; + unpackbuf = readbuf; + readbuf = tmp; + } } - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; + exit: + opal_convertor_cleanup (&convertor); + mca_common_ompio_release_buf (fh, tbuf1); + if (can_overlap) { + mca_common_ompio_release_buf (fh, tbuf2); } + free (decoded_iov); if ( MPI_STATUS_IGNORE != status ) { status->_ucount = real_bytes_read; } - return ret; + return ret_code; } int mca_common_ompio_file_read_at (ompio_file_t *fh, @@ -288,7 +414,8 @@ int mca_common_ompio_file_iread (ompio_file_t *fh, if ( need_to_copy ) { char *tbuf=NULL; - OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&ompio_req->req_convertor,max_data,decoded_iov,iov_count); + OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, tbuf, &ompio_req->req_convertor, + max_data, 0, decoded_iov, iov_count); ompio_req->req_tbuf = tbuf; ompio_req->req_size = max_data; @@ -430,7 +557,8 @@ int mca_common_ompio_file_read_all (ompio_file_t *fh, struct iovec *decoded_iov = NULL; uint32_t iov_count = 0; - OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count); + OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, tbuf, &convertor, + max_data, 0, decoded_iov, iov_count); ret = fh->f_fcoll->fcoll_file_read_all (fh, decoded_iov->iov_base, decoded_iov->iov_len, diff --git a/ompi/mca/common/ompio/common_ompio_file_write.c b/ompi/mca/common/ompio/common_ompio_file_write.c index 0b7daafeda0..e2ceb8187c9 100644 --- a/ompi/mca/common/ompio/common_ompio_file_write.c +++ b/ompi/mca/common/ompio/common_ompio_file_write.c @@ -12,6 +12,7 @@ * Copyright (c) 2008-2019 University of Houston. All rights reserved. * Copyright (c) 2015-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. 
* $COPYRIGHT$ * * Additional copyrights may follow @@ -35,49 +36,41 @@ #include #include +static int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, const void *buf, + int count, struct ompi_datatype_t *datatype, + ompi_status_public_t *status); + +static int mca_common_ompio_file_write_default (ompio_file_t *fh, const void *buf, + int count, struct ompi_datatype_t *datatype, + ompi_status_public_t *status); + int mca_common_ompio_file_write (ompio_file_t *fh, - const void *buf, - int count, - struct ompi_datatype_t *datatype, - ompi_status_public_t *status) + const void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_status_public_t *status) { int ret = OMPI_SUCCESS; - int index = 0; - int cycles = 0; - - uint32_t iov_count = 0; - struct iovec *decoded_iov = NULL; - size_t bytes_per_cycle=0; - size_t total_bytes_written = 0; - size_t max_data=0, real_bytes_written=0; - ssize_t ret_code=0; - size_t spc=0; - int i = 0; /* index into the decoded iovec of the buffer */ - int j = 0; /* index into the file view iovec */ - if (fh->f_amode & MPI_MODE_RDONLY){ -// opal_output(10, "Improper use of FILE Mode, Using RDONLY for write!\n"); ret = MPI_ERR_READ_ONLY; - return ret; + return ret; } - - if ( 0 == count ) { - if ( MPI_STATUS_IGNORE != status ) { + if (0 == count || 0 == fh->f_iov_count) { + if (MPI_STATUS_IGNORE != status) { status->_ucount = 0; } return ret; } bool need_to_copy = false; - int is_gpu, is_managed; - mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed); - if ( is_gpu && !is_managed ) { + mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); + if (is_gpu && !is_managed) { need_to_copy = true; } - if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) && + if ( !(fh->f_flags & OMPIO_DATAREP_NATIVE ) && !(datatype == &ompi_mpi_byte.dt || datatype == &ompi_mpi_char.dt )) { /* only need to copy if any of these conditions are given: @@ -88,95 +81,215 @@ int mca_common_ompio_file_write (ompio_file_t *fh, */ need_to_copy = true; } - - if ( need_to_copy ) { - size_t pos=0; - char *tbuf=NULL; - opal_convertor_t convertor; - - OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count); - opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos ); - opal_convertor_cleanup ( &convertor); - } - else { - mca_common_ompio_decode_datatype (fh, - datatype, - count, - buf, - &max_data, - fh->f_mem_convertor, - &decoded_iov, - &iov_count); - } - if ( 0 < max_data && 0 == fh->f_iov_count ) { - if ( MPI_STATUS_IGNORE != status ) { - status->_ucount = 0; - } - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; - } - return OMPI_SUCCESS; + if (need_to_copy) { + return mca_common_ompio_file_write_pipelined (fh, buf, count, datatype, status); + } else { + return mca_common_ompio_file_write_default (fh, buf, count, datatype, status); } - if ( -1 == OMPIO_MCA_GET(fh, cycle_buffer_size )) { - bytes_per_cycle = max_data; - } - else { - bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size); - } - cycles = ceil((double)max_data/bytes_per_cycle); + return OMPI_SUCCESS; //silence compiler +} -#if 0 - printf ("Bytes per Cycle: %d Cycles: %d\n", bytes_per_cycle, cycles); -#endif +int mca_common_ompio_file_write_default (ompio_file_t *fh, + const void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_status_public_t *status) +{ + int index = 0; + int cycles = 0; + uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; + size_t bytes_per_cycle = 0; + size_t total_bytes_written = 0; + size_t max_data = 0, 
real_bytes_written = 0; + ssize_t ret_code = 0; + size_t spc = 0; + int i = 0; /* index into the decoded iovec of the buffer */ + int j = 0; /* index into the file view iovec */ + + mca_common_ompio_decode_datatype (fh, datatype, count, + buf, &max_data, + fh->f_mem_convertor, + &decoded_iov, &iov_count); + + bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size); + cycles = ceil((double)max_data/bytes_per_cycle); j = fh->f_index_in_file_view; for (index = 0; index < cycles; index++) { - mca_common_ompio_build_io_array ( fh, - index, - cycles, - bytes_per_cycle, - max_data, - iov_count, - decoded_iov, - &i, - &j, - &total_bytes_written, - &spc, - &fh->f_io_array, - &fh->f_num_of_io_entries); + mca_common_ompio_build_io_array ( fh, index, cycles, + bytes_per_cycle, max_data, + iov_count, decoded_iov, + &i, &j, &total_bytes_written, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); + if (fh->f_num_of_io_entries == 0) { + ret_code = 0; + goto exit; + } - if (fh->f_num_of_io_entries) { - ret_code =fh->f_fbtl->fbtl_pwritev (fh); - if ( 0<= ret_code ) { - real_bytes_written+= (size_t)ret_code; - } - } + ret_code = fh->f_fbtl->fbtl_pwritev (fh); + if (0 <= ret_code) { + real_bytes_written+= (size_t)ret_code; + // Reset ret_code since it is also used to return an error + ret_code = 0; + } else { + goto exit; + } fh->f_num_of_io_entries = 0; - if (NULL != fh->f_io_array) { - free (fh->f_io_array); - fh->f_io_array = NULL; + free (fh->f_io_array); + fh->f_io_array = NULL; + } + + exit: + free (decoded_iov); + if ( MPI_STATUS_IGNORE != status ) { + status->_ucount = real_bytes_written; + } + + return ret_code; +} + +int mca_common_ompio_file_write_pipelined (ompio_file_t *fh, + const void *buf, + int count, + struct ompi_datatype_t *datatype, + ompi_status_public_t *status) +{ + int index = 0; + int cycles = 0; + + uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; + size_t bytes_per_cycle=0, tbw = 0; + size_t max_data=0, real_bytes_written=0; + ssize_t ret_code=0; + size_t spc=0; + int i = 0; /* index into the decoded iovec of the buffer */ + int j = 0; /* index into the file view iovec */ + + size_t pos=0; + char *tbuf1=NULL, *tbuf2=NULL; + char *packbuf=NULL, *writebuf=NULL; + mca_ompio_request_t *ompio_req=NULL, *prev_ompio_req=NULL; + opal_convertor_t convertor; + bool can_overlap = (NULL != fh->f_fbtl->fbtl_ipwritev); + + bytes_per_cycle = OMPIO_MCA_GET(fh, pipeline_buffer_size); + OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf1, &convertor, + max_data, bytes_per_cycle, decoded_iov, iov_count); + cycles = ceil((double)max_data/bytes_per_cycle); + + packbuf = tbuf1; + if (can_overlap) { + //Allocate second buffer to alternate packing and writing + tbuf2 = mca_common_ompio_alloc_buf (fh, bytes_per_cycle); + if (NULL == tbuf2) { + opal_output(1, "common_ompio: error allocating memory\n"); + free (decoded_iov); + return OMPI_ERR_OUT_OF_RESOURCE; } + writebuf = tbuf2; } - if ( need_to_copy ) { - mca_common_ompio_release_buf (fh, decoded_iov->iov_base); + /* + ** The code combines two scenarios: + ** 1. having async write (i.e. ipwritev) which allows to overlap two + ** iterations. + ** 2. not having async write, which doesn't allow for overlap. 
+ ** + ** In the first case we use a double buffering technique, the sequence is + ** - construct io-array for iter i + ** - pack buffer for iter i + ** - post ipwritev for iter i + ** - wait for iter i-1 + ** - swap buffers + ** + ** In the second case, the sequence is + ** - construct io-array for iter i + ** - pack buffer i + ** - post pwrite for iter i + */ + + j = fh->f_index_in_file_view; + if (can_overlap) { + mca_common_ompio_register_progress (); } + for (index = 0; index <= cycles; index++) { + if (index < cycles) { + decoded_iov->iov_base = packbuf; + decoded_iov->iov_len = bytes_per_cycle; + iov_count = 1; + + opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos); + spc = 0; + tbw = 0; + i = 0; + mca_common_ompio_build_io_array (fh, index, cycles, + bytes_per_cycle, pos, + iov_count, decoded_iov, + &i, &j, &tbw, &spc, + &fh->f_io_array, &fh->f_num_of_io_entries); + if (fh->f_num_of_io_entries== 0) { + ret_code = 0; + goto exit; + } - if (NULL != decoded_iov) { - free (decoded_iov); - decoded_iov = NULL; + if (can_overlap) { + mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE); + fh->f_fbtl->fbtl_ipwritev (fh, (ompi_request_t *)ompio_req); + } else { + ret_code = fh->f_fbtl->fbtl_pwritev (fh); + if (0 <= ret_code) { + real_bytes_written += (size_t)ret_code; + // Reset ret_code since it is also used to return an error + ret_code = 0; + } else { + goto exit; + } + } + } + + if (can_overlap) { + if (index != 0) { + ompi_status_public_t stat; + ret_code = ompi_request_wait ((ompi_request_t **)&prev_ompio_req, &stat); + if (OMPI_SUCCESS != ret_code) { + goto exit; + } + real_bytes_written += stat._ucount; + } + prev_ompio_req = ompio_req; + } + + fh->f_num_of_io_entries = 0; + free (fh->f_io_array); + fh->f_io_array = NULL; + + if (can_overlap) { + char *tmp = packbuf; + packbuf = writebuf; + writebuf = tmp; + } + } + + exit: + mca_common_ompio_release_buf (fh, tbuf1); + if (can_overlap) { + mca_common_ompio_release_buf (fh, tbuf2); } + opal_convertor_cleanup (&convertor); + free (decoded_iov); + if ( MPI_STATUS_IGNORE != status ) { status->_ucount = real_bytes_written; } - return ret; + return ret_code; } int mca_common_ompio_file_write_at (ompio_file_t *fh, @@ -214,7 +327,6 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, size_t spc=0; if (fh->f_amode & MPI_MODE_RDONLY){ -// opal_output(10, "Improper use of FILE Mode, Using RDONLY for write!\n"); ret = MPI_ERR_READ_ONLY; return ret; } @@ -265,7 +377,8 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, char *tbuf=NULL; opal_convertor_t convertor; - OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count); + OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf, &convertor, + max_data, 0, decoded_iov, iov_count); opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos ); opal_convertor_cleanup (&convertor); @@ -406,7 +519,8 @@ int mca_common_ompio_file_write_all (ompio_file_t *fh, struct iovec *decoded_iov = NULL; uint32_t iov_count = 0; - OMPIO_PREPARE_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count); + OMPIO_PREPARE_BUF (fh, buf, count, datatype, tbuf, &convertor, + max_data, 0, decoded_iov, iov_count); opal_convertor_pack (&convertor, decoded_iov, &iov_count, &pos ); opal_convertor_cleanup ( &convertor); diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c index 8a806861db0..1c64c8463d4 100644 --- a/ompi/mca/io/ompio/io_ompio.c +++ b/ompi/mca/io/ompio/io_ompio.c @@ -15,6 +15,7 @@ * Copyright (c) 2012-2013 Inria. 
All rights reserved. * Copyright (c) 2015-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -536,6 +537,9 @@ int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_le else if ( !strncmp ( mca_parameter_name, "cycle_buffer_size", name_length )) { return mca_io_ompio_cycle_buffer_size; } + else if ( !strncmp ( mca_parameter_name, "pipeline_buffer_size", name_length )) { + return mca_io_ompio_pipeline_buffer_size; + } else if ( !strncmp ( mca_parameter_name, "max_aggregators_ratio", name_length )) { return mca_io_ompio_max_aggregators_ratio; } diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h index 4c019240892..5f9a4462f1a 100644 --- a/ompi/mca/io/ompio/io_ompio.h +++ b/ompi/mca/io/ompio/io_ompio.h @@ -14,6 +14,7 @@ * Copyright (c) 2015-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016-2017 IBM Corporation. All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -45,6 +46,7 @@ #include "ompi/mca/common/ompio/common_ompio.h" extern int mca_io_ompio_cycle_buffer_size; +extern int mca_io_ompio_pipeline_buffer_size; extern int mca_io_ompio_bytes_per_agg; extern int mca_io_ompio_num_aggregators; extern int mca_io_ompio_record_offset_info; @@ -63,6 +65,7 @@ OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info; */ #define OMPIO_PREALLOC_MAX_BUF_SIZE 33554432 #define OMPIO_DEFAULT_CYCLE_BUF_SIZE 536870912 +#define OMPIO_DEFAULT_PIPELINE_BUF_SIZE 1048576 #define OMPIO_TAG_GATHER -100 #define OMPIO_TAG_GATHERV -101 #define OMPIO_TAG_BCAST -102 diff --git a/ompi/mca/io/ompio/io_ompio_component.c b/ompi/mca/io/ompio/io_ompio_component.c index 19cce279d27..99b030196a5 100644 --- a/ompi/mca/io/ompio/io_ompio_component.c +++ b/ompi/mca/io/ompio/io_ompio_component.c @@ -17,6 +17,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2016-2017 IBM Corporation. All rights reserved. * Copyright (c) 2018 DataDirect Networks. All rights reserved. + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -37,6 +38,7 @@ #include "ompi/mca/common/ompio/common_ompio_buffer.h" int mca_io_ompio_cycle_buffer_size = OMPIO_DEFAULT_CYCLE_BUF_SIZE; +int mca_io_ompio_pipeline_buffer_size = OMPIO_DEFAULT_PIPELINE_BUF_SIZE; int mca_io_ompio_bytes_per_agg = OMPIO_PREALLOC_MAX_BUF_SIZE; int mca_io_ompio_num_aggregators = -1; int mca_io_ompio_record_offset_info = 0; @@ -177,6 +179,16 @@ static int register_component(void) MCA_BASE_VAR_SCOPE_READONLY, &mca_io_ompio_cycle_buffer_size); + mca_io_ompio_pipeline_buffer_size = OMPIO_DEFAULT_PIPELINE_BUF_SIZE; + (void) mca_base_component_var_register(&mca_io_ompio_component.io_version, + "pipeline_buffer_size", + "Size of temporary buffer used by individual reads/writes " + "in the pipeline protocol", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &mca_io_ompio_pipeline_buffer_size); + mca_io_ompio_bytes_per_agg = OMPIO_PREALLOC_MAX_BUF_SIZE; (void) mca_base_component_var_register(&mca_io_ompio_component.io_version, "bytes_per_agg",
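
Note on the pipelined code path added above: mca_common_ompio_file_read_pipelined() and mca_common_ompio_file_write_pipelined() both follow the double-buffering schedule described in their block comments whenever the fbtl provides ipreadv/ipwritev. The stand-alone sketch below illustrates only that schedule for the read side; post_read(), wait_read() and unpack() are placeholders standing in for fbtl_ipreadv(), ompi_request_wait() and opal_convertor_unpack(), they are not OMPIO APIs, and every cycle is assumed to move a full BUF_SIZE bytes for simplicity.

    /*
     * Sketch only: mirrors the double-buffering schedule described in the
     * comments of mca_common_ompio_file_read_pipelined().
     */
    #include <stdio.h>
    #include <string.h>

    #define CYCLES   4
    #define BUF_SIZE 8

    static char filedata[CYCLES * BUF_SIZE];   /* stands in for the file        */
    static char result[CYCLES * BUF_SIZE];     /* stands in for the user buffer */

    /* "post" a read for one cycle; synchronous here, asynchronous in OMPIO */
    static void post_read (int cycle, char *buf) {
        memcpy (buf, filedata + cycle * BUF_SIZE, BUF_SIZE);
    }

    /* wait for the read posted for a cycle; a no-op in this synchronous model */
    static void wait_read (int cycle) { (void) cycle; }

    /* consume a staging buffer, like opal_convertor_unpack() into the user buffer */
    static void unpack (int cycle, const char *buf) {
        memcpy (result + cycle * BUF_SIZE, buf, BUF_SIZE);
    }

    int main (void) {
        char buf_a[BUF_SIZE], buf_b[BUF_SIZE];
        char *readbuf = buf_a, *unpackbuf = buf_b;

        memset (filedata, 'x', sizeof(filedata));

        /* cycles+1 iterations: the extra pass only drains the last outstanding read */
        for (int index = 0; index < CYCLES + 1; index++) {
            if (index < CYCLES) {
                post_read (index, readbuf);        /* start the read for iter i    */
            }
            if (index > 0) {
                wait_read (index - 1);             /* finish the read for iter i-1 */
                unpack (index - 1, unpackbuf);     /* unpack the previous buffer   */
            }
            char *tmp = unpackbuf;                 /* swap the two staging buffers */
            unpackbuf = readbuf;
            readbuf   = tmp;
        }

        printf ("%s\n", 0 == memcmp (filedata, result, sizeof(result)) ?
                "data delivered in order" : "schedule is broken");
        return 0;
    }

When the fbtl only provides the blocking preadv/pwritev, the loops in the patch fall back to the second scenario spelled out in the comments: each iteration reads (or packs and writes) and unpacks its own buffer in place, with no overlap and only a single staging buffer allocated.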
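
Note on buffer sizing: OMPIO_PREPARE_BUF and OMPIO_PREPARE_READ_BUF now take an extra _tmp_buf_size argument. Passing 0 keeps the previous behaviour, i.e. the staging buffer is sized to the packed data (_max_data); a non-zero value, as used by the pipelined paths, caps the allocation at the pipeline buffer size. That size is taken from the new pipeline_buffer_size MCA parameter registered in io_ompio_component.c, with a default of OMPIO_DEFAULT_PIPELINE_BUF_SIZE (1048576 bytes). Assuming the usual Open MPI naming convention for component-level variables, it should be adjustable at run time with, for example, mpirun --mca io_ompio_pipeline_buffer_size 4194304 ./app.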