Skip to content

Commit 615f4ef

Browse files
committed
fbtl/posix: first cut on atomicity support
tests pass, but some more cleanup required. Signed-off-by: Edgar Gabriel <[email protected]>
1 parent ab3ae3c commit 615f4ef

File tree

7 files changed

+214
-92
lines changed

7 files changed

+214
-92
lines changed

ompi/mca/fbtl/posix/fbtl_posix.c

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -144,34 +144,38 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
144144
data->aio_req_status[i] = EINPROGRESS;
145145
start_offset = data->aio_reqs[i].aio_offset;
146146
total_length = data->aio_reqs[i].aio_nbytes;
147+
/* release previous lock */
148+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
149+
147150
if ( data->aio_req_type == FBTL_POSIX_WRITE ) {
148-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
151+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length,
152+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
149153
if ( 0 < ret_code ) {
150154
opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
151155
/* Just in case some part of the lock actually succeeded. */
152-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
156+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
153157
return OMPI_ERROR;
154158
}
155159
if (-1 == aio_write(&data->aio_reqs[i])) {
156160
opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
157-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
161+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
158162
return OMPI_ERROR;
159163
}
160164
}
161165
else if ( data->aio_req_type == FBTL_POSIX_READ ) {
162-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
166+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
167+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
163168
if ( 0 < ret_code ) {
164169
opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
165170
/* Just in case some part of the lock actually succeeded. */
166-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
171+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
167172
return OMPI_ERROR;
168173
}
169174
if (-1 == aio_read(&data->aio_reqs[i])) {
170175
opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
171-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
176+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
172177
return OMPI_ERROR;
173178
}
174-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
175179
}
176180
}
177181
else {
@@ -199,10 +203,9 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
199203
#if 0
200204
printf("lcount=%d open_reqs=%d\n", lcount, data->aio_open_reqs );
201205
#endif
202-
203206
if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
204207
/* release the lock of the previous operations */
205-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
208+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
206209

207210
/* post the next batch of operations */
208211
data->aio_first_active_req = data->aio_last_active_req;
@@ -218,30 +221,32 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
218221
total_length = (end_offset - start_offset);
219222

220223
if ( FBTL_POSIX_READ == data->aio_req_type ) {
221-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
224+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
225+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
222226
}
223227
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
224-
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
228+
ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length,
229+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
225230
}
226231
if ( 0 < ret_code ) {
227232
opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
228233
/* Just in case some part of the lock actually succeeded. */
229-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
234+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
230235
return OMPI_ERROR;
231236
}
232237

