diff --git a/src/archive.c b/src/archive.c index 2ae86bd6a..734602cac 100644 --- a/src/archive.c +++ b/src/archive.c @@ -1375,11 +1375,11 @@ get_wal_file(const char *filename, const char *from_fullpath, #ifdef HAVE_LIBZ /* If requested file is regular WAL segment, then try to open it with '.gz' suffix... */ if (IsXLogFileName(filename)) - rc = fio_send_file_gz(from_fullpath_gz, to_fullpath, out, &errmsg); + rc = fio_send_file_gz(from_fullpath_gz, out, &errmsg); if (rc == FILE_MISSING) #endif /* ... failing that, use uncompressed */ - rc = fio_send_file(from_fullpath, to_fullpath, out, NULL, &errmsg); + rc = fio_send_file(from_fullpath, out, false, NULL, &errmsg); /* When not in prefetch mode, try to use partial file */ if (rc == FILE_MISSING && !prefetch_mode && IsXLogFileName(filename)) @@ -1389,13 +1389,13 @@ get_wal_file(const char *filename, const char *from_fullpath, #ifdef HAVE_LIBZ /* '.gz.partial' goes first ... */ snprintf(from_partial, sizeof(from_partial), "%s.gz.partial", from_fullpath); - rc = fio_send_file_gz(from_partial, to_fullpath, out, &errmsg); + rc = fio_send_file_gz(from_partial, out, &errmsg); if (rc == FILE_MISSING) #endif { /* ... 
failing that, use '.partial' */ snprintf(from_partial, sizeof(from_partial), "%s.partial", from_fullpath); - rc = fio_send_file(from_partial, to_fullpath, out, NULL, &errmsg); + rc = fio_send_file(from_partial, out, false, NULL, &errmsg); } if (rc == SEND_OK) diff --git a/src/catalog.c b/src/catalog.c index 03099d1a2..561ab876e 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -1069,6 +1069,7 @@ get_backup_filelist(pgBackup *backup, bool strict) char linked[MAXPGPATH]; char compress_alg_string[MAXPGPATH]; int64 write_size, + uncompressed_size, mode, /* bit length of mode_t depends on platforms */ is_datafile, is_cfs, @@ -1132,6 +1133,11 @@ get_backup_filelist(pgBackup *backup, bool strict) if (get_control_value_int64(buf, "hdr_size", &hdr_size, false)) file->hdr_size = (int) hdr_size; + if (get_control_value_int64(buf, "full_size", &uncompressed_size, false)) + file->uncompressed_size = uncompressed_size; + else + file->uncompressed_size = write_size; + if (file->external_dir_num == 0) set_forkname(file); @@ -2561,6 +2567,11 @@ write_backup_filelist(pgBackup *backup, parray *files, const char *root, file->external_dir_num, file->dbOid); + if (file->uncompressed_size != 0 && + file->uncompressed_size != file->write_size) + len += sprintf(line+len, ",\"full_size\":\"" INT64_FORMAT "\"", + file->uncompressed_size); + if (file->is_datafile) len += sprintf(line+len, ",\"segno\":\"%d\"", file->segno); diff --git a/src/data.c b/src/data.c index 753f247f7..2a8806cde 100644 --- a/src/data.c +++ b/src/data.c @@ -799,6 +799,7 @@ backup_non_data_file(pgFile *file, pgFile *prev_file, * and its mtime is less than parent backup start time ... 
*/ if ((pg_strcasecmp(file->name, RELMAPPER_FILENAME) != 0) && (prev_file && file->exists_in_prev && + file->size == prev_file->size && file->mtime <= parent_backup_time)) { /* @@ -1330,7 +1331,12 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup, if (already_exists) { /* compare checksums of already existing file and backup file */ - pg_crc32 file_crc = fio_get_crc32(to_fullpath, FIO_DB_HOST, false, false); + pg_crc32 file_crc; + if (tmp_file->forkName == cfm && + tmp_file->uncompressed_size > tmp_file->write_size) + file_crc = fio_get_crc32_truncated(to_fullpath, FIO_DB_HOST); + else + file_crc = fio_get_crc32(to_fullpath, FIO_DB_HOST, false, false); if (file_crc == tmp_file->crc) { @@ -1387,10 +1393,12 @@ backup_non_data_file_internal(const char *from_fullpath, const char *to_fullpath, pgFile *file, bool missing_ok) { - FILE *in = NULL; FILE *out = NULL; - ssize_t read_len = 0; - char *buf = NULL; + char *errmsg = NULL; + int rc; + bool cut_zero_tail; + + cut_zero_tail = file->forkName == cfm; INIT_FILE_CRC32(true, file->crc); @@ -1412,107 +1420,44 @@ backup_non_data_file_internal(const char *from_fullpath, /* backup remote file */ if (fio_is_remote(FIO_DB_HOST)) - { - char *errmsg = NULL; - int rc = fio_send_file(from_fullpath, to_fullpath, out, file, &errmsg); + rc = fio_send_file(from_fullpath, out, cut_zero_tail, file, &errmsg); + else + rc = fio_send_file_local(from_fullpath, out, cut_zero_tail, file, &errmsg); - /* handle errors */ - if (rc == FILE_MISSING) - { - /* maybe deleted, it's not error in case of backup */ - if (missing_ok) - { - elog(LOG, "File \"%s\" is not found", from_fullpath); - file->write_size = FILE_NOT_FOUND; - goto cleanup; - } - else - elog(ERROR, "File \"%s\" is not found", from_fullpath); - } - else if (rc == WRITE_FAILED) - elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, strerror(errno)); - else if (rc != SEND_OK) + /* handle errors */ + if (rc == FILE_MISSING) + { + /* maybe deleted, it's not error in 
case of backup */ + if (missing_ok) { - if (errmsg) - elog(ERROR, "%s", errmsg); - else - elog(ERROR, "Cannot access remote file \"%s\"", from_fullpath); + elog(LOG, "File \"%s\" is not found", from_fullpath); + file->write_size = FILE_NOT_FOUND; + goto cleanup; } - - pg_free(errmsg); + else + elog(ERROR, "File \"%s\" is not found", from_fullpath); } - /* backup local file */ - else + else if (rc == WRITE_FAILED) + elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, strerror(errno)); + else if (rc != SEND_OK) { - /* open source file for read */ - in = fopen(from_fullpath, PG_BINARY_R); - if (in == NULL) - { - /* maybe deleted, it's not error in case of backup */ - if (errno == ENOENT) - { - if (missing_ok) - { - elog(LOG, "File \"%s\" is not found", from_fullpath); - file->write_size = FILE_NOT_FOUND; - goto cleanup; - } - else - elog(ERROR, "File \"%s\" is not found", from_fullpath); - } - - elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath, - strerror(errno)); - } - - /* disable stdio buffering for local input/output files to avoid triple buffering */ - setvbuf(in, NULL, _IONBF, BUFSIZ); - setvbuf(out, NULL, _IONBF, BUFSIZ); - - /* allocate 64kB buffer */ - buf = pgut_malloc(CHUNK_SIZE); - - /* copy content and calc CRC */ - for (;;) - { - read_len = fread(buf, 1, CHUNK_SIZE, in); - - if (ferror(in)) - elog(ERROR, "Cannot read from file \"%s\": %s", - from_fullpath, strerror(errno)); - - if (read_len > 0) - { - if (fwrite(buf, 1, read_len, out) != read_len) - elog(ERROR, "Cannot write to file \"%s\": %s", to_fullpath, - strerror(errno)); - - /* update CRC */ - COMP_FILE_CRC32(true, file->crc, buf, read_len); - file->read_size += read_len; - } - - if (feof(in)) - break; - } + if (errmsg) + elog(ERROR, "%s", errmsg); + else + elog(ERROR, "Cannot access remote file \"%s\"", from_fullpath); } - file->write_size = (int64) file->read_size; - - if (file->write_size > 0) - file->uncompressed_size = file->write_size; + file->uncompressed_size = file->read_size; 
cleanup: + if (errmsg != NULL) + pg_free(errmsg); + /* finish CRC calculation and store into pgFile */ FIN_FILE_CRC32(true, file->crc); - if (in && fclose(in)) - elog(ERROR, "Cannot close the file \"%s\": %s", from_fullpath, strerror(errno)); - if (out && fclose(out)) elog(ERROR, "Cannot close the file \"%s\": %s", to_fullpath, strerror(errno)); - - pg_free(buf); } /* diff --git a/src/dir.c b/src/dir.c index 00e918d0f..73d6db09b 100644 --- a/src/dir.c +++ b/src/dir.c @@ -262,137 +262,6 @@ pgFileDelete(mode_t mode, const char *full_path) } } -/* - * Read the local file to compute its CRC. - * We cannot make decision about file decompression because - * user may ask to backup already compressed files and we should be - * obvious about it. - */ -pg_crc32 -pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok) -{ - FILE *fp; - pg_crc32 crc = 0; - char *buf; - size_t len = 0; - - INIT_FILE_CRC32(use_crc32c, crc); - - /* open file in binary read mode */ - fp = fopen(file_path, PG_BINARY_R); - if (fp == NULL) - { - if (errno == ENOENT) - { - if (missing_ok) - { - FIN_FILE_CRC32(use_crc32c, crc); - return crc; - } - } - - elog(ERROR, "Cannot open file \"%s\": %s", - file_path, strerror(errno)); - } - - /* disable stdio buffering */ - setvbuf(fp, NULL, _IONBF, BUFSIZ); - buf = pgut_malloc(STDIO_BUFSIZE); - - /* calc CRC of file */ - for (;;) - { - if (interrupted) - elog(ERROR, "interrupted during CRC calculation"); - - len = fread(buf, 1, STDIO_BUFSIZE, fp); - - if (ferror(fp)) - elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno)); - - /* update CRC */ - COMP_FILE_CRC32(use_crc32c, crc, buf, len); - - if (feof(fp)) - break; - } - - FIN_FILE_CRC32(use_crc32c, crc); - fclose(fp); - pg_free(buf); - - return crc; -} - -/* - * Read the local file to compute its CRC. - * We cannot make decision about file decompression because - * user may ask to backup already compressed files and we should be - * obvious about it. 
- */ -pg_crc32 -pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok) -{ - gzFile fp; - pg_crc32 crc = 0; - int len = 0; - int err; - char *buf; - - INIT_FILE_CRC32(use_crc32c, crc); - - /* open file in binary read mode */ - fp = gzopen(file_path, PG_BINARY_R); - if (fp == NULL) - { - if (errno == ENOENT) - { - if (missing_ok) - { - FIN_FILE_CRC32(use_crc32c, crc); - return crc; - } - } - - elog(ERROR, "Cannot open file \"%s\": %s", - file_path, strerror(errno)); - } - - buf = pgut_malloc(STDIO_BUFSIZE); - - /* calc CRC of file */ - for (;;) - { - if (interrupted) - elog(ERROR, "interrupted during CRC calculation"); - - len = gzread(fp, buf, STDIO_BUFSIZE); - - if (len <= 0) - { - /* we either run into eof or error */ - if (gzeof(fp)) - break; - else - { - const char *err_str = NULL; - - err_str = gzerror(fp, &err); - elog(ERROR, "Cannot read from compressed file %s", err_str); - } - } - - /* update CRC */ - COMP_FILE_CRC32(use_crc32c, crc, buf, len); - } - - FIN_FILE_CRC32(use_crc32c, crc); - gzclose(fp); - pg_free(buf); - - return crc; -} - void pgFileFree(void *file) { @@ -1812,7 +1681,7 @@ write_database_map(pgBackup *backup, parray *database_map, parray *backup_files_ FIO_BACKUP_HOST); file->crc = pgFileGetCRC(database_map_path, true, false); file->write_size = file->size; - file->uncompressed_size = file->read_size; + file->uncompressed_size = file->size; parray_append(backup_files_list, file); } diff --git a/src/merge.c b/src/merge.c index 1ce49f9a2..79498f48c 100644 --- a/src/merge.c +++ b/src/merge.c @@ -1078,7 +1078,7 @@ merge_files(void *arg) tmp_file->hdr_crc = file->hdr_crc; } else - tmp_file->uncompressed_size = tmp_file->write_size; + tmp_file->uncompressed_size = tmp_file->uncompressed_size; /* Copy header metadata from old map into a new one */ tmp_file->n_headers = file->n_headers; diff --git a/src/pg_probackup.h b/src/pg_probackup.h index bc9f9b8a8..d1d912045 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -345,11 
+345,11 @@ typedef enum ShowFormat #define BYTES_INVALID (-1) /* file didn`t changed since previous backup, DELTA backup do not rely on it */ #define FILE_NOT_FOUND (-2) /* file disappeared during backup */ #define BLOCKNUM_INVALID (-1) -#define PROGRAM_VERSION "2.5.8" +#define PROGRAM_VERSION "2.5.9" /* update when remote agent API or behaviour changes */ -#define AGENT_PROTOCOL_VERSION 20501 -#define AGENT_PROTOCOL_VERSION_STR "2.5.1" +#define AGENT_PROTOCOL_VERSION 20509 +#define AGENT_PROTOCOL_VERSION_STR "2.5.9" /* update only when changing storage format */ #define STORAGE_FORMAT_VERSION "2.4.4" @@ -1077,6 +1077,7 @@ extern void fio_pgFileDelete(pgFile *file, const char *full_path); extern void pgFileFree(void *file); extern pg_crc32 pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok); +extern pg_crc32 pgFileGetCRCTruncated(const char *file_path, bool use_crc32c); extern pg_crc32 pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok); extern int pgFileMapComparePath(const void *f1, const void *f2); @@ -1240,9 +1241,11 @@ extern int fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pg XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version, bool use_pagemap, BlockNumber *err_blknum, char **errormsg); /* return codes for fio_send_pages */ -extern int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, char **errormsg); -extern int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, +extern int fio_send_file_gz(const char *from_fullpath, FILE* out, char **errormsg); +extern int fio_send_file(const char *from_fullpath, FILE* out, bool cut_zero_tail, pgFile *file, char **errormsg); +extern int fio_send_file_local(const char *from_fullpath, FILE* out, bool cut_zero_tail, + pgFile *file, char **errormsg); extern void fio_list_dir(parray *files, const char *root, bool exclude, bool follow_symlink, bool add_root, bool backup_logs, bool skip_hidden, int 
external_dir_num); diff --git a/src/utils/file.c b/src/utils/file.c index 727b48c60..b4ba30594 100644 --- a/src/utils/file.c +++ b/src/utils/file.c @@ -18,6 +18,10 @@ static __thread int fio_stdin = 0; static __thread int fio_stderr = 0; static char *async_errormsg = NULL; +#define PAGE_ZEROSEARCH_COARSE_GRANULARITY 4096 +#define PAGE_ZEROSEARCH_FINE_GRANULARITY 64 +static const char zerobuf[PAGE_ZEROSEARCH_COARSE_GRANULARITY] = {0}; + fio_location MyLocation; typedef struct @@ -1357,14 +1361,20 @@ fio_sync(char const* path, fio_location location) enum { GET_CRC32_DECOMPRESS = 1, - GET_CRC32_MISSING_OK = 2 + GET_CRC32_MISSING_OK = 2, + GET_CRC32_TRUNCATED = 4 }; /* Get crc32 of file */ -pg_crc32 -fio_get_crc32(const char *file_path, fio_location location, - bool decompress, bool missing_ok) +static pg_crc32 +fio_get_crc32_ex(const char *file_path, fio_location location, + bool decompress, bool missing_ok, bool truncated) { + if (decompress && truncated) + elog(ERROR, "Could not calculate CRC for compressed truncated file"); + if (missing_ok && truncated) + elog(ERROR, "CRC calculation for missing truncated file is forbidden"); + if (fio_is_remote(location)) { fio_header hdr; @@ -1379,6 +1389,8 @@ fio_get_crc32(const char *file_path, fio_location location, hdr.arg = GET_CRC32_DECOMPRESS; if (missing_ok) hdr.arg |= GET_CRC32_MISSING_OK; + if (truncated) + hdr.arg |= GET_CRC32_TRUNCATED; IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr)); IO_CHECK(fio_write_all(fio_stdout, file_path, path_len), path_len); @@ -1390,11 +1402,26 @@ fio_get_crc32(const char *file_path, fio_location location, { if (decompress) return pgFileGetCRCgz(file_path, true, missing_ok); + else if (truncated) + return pgFileGetCRCTruncated(file_path, true); else return pgFileGetCRC(file_path, true, missing_ok); } } +pg_crc32 +fio_get_crc32(const char *file_path, fio_location location, + bool decompress, bool missing_ok) +{ + return fio_get_crc32_ex(file_path, location, decompress, 
missing_ok, false); +} + +pg_crc32 +fio_get_crc32_truncated(const char *file_path, fio_location location) +{ + return fio_get_crc32_ex(file_path, location, false, false, true); +} + /* Remove file */ int fio_unlink(char const* path, fio_location location) @@ -2455,7 +2482,7 @@ fio_send_pages_impl(int out, char* buf) * REMOTE_ERROR (-6) */ int -fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, char **errormsg) +fio_send_file_gz(const char *from_fullpath, FILE* out, char **errormsg) { fio_header hdr; int exit_code = SEND_OK; @@ -2604,6 +2631,105 @@ fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, return exit_code; } +typedef struct send_file_state { + bool calc_crc; + uint32_t crc; + int64_t read_size; + int64_t write_size; +} send_file_state; + +/* find page border of all-zero tail */ +static size_t +find_zero_tail(char *buf, size_t len) +{ + size_t i, l; + size_t granul = sizeof(zerobuf); + + if (len == 0) + return 0; + + /* fast check for last bytes */ + l = Min(len, PAGE_ZEROSEARCH_FINE_GRANULARITY); + i = len - l; + if (memcmp(buf + i, zerobuf, l) != 0) + return len; + + /* coarse search for zero tail */ + i = (len-1) & ~(granul-1); + l = len - i; + for (;;) + { + if (memcmp(buf+i, zerobuf, l) != 0) + { + i += l; + break; + } + if (i == 0) + break; + i -= granul; + l = granul; + } + + len = i; + /* search zero tail with finer granularity */ + for (granul = sizeof(zerobuf)/2; + len > 0 && granul >= PAGE_ZEROSEARCH_FINE_GRANULARITY; + granul /= 2) + { + if (granul > l) + continue; + i = (len-1) & ~(granul-1); + l = len - i; + if (memcmp(buf+i, zerobuf, l) == 0) + len = i; + } + + return len; +} + +static void +fio_send_file_crc(send_file_state* st, char *buf, size_t len) +{ + int64_t write_size; + + if (!st->calc_crc) + return; + + write_size = st->write_size; + while (st->read_size > write_size) + { + size_t crc_len = Min(st->read_size - write_size, sizeof(zerobuf)); + COMP_FILE_CRC32(true, st->crc, 
zerobuf, crc_len); + write_size += crc_len; + } + + if (len > 0) + COMP_FILE_CRC32(true, st->crc, buf, len); +} + +static bool +fio_send_file_write(FILE* out, send_file_state* st, char *buf, size_t len) +{ + if (len == 0) + return true; + + if (st->read_size > st->write_size && + fseeko(out, st->read_size, SEEK_SET) != 0) + { + return false; + } + + if (fwrite(buf, 1, len, out) != len) + { + return false; + } + + st->read_size += len; + st->write_size = st->read_size; + + return true; +} + /* Receive chunks of data and write them to destination file. * Return codes: * SEND_OK (0) @@ -2616,13 +2742,22 @@ fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, * If pgFile is not NULL then we must calculate crc and read_size for it. */ int -fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, +fio_send_file(const char *from_fullpath, FILE* out, bool cut_zero_tail, pgFile *file, char **errormsg) { fio_header hdr; int exit_code = SEND_OK; size_t path_len = strlen(from_fullpath) + 1; char *buf = pgut_malloc(CHUNK_SIZE); /* buffer */ + send_file_state st = {false, 0, 0, 0}; + + memset(&hdr, 0, sizeof(hdr)); + + if (file) + { + st.calc_crc = true; + st.crc = file->crc; + } hdr.cop = FIO_SEND_FILE; hdr.size = path_len; @@ -2640,6 +2775,37 @@ fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, if (hdr.cop == FIO_SEND_FILE_EOF) { + if (st.write_size < st.read_size) + { + if (!cut_zero_tail) + { + /* + * We still need to calc crc for zero tail. + */ + fio_send_file_crc(&st, NULL, 0); + + /* + * Let's write single zero byte to the end of file to restore + * logical size. + * Well, it would be better to use ftruncate here actually, + * but then we need to change interface. 
+ */ + st.read_size -= 1; + buf[0] = 0; + if (!fio_send_file_write(out, &st, buf, 1)) + { + exit_code = WRITE_FAILED; + break; + } + } + } + + if (file) + { + file->crc = st.crc; + file->read_size = st.read_size; + file->write_size = st.write_size; + } break; } else if (hdr.cop == FIO_ERROR) @@ -2660,17 +2826,23 @@ fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size); /* We have received a chunk of data data, lets write it out */ - if (fwrite(buf, 1, hdr.size, out) != hdr.size) + fio_send_file_crc(&st, buf, hdr.size); + if (!fio_send_file_write(out, &st, buf, hdr.size)) { exit_code = WRITE_FAILED; break; } + } + else if (hdr.cop == FIO_PAGE_ZERO) + { + Assert(hdr.size == 0); + Assert(hdr.arg <= CHUNK_SIZE); - if (file) - { - file->read_size += hdr.size; - COMP_FILE_CRC32(true, file->crc, buf, hdr.size); - } + /* + * We have received a chunk of zero data, lets just think we + * wrote it. + */ + st.read_size += hdr.arg; } else { @@ -2686,6 +2858,128 @@ fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, return exit_code; } +int +fio_send_file_local(const char *from_fullpath, FILE* out, bool cut_zero_tail, + pgFile *file, char **errormsg) +{ + FILE* in; + char* buf; + size_t read_len, non_zero_len; + int exit_code = SEND_OK; + send_file_state st = {false, 0, 0, 0}; + + if (file) + { + st.calc_crc = true; + st.crc = file->crc; + } + + /* open source file for read */ + in = fopen(from_fullpath, PG_BINARY_R); + if (in == NULL) + { + /* maybe deleted, it's not error in case of backup */ + if (errno == ENOENT) + return FILE_MISSING; + + + *errormsg = psprintf("Cannot open file \"%s\": %s", from_fullpath, + strerror(errno)); + return OPEN_FAILED; + } + + /* disable stdio buffering for local input/output files to avoid triple buffering */ + setvbuf(in, NULL, _IONBF, BUFSIZ); + setvbuf(out, NULL, _IONBF, BUFSIZ); + + /* allocate 64kB buffer */ + buf = 
pgut_malloc(CHUNK_SIZE); + + /* copy content and calc CRC */ + for (;;) + { + read_len = fread(buf, 1, CHUNK_SIZE, in); + + if (ferror(in)) + { + *errormsg = psprintf("Cannot read from file \"%s\": %s", + from_fullpath, strerror(errno)); + exit_code = READ_FAILED; + goto cleanup; + } + + if (read_len > 0) + { + non_zero_len = find_zero_tail(buf, read_len); + /* + * It is dirty trick to silence warnings in CFS GC process: + * backup at least cfs header size bytes. + */ + if (st.read_size + non_zero_len < PAGE_ZEROSEARCH_FINE_GRANULARITY && + st.read_size + read_len > 0) + { + non_zero_len = Min(PAGE_ZEROSEARCH_FINE_GRANULARITY, + st.read_size + read_len); + non_zero_len -= st.read_size; + } + if (non_zero_len > 0) + { + fio_send_file_crc(&st, buf, non_zero_len); + if (!fio_send_file_write(out, &st, buf, non_zero_len)) + { + exit_code = WRITE_FAILED; + goto cleanup; + } + } + if (non_zero_len < read_len) + { + /* Just pretend we wrote it. */ + st.read_size += read_len - non_zero_len; + } + } + + if (feof(in)) + break; + } + + if (st.write_size < st.read_size) + { + if (!cut_zero_tail) + { + /* + * We still need to calc crc for zero tail. + */ + fio_send_file_crc(&st, NULL, 0); + + /* + * Let's write single zero byte to the end of file to restore + * logical size. + * Well, it would be better to use ftruncate here actually, + * but then we need to change interface. 
+ */ + st.read_size -= 1; + buf[0] = 0; + if (!fio_send_file_write(out, &st, buf, 1)) + { + exit_code = WRITE_FAILED; + goto cleanup; + } + } + } + + if (file) + { + file->crc = st.crc; + file->read_size = st.read_size; + file->write_size = st.write_size; + } + +cleanup: + free(buf); + fclose(in); + return exit_code; +} + /* Send file content * On error we return FIO_ERROR message with following codes * FIO_ERROR: @@ -2746,6 +3040,7 @@ fio_send_file_impl(int out, char const* path) for (;;) { read_len = fread(buf, 1, CHUNK_SIZE, fp); + memset(&hdr, 0, sizeof(hdr)); /* report error */ if (ferror(fp)) @@ -2766,10 +3061,22 @@ fio_send_file_impl(int out, char const* path) if (read_len > 0) { /* send chunk */ - hdr.cop = FIO_PAGE; - hdr.size = read_len; - IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); - IO_CHECK(fio_write_all(out, buf, read_len), read_len); + size_t non_zero_len = find_zero_tail(buf, read_len); + if (non_zero_len > 0) + { + hdr.cop = FIO_PAGE; + hdr.size = non_zero_len; + IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); + IO_CHECK(fio_write_all(out, buf, non_zero_len), non_zero_len); + } + + if (non_zero_len < read_len) + { + hdr.cop = FIO_PAGE_ZERO; + hdr.size = 0; + hdr.arg = read_len - non_zero_len; + IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); + } } if (feof(fp)) @@ -2788,6 +3095,193 @@ fio_send_file_impl(int out, char const* path) return; } +/* + * Read the local file to compute its CRC. + * We cannot make decision about file decompression because + * user may ask to backup already compressed files and we should be + * obvious about it. 
+ */
+pg_crc32
+pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok)
+{
+	FILE	   *fp;
+	pg_crc32	crc = 0;
+	char	   *buf;
+	size_t		len = 0;
+
+	INIT_FILE_CRC32(use_crc32c, crc);
+
+	/* open file in binary read mode */
+	fp = fopen(file_path, PG_BINARY_R);
+	if (fp == NULL)
+	{
+		if (errno == ENOENT)
+		{
+			if (missing_ok)
+			{
+				FIN_FILE_CRC32(use_crc32c, crc);	/* absent file is tolerated: finalize the CRC with no data fed in */
+				return crc;
+			}
+		}
+
+		elog(ERROR, "Cannot open file \"%s\": %s",
+			file_path, strerror(errno));
+	}
+
+	/* disable stdio buffering; data is read straight into buf below */
+	setvbuf(fp, NULL, _IONBF, BUFSIZ);
+	buf = pgut_malloc(STDIO_BUFSIZE);
+
+	/* calc CRC of file, one read of up to STDIO_BUFSIZE bytes per iteration */
+	for (;;)
+	{
+		if (interrupted)
+			elog(ERROR, "interrupted during CRC calculation");
+
+		len = fread(buf, 1, STDIO_BUFSIZE, fp);
+
+		if (ferror(fp))
+			elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno));
+
+		/* update CRC with the len bytes just read (len can be 0 on the final read) */
+		COMP_FILE_CRC32(use_crc32c, crc, buf, len);
+
+		if (feof(fp))
+			break;
+	}
+
+	FIN_FILE_CRC32(use_crc32c, crc);
+	fclose(fp);
+	pg_free(buf);
+
+	return crc;
+}
+
+/*
+ * Read the local file to compute CRC for it extended to real_size.
+ */
+pg_crc32
+pgFileGetCRCTruncated(const char *file_path, bool use_crc32c)
+{
+	FILE	   *fp;
+	char	   *buf;
+	size_t		len = 0;
+	size_t		non_zero_len;
+	send_file_state st = {true, 0, 0, 0};	/* calc_crc = true; crc, read_size and write_size start at 0 */
+
+	INIT_FILE_CRC32(use_crc32c, st.crc);
+
+	/* open file in binary read mode; any open failure, including ENOENT, is reported as an error */
+	fp = fopen(file_path, PG_BINARY_R);
+	if (fp == NULL)
+	{
+		elog(ERROR, "Cannot open file \"%s\": %s",
+			file_path, strerror(errno));
+	}
+
+	/* disable stdio buffering; buf is sized to hold exactly one STDIO_BUFSIZE read */
+	setvbuf(fp, NULL, _IONBF, BUFSIZ);
+	buf = pgut_malloc(STDIO_BUFSIZE);
+
+	/* calc CRC of file; zero bytes enter the CRC only once non-zero data follows them */
+	for (;;)
+	{
+		if (interrupted)
+			elog(ERROR, "interrupted during CRC calculation");
+
+		len = fread(buf, 1, STDIO_BUFSIZE, fp);
+
+		if (ferror(fp))
+			elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno));
+
+		non_zero_len = find_zero_tail(buf, len);
+		if (non_zero_len)
+		{
+			fio_send_file_crc(&st, buf, non_zero_len);	/* feeds pending zeros [write_size, read_size) first, then buf. NOTE(review): the helper hard-codes crc32c = true */
+			st.write_size = st.read_size + non_zero_len;	/* CRC now covers the file up to this offset (was "+=", which overshot on later chunks) */
+		}
+		st.read_size += len;	/* file offset where the next chunk starts */
+
+		if (feof(fp))
+			break;
+	}
+
+	FIN_FILE_CRC32(use_crc32c, st.crc);
+	fclose(fp);
+	pg_free(buf);
+
+	return st.crc;
+}
+
+/*
+ * Read the local file to compute its CRC.
+ * We cannot make decision about file decompression because
+ * user may ask to backup already compressed files and we should be
+ * obvious about it.
+ */ +pg_crc32 +pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok) +{ + gzFile fp; + pg_crc32 crc = 0; + int len = 0; + int err; + char *buf; + + INIT_FILE_CRC32(use_crc32c, crc); + + /* open file in binary read mode */ + fp = gzopen(file_path, PG_BINARY_R); + if (fp == NULL) + { + if (errno == ENOENT) + { + if (missing_ok) + { + FIN_FILE_CRC32(use_crc32c, crc); + return crc; + } + } + + elog(ERROR, "Cannot open file \"%s\": %s", + file_path, strerror(errno)); + } + + buf = pgut_malloc(STDIO_BUFSIZE); + + /* calc CRC of file */ + for (;;) + { + if (interrupted) + elog(ERROR, "interrupted during CRC calculation"); + + len = gzread(fp, buf, STDIO_BUFSIZE); + + if (len <= 0) + { + /* we either run into eof or error */ + if (gzeof(fp)) + break; + else + { + const char *err_str = NULL; + + err_str = gzerror(fp, &err); + elog(ERROR, "Cannot read from compressed file %s", err_str); + } + } + + /* update CRC */ + COMP_FILE_CRC32(use_crc32c, crc, buf, len); + } + + FIN_FILE_CRC32(use_crc32c, crc); + gzclose(fp); + pg_free(buf); + + return crc; +} + /* Compile the array of files located on remote machine in directory root */ static void fio_list_dir_internal(parray *files, const char *root, bool exclude, @@ -3387,9 +3881,13 @@ fio_communicate(int in, int out) IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr)); break; case FIO_GET_CRC32: + Assert((hdr.arg & GET_CRC32_TRUNCATED) == 0 || + (hdr.arg & GET_CRC32_TRUNCATED) == GET_CRC32_TRUNCATED); /* calculate crc32 for a file */ if ((hdr.arg & GET_CRC32_DECOMPRESS)) crc = pgFileGetCRCgz(buf, true, (hdr.arg & GET_CRC32_MISSING_OK) != 0); + else if ((hdr.arg & GET_CRC32_TRUNCATED)) + crc = pgFileGetCRCTruncated(buf, true); else crc = pgFileGetCRC(buf, true, (hdr.arg & GET_CRC32_MISSING_OK) != 0); IO_CHECK(fio_write_all(out, &crc, sizeof(crc)), sizeof(crc)); diff --git a/src/utils/file.h b/src/utils/file.h index ec478b451..890babf55 100644 --- a/src/utils/file.h +++ b/src/utils/file.h @@ -56,7 +56,8 
@@ typedef enum FIO_CHECK_POSTMASTER, FIO_GET_ASYNC_ERROR, FIO_WRITE_ASYNC, - FIO_READLINK + FIO_READLINK, + FIO_PAGE_ZERO } fio_operations; typedef enum @@ -122,6 +123,7 @@ extern void fio_disconnect(void); extern int fio_sync(char const* path, fio_location location); extern pg_crc32 fio_get_crc32(const char *file_path, fio_location location, bool decompress, bool missing_ok); +extern pg_crc32 fio_get_crc32_truncated(const char *file_path, fio_location location); extern int fio_rename(char const* old_path, char const* new_path, fio_location location); extern int fio_symlink(char const* target, char const* link_path, bool overwrite, fio_location location); diff --git a/tests/cfs_backup.py b/tests/cfs_backup.py index 436db31e7..306c2396c 100644 --- a/tests/cfs_backup.py +++ b/tests/cfs_backup.py @@ -171,12 +171,18 @@ def test_fullbackup_after_create_table(self): "ERROR: File pg_compression not found in {0}".format( os.path.join(self.backup_dir, 'node', backup_id)) ) - self.assertTrue( - find_by_extensions( - [os.path.join(self.backup_dir, 'backups', 'node', backup_id)], - ['.cfm']), - "ERROR: .cfm files not found in backup dir" - ) + + # check cfm size + cfms = find_by_extensions( + [os.path.join(self.backup_dir, 'backups', 'node', backup_id)], + ['.cfm']) + self.assertTrue(cfms, "ERROR: .cfm files not found in backup dir") + for cfm in cfms: + size = os.stat(cfm).st_size + self.assertLessEqual(size, 4096, + "ERROR: {0} is not truncated (has size {1} > 4096)".format( + cfm, size + )) # @unittest.expectedFailure # @unittest.skip("skip") diff --git a/tests/cfs_catchup.py b/tests/cfs_catchup.py new file mode 100644 index 000000000..2cbb46729 --- /dev/null +++ b/tests/cfs_catchup.py @@ -0,0 +1,127 @@ +import os +import unittest +import random +import shutil + +from .helpers.cfs_helpers import find_by_extensions, find_by_name, find_by_pattern, corrupt_file +from .helpers.ptrack_helpers import ProbackupTest, ProbackupException + +module_name = 'cfs_catchup' +tblspace_name 
= 'cfs_tblspace' + + +class CfsCatchupNoEncTest(ProbackupTest, unittest.TestCase): + def setUp(self): + self.fname = self.id().split('.')[3] + + @unittest.skipUnless(ProbackupTest.enterprise, 'skip') + def test_full_catchup_with_tablespace(self): + """ + Test tablespace transfers + """ + # preparation + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True + ) + src_pg.slow_start() + tblspace1_old_path = self.get_tblspace_path(src_pg, 'tblspace1_old') + self.create_tblspace_in_node(src_pg, 'tblspace1', tblspc_path = tblspace1_old_path, cfs=True) + src_pg.safe_psql( + "postgres", + "CREATE TABLE ultimate_question TABLESPACE tblspace1 AS SELECT 42 AS answer") + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + src_pg.safe_psql( + "postgres", + "CHECKPOINT") + + # do full catchup with tablespace mapping + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + tblspace1_new_path = self.get_tblspace_path(dst_pg, 'tblspace1_new') + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = [ + '-d', 'postgres', + '-p', str(src_pg.port), + '--stream', + '-T', '{0}={1}'.format(tblspace1_old_path, tblspace1_new_path) + ] + ) + + # 1st check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # check cfm size + cfms = find_by_extensions([os.path.join(dst_pg.data_dir)], ['.cfm']) + self.assertTrue(cfms, "ERROR: .cfm files not found in backup dir") + for cfm in cfms: + size = os.stat(cfm).st_size + self.assertLessEqual(size, 4096, + "ERROR: {0} is not truncated (has size {1} > 4096)".format( + cfm, size + )) + + # make changes in master tablespace + src_pg.safe_psql( + "postgres", + "UPDATE ultimate_question SET answer = -1") + src_pg.safe_psql( + "postgres", + "CHECKPOINT") + + # run&recover catchup'ed instance + 
dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + + # 2nd check: run verification query + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy') + + # and now delta backup + dst_pg.stop() + + self.catchup_node( + backup_mode = 'DELTA', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = [ + '-d', 'postgres', + '-p', str(src_pg.port), + '--stream', + '-T', '{0}={1}'.format(tblspace1_old_path, tblspace1_new_path) + ] + ) + + # check cfm size again + cfms = find_by_extensions([os.path.join(dst_pg.data_dir)], ['.cfm']) + self.assertTrue(cfms, "ERROR: .cfm files not found in backup dir") + for cfm in cfms: + size = os.stat(cfm).st_size + self.assertLessEqual(size, 4096, + "ERROR: {0} is not truncated (has size {1} > 4096)".format( + cfm, size + )) + + # run&recover catchup'ed instance + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + + + # 3rd check: run verification query + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy') + + # Cleanup + src_pg.stop() + dst_pg.stop() + self.del_test_dir(module_name, self.fname) diff --git a/tests/expected/option_version.out b/tests/expected/option_version.out index 4de288907..7c9fcbfe0 100644 --- a/tests/expected/option_version.out +++ b/tests/expected/option_version.out @@ -1 +1 @@ -pg_probackup 2.5.8 +pg_probackup 2.5.9 diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py index d800f0d3e..abb715b7e 100644 --- a/tests/helpers/ptrack_helpers.py +++ b/tests/helpers/ptrack_helpers.py @@ -1709,8 +1709,18 @@ def pgdata_content(self, 
pgdata, ignore_ptrack=True, exclude_dirs=None): file_relpath = os.path.relpath(file_fullpath, pgdata) directory_dict['files'][file_relpath] = {'is_datafile': False} with open(file_fullpath, 'rb') as f: - directory_dict['files'][file_relpath]['md5'] = hashlib.md5(f.read()).hexdigest() - f.close() + content = f.read() + # truncate cfm's content's zero tail + if file_relpath.endswith('.cfm'): + zero64 = b"\x00"*64 + l = len(content) + while l > 64: + s = (l - 1) & ~63 + if content[s:l] != zero64[:l-s]: + break + l = s + content = content[:l] + directory_dict['files'][file_relpath]['md5'] = hashlib.md5(content).hexdigest() # directory_dict['files'][file_relpath]['md5'] = hashlib.md5( # f = open(file_fullpath, 'rb').read()).hexdigest()