@@ -1044,6 +1044,86 @@ int fio_stat(char const* path, struct stat* st, bool follow_symlink, fio_locatio
1044
1044
}
1045
1045
}
1046
1046
1047
+ /*
1048
+ * Calculate size of the file without trailing spaces in the end.
1049
+ * Save result in statbuf->st_size.
1050
+ *
1051
+ * It is used to avoid sending trailing zeros in CFS map files.
1052
+ */
1053
+ static int getFileNonZeroSize (const char * path , struct stat * statbuf )
1054
+ {
1055
+ char buf [BLCKSZ ];
1056
+ uint64 * word = (uint64 * )buf ;
1057
+ pgoff_t size ;
1058
+ int i ;
1059
+ FILE * fp ;
1060
+
1061
+ stat (path , statbuf );
1062
+ size = statbuf -> st_size ;
1063
+
1064
+ fp = fopen (path , PG_BINARY_R );
1065
+
1066
+ while (size > BLCKSZ && fseek (fp , size - BLCKSZ , SEEK_SET ) == 0 )
1067
+ {
1068
+ int rc = fread (buf , 1 , BLCKSZ , fp );
1069
+
1070
+ if (rc != BLCKSZ )
1071
+ break ;
1072
+
1073
+ for (i = 0 ; i < BLCKSZ /8 ; i ++ )
1074
+ {
1075
+ if (word [i ] != 0 )
1076
+ goto stop ;
1077
+ }
1078
+ size -= BLCKSZ ;
1079
+ }
1080
+ stop :
1081
+
1082
+ statbuf -> st_size = size ;
1083
+ fclose (fp );
1084
+
1085
+ //TODO handle possible errors
1086
+ return 0 ;
1087
+ }
1088
+
1089
+ /*
1090
+ * This function is wrapper for both local and remote calls.
1091
+ *
1092
+ * Calculate size of the file without trailing spaces in the end.
1093
+ * Return result via statbuf->st_size.
1094
+ */
1095
+ int fio_find_non_zero_size (char const * path , struct stat * st , bool remote )
1096
+ {
1097
+ if (remote )
1098
+ {
1099
+ fio_header hdr ;
1100
+ size_t path_len = strlen (path ) + 1 ;
1101
+
1102
+ hdr .cop = FIO_NON_ZERO_SIZE ;
1103
+ hdr .arg = 0 ;
1104
+ hdr .handle = -1 ;
1105
+ hdr .size = path_len ;
1106
+
1107
+ IO_CHECK (fio_write_all (fio_stdout , & hdr , sizeof (hdr )), sizeof (hdr ));
1108
+ IO_CHECK (fio_write_all (fio_stdout , path , path_len ), path_len );
1109
+
1110
+ IO_CHECK (fio_read_all (fio_stdin , & hdr , sizeof (hdr )), sizeof (hdr ));
1111
+ Assert (hdr .cop == FIO_NON_ZERO_SIZE );
1112
+ IO_CHECK (fio_read_all (fio_stdin , st , sizeof (* st )), sizeof (* st ));
1113
+
1114
+ if (hdr .arg != 0 )
1115
+ {
1116
+ errno = hdr .arg ;
1117
+ return -1 ;
1118
+ }
1119
+ return 0 ;
1120
+ }
1121
+ else
1122
+ {
1123
+ return getFileNonZeroSize (path , st );
1124
+ }
1125
+ }
1126
+
1047
1127
/* Check presence of the file */
1048
1128
int fio_access (char const * path , int mode , fio_location location )
1049
1129
{
@@ -2085,6 +2165,7 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
2085
2165
z_stream * strm = NULL ;
2086
2166
2087
2167
hdr .cop = FIO_SEND_FILE ;
2168
+ hdr .arg = 0 ; //read till EOF
2088
2169
hdr .size = path_len ;
2089
2170
2090
2171
// elog(VERBOSE, "Thread [%d]: Attempting to open remote compressed WAL file '%s'",
@@ -2223,6 +2304,7 @@ int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* o
2223
2304
return exit_code ;
2224
2305
}
2225
2306
2307
+
2226
2308
/* Receive chunks of data and write them to destination file.
2227
2309
* Return codes:
2228
2310
* SEND_OK (0)
@@ -2242,7 +2324,19 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
2242
2324
size_t path_len = strlen (from_fullpath ) + 1 ;
2243
2325
char * buf = pgut_malloc (CHUNK_SIZE ); /* buffer */
2244
2326
2327
+ struct stat statbuf ;
2328
+ ssize_t cfm_non_zero_size = 0 ;
2329
+
2330
+
2331
+ if (file && file -> forkName == cfm )
2332
+ {
2333
+ if (fio_find_non_zero_size (from_fullpath , & statbuf , true) != 0 )
2334
+ elog (ERROR , "fio_find_non_zero_size failed" );
2335
+ cfm_non_zero_size = statbuf .st_size ;
2336
+ }
2337
+
2245
2338
hdr .cop = FIO_SEND_FILE ;
2339
+ hdr .arg = cfm_non_zero_size ; //read till this length
2246
2340
hdr .size = path_len ;
2247
2341
2248
2342
// elog(VERBOSE, "Thread [%d]: Attempting to open remote WAL file '%s'",
@@ -2314,13 +2408,19 @@ int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
2314
2408
* FIO_PAGE
2315
2409
* FIO_SEND_FILE_EOF
2316
2410
*
2411
+ * If we only want to read a part of the file, pass len_wanted argument.
2412
+ * Currently we do not differentiate exit codes and return with FIO_SEND_FILE_EOF
2413
+ * when expected amount of bytes was send.
2414
+ *
2415
+ * len_wanted == 0 means read till the end of file.
2317
2416
*/
2318
- static void fio_send_file_impl (int out , char const * path )
2417
+ static void fio_send_file_impl (int out , char const * path , size_t len_wanted )
2319
2418
{
2320
2419
FILE * fp ;
2321
2420
fio_header hdr ;
2322
2421
char * buf = pgut_malloc (CHUNK_SIZE );
2323
2422
size_t read_len = 0 ;
2423
+ size_t real_send_len = 0 ;
2324
2424
char * errormsg = NULL ;
2325
2425
2326
2426
/* open source file for read */
@@ -2389,6 +2489,10 @@ static void fio_send_file_impl(int out, char const* path)
2389
2489
IO_CHECK (fio_write_all (out , buf , read_len ), read_len );
2390
2490
}
2391
2491
2492
+ real_send_len += read_len ;
2493
+ if (len_wanted && real_send_len >= len_wanted )
2494
+ break ; //TODO pass real_send_len somewhere
2495
+
2392
2496
if (feof (fp ))
2393
2497
break ;
2394
2498
}
@@ -2543,6 +2647,7 @@ static void fio_list_dir_impl(int out, char* buf)
2543
2647
fio_file .linked_len = 0 ;
2544
2648
2545
2649
hdr .cop = FIO_SEND_FILE ;
2650
+ hdr .arg = 0 ; //read till EOF
2546
2651
hdr .size = strlen (file -> rel_path ) + 1 ;
2547
2652
2548
2653
/* send rel_path first */
@@ -2931,7 +3036,7 @@ void fio_communicate(int in, int out)
2931
3036
fio_send_pages_impl (out , buf );
2932
3037
break ;
2933
3038
case FIO_SEND_FILE :
2934
- fio_send_file_impl (out , buf );
3039
+ fio_send_file_impl (out , buf , hdr . arg );
2935
3040
break ;
2936
3041
case FIO_SYNC :
2937
3042
/* open file and fsync it */
@@ -2980,6 +3085,14 @@ void fio_communicate(int in, int out)
2980
3085
case FIO_GET_ASYNC_ERROR :
2981
3086
fio_get_async_error_impl (out );
2982
3087
break ;
3088
+ case FIO_NON_ZERO_SIZE : /* Get non-zero length of the file in specified path.
3089
+ * Currently used for cfm optimization */
3090
+ hdr .size = sizeof (st );
3091
+ rc = getFileNonZeroSize (buf , & st );
3092
+ hdr .arg = rc < 0 ? errno : 0 ;
3093
+ IO_CHECK (fio_write_all (out , & hdr , sizeof (hdr )), sizeof (hdr ));
3094
+ IO_CHECK (fio_write_all (out , & st , sizeof (st )), sizeof (st ));
3095
+ break ;
2983
3096
default :
2984
3097
Assert (false);
2985
3098
}
0 commit comments