Skip to content

Commit 7979d17

Browse files
committed
common/ompio: implement pipelined file_iwrite and iread
Pipelined iread/iwrite operations require the notion of subrequests, i.e., a user-level request can contain multiple internal subrequests, all of which have to complete before the user-level operation is considered finished. This requires adjustments to the internal ompio progress engine and data structures. Note: this is purely a pipelined algorithm; there is no overlap between different iterations.
1 parent fefae1a commit 7979d17

File tree

5 files changed

+286
-199
lines changed

5 files changed

+286
-199
lines changed

ompi/mca/common/ompio/common_ompio.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at_all (ompio_file_t *fp, OMPI_MP
274274

275275
OMPI_DECLSPEC int mca_common_ompio_build_io_array ( ompio_file_t *fh, int index, int cycles,
276276
size_t bytes_per_cycle, size_t max_data, uint32_t iov_count,
277-
struct iovec *decoded_iov, int *ii, int *jj, size_t *tbw,
277+
struct iovec *decoded_iov, int *ii, size_t *tbw,
278278
size_t *spc, mca_common_ompio_io_array_t **io_array,
279279
int *num_io_entries );
280280

ompi/mca/common/ompio/common_ompio_file_read.c

Lines changed: 113 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf,
124124
size_t spc=0;
125125
ssize_t ret_code=0;
126126
int i = 0; /* index into the decoded iovec of the buffer */
127-
int j = 0; /* index into the file via iovec */
128127

129128
mca_common_ompio_decode_datatype (fh, datatype, count, buf,
130129
&max_data, fh->f_mem_convertor,
@@ -138,11 +137,10 @@ int mca_common_ompio_file_read_default (ompio_file_t *fh, void *buf,
138137
cycles, max_data);
139138
#endif
140139

141-
j = fh->f_index_in_file_view;
142140
for (index = 0; index < cycles; index++) {
143141
mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle,
144142
max_data, iov_count, decoded_iov,
145-
&i, &j, &total_bytes_read, &spc,
143+
&i, &total_bytes_read, &spc,
146144
&fh->f_io_array, &fh->f_num_of_io_entries);
147145
if (fh->f_num_of_io_entries == 0) {
148146
ret_code = 0;
@@ -188,7 +186,6 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf,
188186
size_t spc=0;
189187
ssize_t ret_code=0;
190188
int i = 0; /* index into the decoded iovec of the buffer */
191-
int j = 0; /* index into the file via iovec */
192189

193190
char *tbuf1=NULL, *tbuf2=NULL;
194191
char *unpackbuf=NULL, *readbuf=NULL;
@@ -236,7 +233,6 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf,
236233
** - unpack buffer i
237234
*/
238235

239-
j = fh->f_index_in_file_view;
240236
if (can_overlap) {
241237
mca_common_ompio_register_progress ();
242238
}
@@ -255,7 +251,7 @@ int mca_common_ompio_file_read_pipelined (ompio_file_t *fh, void *buf,
255251

256252
mca_common_ompio_build_io_array (fh, index, cycles, bytes_per_cycle,
257253
bytes_this_cycle, iov_count,
258-
decoded_iov, &i, &j, &tbr, &spc,
254+
decoded_iov, &i, &tbr, &spc,
259255
&fh->f_io_array, &fh->f_num_of_io_entries);
260256
if (fh->f_num_of_io_entries == 0) {
261257
ret_code = 0;
@@ -352,53 +348,103 @@ int mca_common_ompio_file_read_at (ompio_file_t *fh,
352348
return ret;
353349
}
354350

351+
/**
 * Advance a pipelined non-blocking read by one step.
 *
 * First unpacks the data of the previous subrequest (index-1) from the
 * temporary buffer req->req_tbuf through req->req_convertor. Then, unless
 * index equals req->req_num_subreqs, builds the io array for chunk 'index'
 * and posts a new read subrequest through the fbtl's fbtl_ipreadv, with
 * req recorded as the subrequest's req_parent.
 *
 * mca_common_ompio_file_iread stores this function in
 * req->req_post_next_subreq and makes the first call with index 0.
 * Presumably the progress engine invokes it with the following indices as
 * subrequests complete -- that caller is not visible here, confirm.
 *
 * @param req   Parent (user-level) request. Fields read here: req_fh,
 *              req_tbuf, req_convertor, req_num_subreqs, req_max_data,
 *              req_size.
 * @param index Index of the subrequest to post. A value equal to
 *              req->req_num_subreqs means only the final unpack is done.
 *
 * NOTE(review): the function returns void, so no failure is reported to
 * the caller.
 */
static void mca_common_ompio_post_next_read_subreq(struct mca_ompio_request_t *req, int index)
352+
{
353+
uint32_t iov_count = 1;
354+
size_t pos = 0, spc = 0, tbw = 0;
355+
int i = 0;
356+
mca_ompio_request_t *ompio_subreq=NULL;
357+
size_t bytes_per_cycle = OMPIO_MCA_GET(req->req_fh, pipeline_buffer_size);
358+
359+
/* Single iovec, reused for both the unpack (step 1) and the io array (step 2). */
struct iovec *decoded_iov = (struct iovec *) malloc (sizeof (struct iovec));
360+
if ( NULL == decoded_iov ) {
361+
/* NOTE(review): returns without posting a subrequest or recording an error on req. */
opal_output(1, "common_ompio: could not allocate memory.\n");
362+
return;
363+
}
364+
365+
/* Step 1: finish index-1 unpack operation */
366+
if (index - 1 >= 0) {
367+
size_t num_bytes = bytes_per_cycle;
368+
/**
369+
* should really be 'req_num_subreqs -1 == index -1'
370+
* which is the same as below.
371+
*/
372+
if (req->req_num_subreqs == index) {
373+
/* Previous chunk was the last one: it holds whatever remains after the index-1 chunks before it. */
num_bytes = req->req_max_data - (index-1)* bytes_per_cycle;
374+
}
375+
decoded_iov->iov_base = req->req_tbuf;
376+
decoded_iov->iov_len = num_bytes;
377+
opal_convertor_unpack (&req->req_convertor, decoded_iov, &iov_count, &pos);
378+
}
379+
380+
/* Step 2: post next iread subrequest */
381+
if (req->req_num_subreqs == index) {
382+
/* all done */
383+
free (decoded_iov);
384+
return;
385+
}
386+
387+
decoded_iov->iov_base = req->req_tbuf;
388+
/* The last chunk gets the remaining bytes; every other chunk gets req_size bytes. */
decoded_iov->iov_len = (req->req_num_subreqs-1 == index) ?
389+
req->req_max_data - (index* bytes_per_cycle) : req->req_size;
390+
mca_common_ompio_build_io_array (req->req_fh, index, req->req_num_subreqs,
391+
bytes_per_cycle, decoded_iov->iov_len,
392+
iov_count, decoded_iov,
393+
&i, &tbw, &spc,
394+
&req->req_fh->f_io_array,
395+
&req->req_fh->f_num_of_io_entries);
396+
397+
/* NOTE(review): the results of the request allocation and of fbtl_ipreadv are not checked; ompio_subreq is dereferenced on the next line. */
mca_common_ompio_request_alloc ( &ompio_subreq, MCA_OMPIO_REQUEST_READ);
398+
ompio_subreq->req_parent = req;
399+
req->req_fh->f_fbtl->fbtl_ipreadv (req->req_fh, (ompi_request_t *)ompio_subreq);
400+
401+
/* The io array is released right after posting; presumably the fbtl no longer needs it once fbtl_ipreadv returns -- confirm. */
free(req->req_fh->f_io_array);
402+
req->req_fh->f_io_array = NULL;
403+
req->req_fh->f_num_of_io_entries = 0;
404+
405+
free (decoded_iov);
406+
}
355407

356408
int mca_common_ompio_file_iread (ompio_file_t *fh,
357-
void *buf,
358-
int count,
359-
struct ompi_datatype_t *datatype,
360-
ompi_request_t **request)
409+
void *buf,
410+
int count,
411+
struct ompi_datatype_t *datatype,
412+
ompi_request_t **request)
361413
{
362414
int ret = OMPI_SUCCESS;
363415
mca_ompio_request_t *ompio_req=NULL;
364-
size_t spc=0;
416+
struct iovec *decoded_iov = NULL;
365417

366418
if (fh->f_amode & MPI_MODE_WRONLY){
367-
// opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
368419
ret = MPI_ERR_ACCESS;
369-
return ret;
420+
return ret;
370421
}
371422

372423
mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ);
373424

374-
if ( 0 == count ) {
425+
if (0 == count || 0 == fh->f_iov_count) {
375426
ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
376427
ompio_req->req_ompi.req_status._ucount = 0;
377428
ompi_request_complete (&ompio_req->req_ompi, false);
378429
*request = (ompi_request_t *) ompio_req;
379-
430+
380431
return OMPI_SUCCESS;
381432
}
382433

383434
if ( NULL != fh->f_fbtl->fbtl_ipreadv ) {
384435
// This fbtl has support for non-blocking operations
385-
386-
size_t total_bytes_read = 0; /* total bytes that have been read*/
387436
uint32_t iov_count = 0;
388-
struct iovec *decoded_iov = NULL;
389-
390437
size_t max_data = 0;
391-
int i = 0; /* index into the decoded iovec of the buffer */
392-
int j = 0; /* index into the file via iovec */
393-
394438
bool need_to_copy = false;
395-
396439
int is_gpu, is_managed;
397-
mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
398-
if ( is_gpu && !is_managed ) {
440+
441+
mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed);
442+
if (is_gpu && !is_managed) {
399443
need_to_copy = true;
400444
}
401445

446+
mca_common_ompio_register_progress ();
447+
402448
if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
403449
!(datatype == &ompi_mpi_byte.dt ||
404450
datatype == &ompi_mpi_char.dt )) {
@@ -411,88 +457,61 @@ int mca_common_ompio_file_iread (ompio_file_t *fh,
411457
need_to_copy = true;
412458
}
413459

414-
if ( need_to_copy ) {
415-
char *tbuf=NULL;
460+
if (need_to_copy) {
461+
size_t pipeline_buf_size = OMPIO_MCA_GET(fh, pipeline_buffer_size);
416462

417-
OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, tbuf, &ompio_req->req_convertor,
418-
max_data, 0, decoded_iov, iov_count);
419-
420-
ompio_req->req_tbuf = tbuf;
421-
ompio_req->req_size = max_data;
463+
OMPIO_PREPARE_READ_BUF(fh, buf, count, datatype, ompio_req->req_tbuf,
464+
&ompio_req->req_convertor, max_data,
465+
pipeline_buf_size, decoded_iov, iov_count);
466+
467+
ompio_req->req_num_subreqs = ceil((double)max_data/pipeline_buf_size);
468+
ompio_req->req_size = pipeline_buf_size;
469+
ompio_req->req_max_data = max_data;
470+
ompio_req->req_post_next_subreq = mca_common_ompio_post_next_read_subreq;
471+
ompio_req->req_fh = fh;
472+
ompio_req->req_ompi.req_status.MPI_ERROR = MPI_SUCCESS;
473+
474+
mca_common_ompio_post_next_read_subreq (ompio_req, 0);
422475
}
423476
else {
424-
mca_common_ompio_decode_datatype (fh,
425-
datatype,
426-
count,
427-
buf,
428-
&max_data,
429-
fh->f_mem_convertor,
430-
&decoded_iov,
431-
&iov_count);
432-
}
433-
434-
if ( 0 < max_data && 0 == fh->f_iov_count ) {
435-
ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
436-
ompio_req->req_ompi.req_status._ucount = 0;
437-
ompi_request_complete (&ompio_req->req_ompi, false);
438-
*request = (ompi_request_t *) ompio_req;
439-
if (NULL != decoded_iov) {
440-
free (decoded_iov);
441-
decoded_iov = NULL;
442-
}
443-
444-
return OMPI_SUCCESS;
445-
}
446-
447-
// Non-blocking operations have to occur in a single cycle
448-
j = fh->f_index_in_file_view;
449-
450-
mca_common_ompio_build_io_array ( fh,
451-
0, // index
452-
1, // no. of cyces
453-
max_data, // setting bytes per cycle to match data
454-
max_data,
455-
iov_count,
456-
decoded_iov,
457-
&i,
458-
&j,
459-
&total_bytes_read,
460-
&spc,
461-
&fh->f_io_array,
462-
&fh->f_num_of_io_entries);
463-
464-
if (fh->f_num_of_io_entries) {
465-
fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
466-
}
467-
468-
mca_common_ompio_register_progress ();
469-
470-
fh->f_num_of_io_entries = 0;
471-
if (NULL != fh->f_io_array) {
472-
free (fh->f_io_array);
473-
fh->f_io_array = NULL;
474-
}
477+
int i = 0;
478+
size_t spc = 0, tbr = 0;
479+
mca_common_ompio_decode_datatype (fh, datatype, count, buf,
480+
&max_data, fh->f_mem_convertor,
481+
&decoded_iov, &iov_count);
482+
483+
/**
484+
* Non-blocking operations have to occur in a single cycle
485+
* If the f_io_array is too long, the fbtl will chunk it up
486+
* internally.
487+
*/
488+
mca_common_ompio_build_io_array (fh, 0, 1, max_data, max_data,
489+
iov_count, decoded_iov, &i,
490+
&tbr, &spc,
491+
&fh->f_io_array, &fh->f_num_of_io_entries);
475492

476-
if (NULL != decoded_iov) {
477-
free (decoded_iov);
478-
decoded_iov = NULL;
479-
}
493+
fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
494+
}
480495
}
481496
else {
482-
// This fbtl does not support non-blocking operations
483-
ompi_status_public_t status;
484-
ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status);
497+
// This fbtl does not support non-blocking operations
498+
ompi_status_public_t status;
499+
ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status);
485500

486-
ompio_req->req_ompi.req_status.MPI_ERROR = ret;
487-
ompio_req->req_ompi.req_status._ucount = status._ucount;
488-
ompi_request_complete (&ompio_req->req_ompi, false);
501+
ompio_req->req_ompi.req_status.MPI_ERROR = ret;
502+
ompio_req->req_ompi.req_status._ucount = status._ucount;
503+
ompi_request_complete (&ompio_req->req_ompi, false);
489504
}
490505

506+
fh->f_num_of_io_entries = 0;
507+
free (fh->f_io_array);
508+
fh->f_io_array = NULL;
509+
free (decoded_iov);
510+
491511
*request = (ompi_request_t *) ompio_req;
492512
return ret;
493513
}
494514

495-
496515
int mca_common_ompio_file_iread_at (ompio_file_t *fh,
497516
OMPI_MPI_OFFSET_TYPE offset,
498517
void *buf,

0 commit comments

Comments
 (0)