Skip to content

Release 2 6 refactoring #464

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Jan 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 68 additions & 44 deletions src/archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* archive.c: - pg_probackup specific archive commands for archive backups.
*
*
* Portions Copyright (c) 2018-2021, Postgres Professional
* Portions Copyright (c) 2018-2022, Postgres Professional
*
*-------------------------------------------------------------------------
*/
Expand Down Expand Up @@ -364,7 +364,7 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
elog(VERBOSE, "Rename \"%s\" to \"%s\"", wal_file_ready, wal_file_done);

/* do not error out, if rename failed */
if (fio_rename(wal_file_ready, wal_file_done, FIO_DB_HOST) < 0)
if (fio_rename(FIO_DB_HOST, wal_file_ready, wal_file_done) < 0)
elog(WARNING, "Cannot rename ready file \"%s\" to \"%s\": %s",
wal_file_ready, wal_file_done, strerror(errno));
}
Expand Down Expand Up @@ -418,7 +418,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
snprintf(to_fullpath_part, sizeof(to_fullpath_part), "%s.part", to_fullpath);

/* Grab lock by creating temp file in exclusive mode */
out = fio_open(to_fullpath_part, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST);
out = fio_open(FIO_BACKUP_HOST, to_fullpath_part, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
if (out < 0)
{
if (errno != EEXIST)
Expand All @@ -444,12 +444,12 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d

while (partial_try_count < archive_timeout)
{
if (fio_stat(to_fullpath_part, &st, false, FIO_BACKUP_HOST) < 0)
if (fio_stat(FIO_BACKUP_HOST, to_fullpath_part, &st, false) < 0)
{
if (errno == ENOENT)
{
//part file is gone, lets try to grab it
out = fio_open(to_fullpath_part, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST);
out = fio_open(FIO_BACKUP_HOST, to_fullpath_part, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
if (out < 0)
{
if (errno != EEXIST)
Expand Down Expand Up @@ -497,9 +497,10 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d

/* Partial segment is considered stale, so reuse it */
elog(LOG, "Reusing stale temp WAL file \"%s\"", to_fullpath_part);
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(ERROR, "Cannot remove stale temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));

out = fio_open(to_fullpath_part, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, FIO_BACKUP_HOST);
out = fio_open(FIO_BACKUP_HOST, to_fullpath_part, O_RDWR | O_CREAT | O_EXCL | PG_BINARY);
if (out < 0)
elog(ERROR, "Cannot open temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
}
Expand All @@ -512,8 +513,8 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
pg_crc32 crc32_src;
pg_crc32 crc32_dst;

crc32_src = fio_get_crc32(from_fullpath, FIO_DB_HOST, false);
crc32_dst = fio_get_crc32(to_fullpath, FIO_BACKUP_HOST, false);
crc32_src = fio_get_crc32(FIO_DB_HOST, from_fullpath, false);
crc32_dst = fio_get_crc32(FIO_BACKUP_HOST, to_fullpath, false);

if (crc32_src == crc32_dst)
{
Expand All @@ -522,7 +523,8 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
/* cleanup */
fclose(in);
fio_close(out);
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(WARNING, "Cannot remove temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
return 1;
}
else
Expand All @@ -535,7 +537,8 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
/* Overwriting is forbidden,
* so we must unlink partial file and exit with error.
*/
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(WARNING, "Cannot remove temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
elog(ERROR, "WAL file already exists in archive with "
"different checksum: \"%s\"", to_fullpath);
}
Expand All @@ -552,16 +555,20 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d

if (ferror(in))
{
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(WARNING, "Cannot remove temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
elog(ERROR, "Cannot read source file \"%s\": %s",
from_fullpath, strerror(errno));
from_fullpath, strerror(save_errno));
}

if (read_len > 0 && fio_write_async(out, buf, read_len) != read_len)
{
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(WARNING, "Cannot cleanup temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
elog(ERROR, "Cannot write to destination temp file \"%s\": %s",
to_fullpath_part, strerror(errno));
to_fullpath_part, strerror(save_errno));
}

if (feof(in))
Expand All @@ -574,23 +581,26 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
/* Writing is asynchronous in case of push in remote mode, so check agent status */
if (fio_check_error_fd(out, &errmsg))
{
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(WARNING, "Cannot cleanup temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
elog(ERROR, "Cannot write to the remote file \"%s\": %s",
to_fullpath_part, errmsg);
}

/* close temp file */
if (fio_close(out) != 0)
{
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(WARNING, "Cannot cleanup temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
elog(ERROR, "Cannot close temp WAL file \"%s\": %s",
to_fullpath_part, strerror(errno));
to_fullpath_part, strerror(save_errno));
}

/* sync temp file to disk */
if (!no_sync)
{
if (fio_sync(to_fullpath_part, FIO_BACKUP_HOST) != 0)
if (fio_sync(FIO_BACKUP_HOST, to_fullpath_part) != 0)
elog(ERROR, "Failed to sync file \"%s\": %s",
to_fullpath_part, strerror(errno));
}
Expand All @@ -600,11 +610,13 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
//copy_file_attributes(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true);

/* Rename temp file to destination file */
if (fio_rename(to_fullpath_part, to_fullpath, FIO_BACKUP_HOST) < 0)
if (fio_rename(FIO_BACKUP_HOST, to_fullpath_part, to_fullpath) < 0)
{
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_part, false) != 0)
elog(WARNING, "Cannot cleanup temp WAL file \"%s\": %s", to_fullpath_part, strerror(errno));
elog(ERROR, "Cannot rename file \"%s\" to \"%s\": %s",
to_fullpath_part, to_fullpath, strerror(errno));
to_fullpath_part, to_fullpath, strerror(save_errno));
}

pg_free(buf);
Expand Down Expand Up @@ -663,7 +675,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
setvbuf(in, NULL, _IONBF, BUFSIZ);

/* Grab lock by creating temp file in exclusive mode */
out = fio_gzopen(to_fullpath_gz_part, PG_BINARY_W, compress_level, FIO_BACKUP_HOST);
out = fio_gzopen(FIO_BACKUP_HOST, to_fullpath_gz_part, PG_BINARY_W, compress_level);
if (out == NULL)
{
if (errno != EEXIST)
Expand All @@ -689,12 +701,12 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,

while (partial_try_count < archive_timeout)
{
if (fio_stat(to_fullpath_gz_part, &st, false, FIO_BACKUP_HOST) < 0)
if (fio_stat(FIO_BACKUP_HOST, to_fullpath_gz_part, &st, false) < 0)
{
if (errno == ENOENT)
{
//part file is gone, lets try to grab it
out = fio_gzopen(to_fullpath_gz_part, PG_BINARY_W, compress_level, FIO_BACKUP_HOST);
out = fio_gzopen(FIO_BACKUP_HOST, to_fullpath_gz_part, PG_BINARY_W, compress_level);
if (out == NULL)
{
if (errno != EEXIST)
Expand Down Expand Up @@ -743,9 +755,10 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,

/* Partial segment is considered stale, so reuse it */
elog(LOG, "Reusing stale temp WAL file \"%s\"", to_fullpath_gz_part);
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(ERROR, "Cannot remove stale compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));

out = fio_gzopen(to_fullpath_gz_part, PG_BINARY_W, compress_level, FIO_BACKUP_HOST);
out = fio_gzopen(FIO_BACKUP_HOST, to_fullpath_gz_part, PG_BINARY_W, compress_level);
if (out == NULL)
elog(ERROR, "Cannot open temp WAL file \"%s\": %s",
to_fullpath_gz_part, strerror(errno));
Expand All @@ -761,8 +774,8 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
pg_crc32 crc32_dst;

/* TODO: what if one of them goes missing? */
crc32_src = fio_get_crc32(from_fullpath, FIO_DB_HOST, false);
crc32_dst = fio_get_crc32(to_fullpath_gz, FIO_BACKUP_HOST, true);
crc32_src = fio_get_crc32(FIO_DB_HOST, from_fullpath, false);
crc32_dst = fio_get_crc32(FIO_BACKUP_HOST, to_fullpath_gz, true);

if (crc32_src == crc32_dst)
{
Expand All @@ -771,7 +784,8 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
/* cleanup */
fclose(in);
fio_gzclose(out);
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(WARNING, "Cannot remove compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));
return 1;
}
else
Expand All @@ -784,7 +798,8 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
/* Overwriting is forbidden,
* so we must unlink partial file and exit with error.
*/
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(WARNING, "Cannot remove compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));
elog(ERROR, "WAL file already exists in archive with "
"different checksum: \"%s\"", to_fullpath_gz);
}
Expand All @@ -801,16 +816,20 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,

if (ferror(in))
{
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(WARNING, "Cannot remove compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));
elog(ERROR, "Cannot read from source file \"%s\": %s",
from_fullpath, strerror(errno));
from_fullpath, strerror(save_errno));
}

if (read_len > 0 && fio_gzwrite(out, buf, read_len) != read_len)
{
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(WARNING, "Cannot cleanup compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));
elog(ERROR, "Cannot write to compressed temp WAL file \"%s\": %s",
to_fullpath_gz_part, get_gz_error(out, errno));
to_fullpath_gz_part, get_gz_error(out, save_errno));
}

if (feof(in))
Expand All @@ -823,23 +842,26 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
/* Writing is asynchronous in case of push in remote mode, so check agent status */
if (fio_check_error_fd_gz(out, &errmsg))
{
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(WARNING, "Cannot cleanup remote compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));
elog(ERROR, "Cannot write to the remote compressed file \"%s\": %s",
to_fullpath_gz_part, errmsg);
}

/* close temp file, TODO: make it synchronous */
if (fio_gzclose(out) != 0)
{
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(WARNING, "Cannot cleanup compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));
elog(ERROR, "Cannot close compressed temp WAL file \"%s\": %s",
to_fullpath_gz_part, strerror(errno));
to_fullpath_gz_part, strerror(save_errno));
}

/* sync temp file to disk */
if (!no_sync)
{
if (fio_sync(to_fullpath_gz_part, FIO_BACKUP_HOST) != 0)
if (fio_sync(FIO_BACKUP_HOST, to_fullpath_gz_part) != 0)
elog(ERROR, "Failed to sync file \"%s\": %s",
to_fullpath_gz_part, strerror(errno));
}
Expand All @@ -850,11 +872,13 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
//copy_file_attributes(from_path, FIO_DB_HOST, to_path_temp, FIO_BACKUP_HOST, true);

/* Rename temp file to destination file */
if (fio_rename(to_fullpath_gz_part, to_fullpath_gz, FIO_BACKUP_HOST) < 0)
if (fio_rename(FIO_BACKUP_HOST, to_fullpath_gz_part, to_fullpath_gz) < 0)
{
fio_unlink(to_fullpath_gz_part, FIO_BACKUP_HOST);
int save_errno = errno;
if (fio_remove(FIO_BACKUP_HOST, to_fullpath_gz_part, false) != 0)
elog(WARNING, "Cannot cleanup compressed temp WAL file \"%s\": %s", to_fullpath_gz_part, strerror(errno));
elog(ERROR, "Cannot rename file \"%s\" to \"%s\": %s",
to_fullpath_gz_part, to_fullpath_gz, strerror(errno));
to_fullpath_gz_part, to_fullpath_gz, strerror(save_errno));
}

pg_free(buf);
Expand Down Expand Up @@ -889,15 +913,15 @@ get_gz_error(gzFile gzf, int errnum)
//{
// struct stat st;
//
// if (fio_stat(from_path, &st, true, from_location) == -1)
// if (fio_stat(from_location, from_path, &st, true) == -1)
// {
// if (unlink_on_error)
// fio_unlink(to_path, to_location);
// elog(ERROR, "Cannot stat file \"%s\": %s",
// from_path, strerror(errno));
// }
//
// if (fio_chmod(to_path, st.st_mode, to_location) == -1)
// if (fio_chmod(to_location, to_path, st.st_mode) == -1)
// {
// if (unlink_on_error)
// fio_unlink(to_path, to_location);
Expand Down
19 changes: 10 additions & 9 deletions src/backup.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* backup.c: backup DB cluster, archived WAL
*
* Portions Copyright (c) 2009-2013, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
* Portions Copyright (c) 2015-2019, Postgres Professional
* Portions Copyright (c) 2015-2022, Postgres Professional
*
*-------------------------------------------------------------------------
*/
Expand Down Expand Up @@ -136,7 +136,8 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
#if PG_VERSION_NUM >= 90600
current.tli = get_current_timeline(backup_conn);
#else
current.tli = get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
/* PG-9.5 */
current.tli = get_current_timeline_from_control(FIO_DB_HOST, instance_config.pgdata, false);
#endif

/*
Expand Down Expand Up @@ -242,7 +243,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || !current.stream)
{
/* Check that archive_dir can be reached */
if (fio_access(instanceState->instance_wal_subdir_path, F_OK, FIO_BACKUP_HOST) != 0)
if (fio_access(FIO_BACKUP_HOST, instanceState->instance_wal_subdir_path, F_OK) != 0)
elog(ERROR, "WAL archive directory is not accessible \"%s\": %s",
instanceState->instance_wal_subdir_path, strerror(errno));

Expand All @@ -260,7 +261,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
char stream_xlog_path[MAXPGPATH];

join_path_components(stream_xlog_path, current.database_dir, PG_XLOG_DIR);
fio_mkdir(stream_xlog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
fio_mkdir(FIO_BACKUP_HOST, stream_xlog_path, DIR_PERMISSION, false);

start_WAL_streaming(backup_conn, stream_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli, true);
Expand Down Expand Up @@ -413,7 +414,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
join_path_components(dirpath, current.database_dir, file->rel_path);

elog(VERBOSE, "Create directory '%s'", dirpath);
fio_mkdir(dirpath, DIR_PERMISSION, FIO_BACKUP_HOST);
fio_mkdir(FIO_BACKUP_HOST, dirpath, DIR_PERMISSION, false);
}

}
Expand Down Expand Up @@ -528,7 +529,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
{
cleanup_header_map(&(current.hdr_map));

if (fio_sync(current.hdr_map.path, FIO_BACKUP_HOST) != 0)
if (fio_sync(FIO_BACKUP_HOST, current.hdr_map.path) != 0)
elog(ERROR, "Cannot sync file \"%s\": %s", current.hdr_map.path, strerror(errno));
}

Expand Down Expand Up @@ -587,7 +588,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
join_path_components(to_fullpath, external_dst, file->rel_path);
}

if (fio_sync(to_fullpath, FIO_BACKUP_HOST) != 0)
if (fio_sync(FIO_BACKUP_HOST, to_fullpath) != 0)
elog(ERROR, "Cannot sync file \"%s\": %s", to_fullpath, strerror(errno));
}

Expand Down Expand Up @@ -943,7 +944,7 @@ check_system_identifiers(PGconn *conn, const char *pgdata)
uint64 system_id_conn;
uint64 system_id_pgdata;

system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST, false);
system_id_pgdata = get_system_identifier(FIO_DB_HOST, pgdata, false);
system_id_conn = get_remote_system_identifier(conn);

/* for checkdb check only system_id_pgdata and system_id_conn */
Expand Down Expand Up @@ -1788,7 +1789,7 @@ pg_stop_backup_write_file_helper(const char *path, const char *filename, const c
char full_filename[MAXPGPATH];

join_path_components(full_filename, path, filename);
fp = fio_fopen(full_filename, PG_BINARY_W, FIO_BACKUP_HOST);
fp = fio_fopen(FIO_BACKUP_HOST, full_filename, PG_BINARY_W);
if (fp == NULL)
elog(ERROR, "can't open %s file \"%s\": %s",
error_msg_filename, full_filename, strerror(errno));
Expand Down
Loading