233238
for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
234239
if ( FBTL_POSIX_READ == data->aio_req_type ) {
235240
if (-1 == aio_read(&data->aio_reqs[i])) {
236241
opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
237-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
242+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
238243
return OMPI_ERROR;
239244
}
240245
}
241246
else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
242247
if (-1 == aio_write(&data->aio_reqs[i])) {
243248
opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
244-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
249+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
245250
return OMPI_ERROR;
246251
}
247252
}
@@ -255,8 +260,13 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
255260
/* all pending operations are finished for this request */
256261
req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
257262
req->req_ompi.req_status._ucount = data->aio_total_len;
258-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
259-
ret = true;
263+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
264+
265+
if ( data->aio_fh->f_atomicity ) {
266+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
267+
}
268+
269+
ret = true;
260270
}
261271
#endif
262272
return ret;
@@ -268,8 +278,8 @@ void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
268278
/* Free the fbtl specific data structures */
269279
mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
270280
if (NULL != data ) {
271-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
272-
if ( NULL != data->aio_reqs ) {
281+
282+
if ( NULL != data->aio_reqs ) {
273283
free ( data->aio_reqs);
274284
}
275285
if ( NULL != data->aio_req_status ) {

ompi/mca/fbtl/posix/fbtl_posix.h

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2008-2020 University of Houston. All rights reserved.
12+
* Copyright (c) 2008-2021 University of Houston. All rights reserved.
1313
* Copyright (c) 2018 Research Organization for Information Science
1414
* and Technology (RIST). All rights reserved.
1515
* $COPYRIGHT$
@@ -66,8 +66,9 @@ bool mca_fbtl_posix_progress ( mca_ompio_request_t *req);
6666
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req);
6767

6868
int mca_fbtl_posix_lock ( struct flock *lock, ompio_file_t *fh, int op,
69-
OMPI_MPI_OFFSET_TYPE iov_offset, off_t len, int flags);
70-
void mca_fbtl_posix_unlock ( struct flock *lock, ompio_file_t *fh );
69+
OMPI_MPI_OFFSET_TYPE iov_offset, off_t len, int flags,
70+
int *lock_counter);
71+
void mca_fbtl_posix_unlock ( struct flock *lock, ompio_file_t *fh, int *lock_counter );
7172

7273

7374
struct mca_fbtl_posix_request_data_t {
@@ -78,9 +79,10 @@ struct mca_fbtl_posix_request_data_t {
7879
int aio_first_active_req; /* first active posted req */
7980
int aio_last_active_req; /* last currently active poted req */
8081
struct aiocb *aio_reqs; /* pointer array of req structures */
81-
int *aio_req_status; /* array of statuses */
82+
int *aio_req_status; /* array of statuses */
8283
ssize_t aio_total_len; /* total amount of data written */
8384
struct flock aio_lock; /* lock used for certain file systems */
85+
int aio_lock_counter; /* to keep track of no. of lock calls */
8486
ompio_file_t *aio_fh; /* pointer back to the mca_io_ompio_fh structure */
8587
};
8688
typedef struct mca_fbtl_posix_request_data_t mca_fbtl_posix_request_data_t;

ompi/mca/fbtl/posix/fbtl_posix_ipreadv.c

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,24 @@ ssize_t mca_fbtl_posix_ipreadv (ompio_file_t *fh,
7070
free(data);
7171
return 0;
7272
}
73+
data->aio_lock_counter = 0;
7374
data->aio_fh = fh;
7475

76+
if ( fh->f_atomicity ) {
77+
// save flags and disable file system specific requirements
78+
int32_t orig_flags = fh->f_flags;
79+
fh->f_flags &= ~OMPIO_LOCK_NEVER;
80+
fh->f_flags &= ~OMPIO_LOCK_NOT_THIS_OP;
81+
82+
// Set a lock on the entire region that is being modified
83+
off_t end_offset = (off_t)fh->f_io_array[fh->f_num_of_io_entries-1].offset +
84+
(off_t)fh->f_io_array[fh->f_num_of_io_entries-1].length;
85+
off_t len = end_offset - (off_t)fh->f_io_array[0].offset;
86+
ret = mca_fbtl_posix_lock ( &data->aio_lock, fh, F_RDLCK, (off_t)fh->f_io_array[0].offset,
87+
len, OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter);
88+
fh->f_flags = orig_flags;
89+
}
90+
7591
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
7692
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
7793
fh->f_io_array[i].offset;
@@ -94,10 +110,11 @@ ssize_t mca_fbtl_posix_ipreadv (ompio_file_t *fh,
94110
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
95111
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
96112
total_length = (end_offset - start_offset);
97-
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
113+
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length,
114+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
98115
if ( 0 < ret ) {
99116
opal_output(1, "mca_fbtl_posix_ipreadv: error in mca_fbtl_posix_lock() error ret=%d %s", ret, strerror(errno));
100-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
117+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
101118
free(data->aio_reqs);
102119
free(data->aio_req_status);
103120
free(data);
@@ -115,7 +132,7 @@ ssize_t mca_fbtl_posix_ipreadv (ompio_file_t *fh,
115132
}
116133
if ( MAX_ATTEMPTS == counter ) {
117134
opal_output(1, "mca_fbtl_posix_ipreadv: error in aio_read(): errno %d %s", errno, strerror(errno));
118-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
135+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
119136
free(data->aio_reqs);
120137
free(data->aio_req_status);
121138
free(data);

ompi/mca/fbtl/posix/fbtl_posix_ipwritev.c

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,24 @@ ssize_t mca_fbtl_posix_ipwritev (ompio_file_t *fh,
6969
free(data);
7070
return 0;
7171
}
72+
data->aio_lock_counter = 0;
7273
data->aio_fh = fh;
74+
if ( fh->f_atomicity ) {
75+
// save flags and disable file system specific requirements
76+
int32_t orig_flags = fh->f_flags;
77+
fh->f_flags &= ~OMPIO_LOCK_NEVER;
78+
fh->f_flags &= ~OMPIO_LOCK_NOT_THIS_OP;
7379

80+
// Set a lock on the entire region that is being modified
81+
off_t end_offset = (off_t)fh->f_io_array[fh->f_num_of_io_entries-1].offset +
82+
(off_t)fh->f_io_array[fh->f_num_of_io_entries-1].length;
83+
off_t len = end_offset - (off_t)fh->f_io_array[0].offset;
84+
ret = mca_fbtl_posix_lock ( &data->aio_lock, fh, F_WRLCK, (off_t)fh->f_io_array[0].offset,
85+
len, OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter);
86+
fh->f_flags = orig_flags;
87+
}
88+
89+
7490
for ( i=0; i<fh->f_num_of_io_entries; i++ ) {
7591
data->aio_reqs[i].aio_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)
7692
fh->f_io_array[i].offset;
@@ -93,10 +109,11 @@ ssize_t mca_fbtl_posix_ipwritev (ompio_file_t *fh,
93109
start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
94110
end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
95111
total_length = (end_offset - start_offset);
96-
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
112+
ret = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length,
113+
OMPIO_LOCK_ENTIRE_REGION, &data->aio_lock_counter );
97114
if ( 0 < ret ) {
98115
opal_output(1, "mca_fbtl_posix_ipwritev: error in mca_fbtl_posix_lock() error ret=%d %s", ret, strerror(errno));
99-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
116+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
100117
free(data->aio_reqs);
101118
free(data->aio_req_status);
102119
free(data);
@@ -114,7 +131,7 @@ ssize_t mca_fbtl_posix_ipwritev (ompio_file_t *fh,
114131
}
115132
if ( MAX_ATTEMPTS == counter ) {
116133
opal_output(1, "mca_fbtl_posix_ipwritev: error in aio_write(): %s", strerror(errno));
117-
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
134+
mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh, &data->aio_lock_counter );
118135
free(data->aio_req_status);
119136
free(data->aio_reqs);
120137
free(data);

ompi/mca/fbtl/posix/fbtl_posix_lock.c

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
* University of Stuttgart. All rights reserved.
1010
* Copyright (c) 2004-2005 The Regents of the University of California.
1111
* All rights reserved.
12-
* Copyright (c) 2017 University of Houston. All rights reserved.
12+
* Copyright (c) 2017-2021 University of Houston. All rights reserved.
1313
* Copyright (c) 2018 Research Organization for Information Science
1414
* and Technology (RIST). All rights reserved.
1515
* $COPYRIGHT$
@@ -23,13 +23,17 @@
2323
#include "fbtl_posix.h"
2424

2525
#include "mpi.h"
26+
27+
#include <fcntl.h>
28+
2629
#include <unistd.h>
2730
#include <sys/uio.h>
2831
#include <errno.h>
2932
#include <limits.h>
3033
#include "ompi/constants.h"
3134
#include "ompi/mca/fbtl/fbtl.h"
3235

36+
3337
#define MAX_ERRCOUNT 100
3438

3539
/*
@@ -44,15 +48,27 @@
4448
*/
4549

4650
int mca_fbtl_posix_lock ( struct flock *lock, ompio_file_t *fh, int op,
47-
OMPI_MPI_OFFSET_TYPE offset, off_t len, int flags)
51+
OMPI_MPI_OFFSET_TYPE offset, off_t len, int flags,
52+
int *lock_counter)
4853
{
4954
off_t lmod, bmod;
5055
int ret, err_count;
5156

57+
*lock_counter = *lock_counter + 1;
58+
if ( (*lock_counter) > 1 ) {
59+
/*
60+
** This lock was already initialized, most likely through an atomicity operation.
61+
** No need to do anything except increment the lock_counter;
62+
*/
63+
return 0;
64+
}
65+
5266
lock->l_type = op;
5367
lock->l_whence = SEEK_SET;
5468
lock->l_start =-1;
5569
lock->l_len =-1;
70+
lock->l_pid = 0;
71+
5672
if ( 0 == len ) {
5773
return 0;
5874
}
@@ -117,9 +133,9 @@ int mca_fbtl_posix_lock ( struct flock *lock, ompio_file_t *fh, int op,
117133
printf("%d: acquiring lock for offset %ld length %ld requested offset %ld request len %ld \n",
118134
fh->f_rank, lock->l_start, lock->l_len, offset, len);
119135
#endif
120-
errno=0;
121136
err_count=0;
122137
do {
138+
errno=0;
123139
ret = fcntl ( fh->fd, F_SETLKW, lock);
124140
if ( ret ) {
125141
#ifdef OMPIO_DEBUG
@@ -133,8 +149,13 @@ int mca_fbtl_posix_lock ( struct flock *lock, ompio_file_t *fh, int op,
133149
return ret;
134150
}
135151

136-
void mca_fbtl_posix_unlock ( struct flock *lock, ompio_file_t *fh )
152+
void mca_fbtl_posix_unlock ( struct flock *lock, ompio_file_t *fh, int *lock_counter)
137153
{
154+
*lock_counter = *lock_counter - 1;
155+
if ( (*lock_counter) > 0 ) {
156+
return;
157+
}
158+
138159
if ( -1 == lock->l_start && -1 == lock->l_len ) {
139160
return;
140161
}

0 commit comments

Comments
 (0)