Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Issue #449] subdirectories for WAL archive #450

Draft
wants to merge 11 commits into
base: release_2_6
Choose a base branch
from
187 changes: 170 additions & 17 deletions src/archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@

static int push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir,
const char *archive_dir, bool overwrite, bool no_sync,
uint32 archive_timeout);
uint32 archive_timeout, xlogFileType type);
#ifdef HAVE_LIBZ
static int push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
const char *archive_dir, bool overwrite, bool no_sync,
int compress_level, uint32 archive_timeout);
const char *archive_dir, bool overwrite, bool no_sync,
int compress_level, uint32 archive_timeout, xlogFileType type);
#endif
static void *push_files(void *arg);
static void *get_files(void *arg);
static bool
get_wal_file_wrapper(const char *filename, const char *archive_root_dir,
const char *to_fullpath, bool prefetch_mode);
static bool get_wal_file(const char *filename, const char *from_path, const char *to_path,
bool prefetch_mode);
static int get_wal_file_internal(const char *from_path, const char *to_path, FILE *out,
Expand Down Expand Up @@ -89,8 +92,9 @@ typedef struct

/*
 * Describes one file handled by the archive push/get code.
 *
 * Each member is declared exactly once: duplicated member declarations are
 * a constraint violation in C.
 */
typedef struct WALSegno
{
	char		name[MAXFNAMELEN];	/* file name; filled via snprintf() bounded by MAXFNAMELEN */
	volatile pg_atomic_flag lock;	/* set up with pg_atomic_init_flag(); presumably used by
									 * worker threads to claim the file -- confirm */
	xlogFileType type;				/* classification produced by get_xlogFileType(name) */
} WALSegno;

static int push_file(WALSegno *xlogfile, const char *archive_status_dir,
Expand All @@ -101,6 +105,29 @@ static int push_file(WALSegno *xlogfile, const char *archive_status_dir,

static parray *setup_push_filelist(const char *archive_status_dir,
const char *first_file, int batch_size);
static parray *setup_archive_subdirs(parray *batch_files, const char *archive_dir);

static xlogFileType
get_xlogFileType(const char *filename)
{

if IsXLogFileName(filename)
return SEGMENT;

else if IsPartialXLogFileName(filename)
return PARTIAL_SEGMENT;

else if IsBackupHistoryFileName(filename)
return BACKUP_HISTORY_FILE;

else if IsTLHistoryFileName(filename)
return HISTORY_FILE;

else if IsBackupHistoryFileName(filename)
return BACKUP_HISTORY_FILE;

return UNKNOWN;
}

/*
* At this point, we have already done one roundtrip to archive server
Expand Down Expand Up @@ -137,6 +164,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg

/* files to push in multi-thread mode */
parray *batch_files = NULL;
parray *archive_subdirs = NULL;
int n_threads;

if (!no_ready_rename || batch_size > 1)
Expand All @@ -160,6 +188,20 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *pg
parray_num(batch_files), batch_size,
is_compress ? "zlib" : "none");

/* Extract subdirectories */
archive_subdirs = setup_archive_subdirs(batch_files, instanceState->instance_wal_subdir_path);
if (archive_subdirs)
{
for (i = 0; i < parray_num(archive_subdirs); i++)
{
char *subdir = (char *) parray_get(archive_subdirs, i);
if (fio_mkdir(subdir, DIR_PERMISSION, FIO_BACKUP_HOST) != 0)
elog(ERROR, "Cannot create subdirectory in WAL archive: '%s'", subdir);
pg_free(subdir);
}
parray_free(archive_subdirs);
}

num_threads = n_threads;

/* Single-thread push
Expand Down Expand Up @@ -339,12 +381,12 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
if (!is_compress)
rc = push_file_internal_uncompressed(xlogfile->name, pg_xlog_dir,
archive_dir, overwrite, no_sync,
archive_timeout);
archive_timeout, xlogfile->type);
#ifdef HAVE_LIBZ
else
rc = push_file_internal_gz(xlogfile->name, pg_xlog_dir, archive_dir,
overwrite, no_sync, compress_level,
archive_timeout);
archive_timeout, xlogfile->type);
#endif

/* take '--no-ready-rename' flag into account */
Expand Down Expand Up @@ -383,13 +425,14 @@ push_file(WALSegno *xlogfile, const char *archive_status_dir,
int
push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_dir,
const char *archive_dir, bool overwrite, bool no_sync,
uint32 archive_timeout)
uint32 archive_timeout, xlogFileType type)
{
FILE *in = NULL;
int out = -1;
char *buf = pgut_malloc(OUT_BUF_SIZE); /* 1MB buffer */
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
char archive_subdir[MAXPGPATH];
/* partial handling */
struct stat st;
char to_fullpath_part[MAXPGPATH];
Expand All @@ -402,8 +445,12 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
/* from path */
join_path_components(from_fullpath, pg_xlog_dir, wal_file_name);
canonicalize_path(from_fullpath);

/* calculate subdir in WAL archive */
get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type);

/* to path */
join_path_components(to_fullpath, archive_dir, wal_file_name);
join_path_components(to_fullpath, archive_subdir, wal_file_name);
canonicalize_path(to_fullpath);

/* Open source file for read */
Expand Down Expand Up @@ -622,14 +669,15 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
int
push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
const char *archive_dir, bool overwrite, bool no_sync,
int compress_level, uint32 archive_timeout)
int compress_level, uint32 archive_timeout, xlogFileType type)
{
FILE *in = NULL;
gzFile out = NULL;
char *buf = pgut_malloc(OUT_BUF_SIZE);
char from_fullpath[MAXPGPATH];
char to_fullpath[MAXPGPATH];
char to_fullpath_gz[MAXPGPATH];
char archive_subdir[MAXPGPATH];

/* partial handling */
struct stat st;
Expand All @@ -644,8 +692,12 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
/* from path */
join_path_components(from_fullpath, pg_xlog_dir, wal_file_name);
canonicalize_path(from_fullpath);

/* calculate subdir in WAL archive */
get_archive_subdir(archive_subdir, archive_dir, wal_file_name, type);

/* to path */
join_path_components(to_fullpath, archive_dir, wal_file_name);
join_path_components(to_fullpath, archive_subdir, wal_file_name);
canonicalize_path(to_fullpath);

/* destination file with .gz suffix */
Expand Down Expand Up @@ -915,15 +967,17 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
{
int i;
WALSegno *xlogfile = NULL;
parray *status_files = NULL;
parray *batch_files = parray_new();
parray *status_files = NULL;
parray *batch_files = parray_new();

/* guarantee that first filename is in batch list */
xlogfile = palloc(sizeof(WALSegno));
pg_atomic_init_flag(&xlogfile->lock);
snprintf(xlogfile->name, MAXFNAMELEN, "%s", first_file);
parray_append(batch_files, xlogfile);

xlogfile->type = get_xlogFileType(xlogfile->name);

if (batch_size < 2)
return batch_files;

Expand Down Expand Up @@ -955,6 +1009,8 @@ setup_push_filelist(const char *archive_status_dir, const char *first_file,
pg_atomic_init_flag(&xlogfile->lock);

snprintf(xlogfile->name, MAXFNAMELEN, "%s", filename);

xlogfile->type = get_xlogFileType(xlogfile->name);
parray_append(batch_files, xlogfile);

if (parray_num(batch_files) >= batch_size)
Expand Down Expand Up @@ -1023,7 +1079,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha

/* full filepath to WAL file in archive directory.
* $BACKUP_PATH/wal/instance_name/000000010000000000000001 */
join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name);
//join_path_components(backup_wal_file_path, instanceState->instance_wal_subdir_path, wal_file_name);

INSTR_TIME_SET_CURRENT(start_time);
if (num_threads > batch_size)
Expand Down Expand Up @@ -1152,7 +1208,7 @@ do_archive_get(InstanceState *instanceState, InstanceConfig *instance, const cha

while (fail_count < 3)
{
if (get_wal_file(wal_file_name, backup_wal_file_path, absolute_wal_file_path, false))
if (get_wal_file_wrapper(wal_file_name, instanceState->instance_wal_subdir_path, absolute_wal_file_path, false))
{
fail_count = 0;
elog(INFO, "pg_probackup archive-get copied WAL file %s", wal_file_name);
Expand Down Expand Up @@ -1235,7 +1291,7 @@ uint32 run_wal_prefetch(const char *prefetch_dir, const char *archive_dir,
/* It is ok, maybe requested batch is greater than the number of available
* files in the archive
*/
if (!get_wal_file(xlogfile->name, from_fullpath, to_fullpath, true))
if (!get_wal_file_wrapper(xlogfile->name, archive_dir, to_fullpath, true))
{
elog(LOG, "Thread [%d]: Failed to prefetch WAL segment %s", 0, xlogfile->name);
break;
Expand Down Expand Up @@ -1309,7 +1365,7 @@ get_files(void *arg)
join_path_components(from_fullpath, args->archive_dir, xlogfile->name);
join_path_components(to_fullpath, args->prefetch_dir, xlogfile->name);

if (!get_wal_file(xlogfile->name, from_fullpath, to_fullpath, true))
if (!get_wal_file_wrapper(xlogfile->name, args->archive_dir, to_fullpath, true))
{
/* It is ok, maybe requested batch is greater than the number of available
* files in the archive
Expand All @@ -1328,6 +1384,38 @@ get_files(void *arg)
return NULL;
}

/*
 * Fetch a file from the WAL archive into to_fullpath.
 *
 * For segments, partial segments and backup history files the per-log
 * subdirectory (see get_archive_subdir()) is tried first. If that attempt
 * fails, or the file is of any other type, the file is looked up directly
 * in archive_root_dir, which keeps archives with the flat layout readable.
 *
 * Returns the result of the last get_wal_file() call that was made.
 */
bool
get_wal_file_wrapper(const char *filename, const char *archive_root_dir,
					 const char *to_fullpath, bool prefetch_mode)
{
	char		subdir_path[MAXPGPATH];
	char		source_path[MAXPGPATH];
	xlogFileType file_type = get_xlogFileType(filename);
	bool		may_live_in_subdir = (file_type == SEGMENT ||
									  file_type == PARTIAL_SEGMENT ||
									  file_type == BACKUP_HISTORY_FILE);

	if (may_live_in_subdir)
	{
		/* preferred location: <archive root>/<subdir>/<filename> */
		get_archive_subdir(subdir_path, archive_root_dir, filename, file_type);
		join_path_components(source_path, subdir_path, filename);

		if (get_wal_file(filename, source_path, to_fullpath, prefetch_mode))
			return true;
	}

	/* fallback for backward compatibility: <archive root>/<filename> */
	join_path_components(source_path, archive_root_dir, filename);

	return get_wal_file(filename, source_path, to_fullpath, prefetch_mode);
}

/*
* Copy WAL segment from archive catalog to pgdata with possible decompression.
* When running in prefetch mode, we should not error out.
Expand Down Expand Up @@ -1730,3 +1818,68 @@ uint32 maintain_prefetch(const char *prefetch_dir, XLogSegNo first_segno, uint32

return n_files;
}

/*
 * Calculate the subdirectory of the WAL archive that a file belongs to.
 *
 * archive_subdir  - output buffer; callers in this file pass a buffer of
 *                   MAXPGPATH bytes
 * archive_dir     - root directory of the WAL archive
 * wal_file_name   - name of the file being archived or fetched
 * type            - classification of wal_file_name
 *
 * For SEGMENT, PARTIAL_SEGMENT and BACKUP_HISTORY_FILE the name is treated
 * as 8 characters of timeline, 8 characters of log number and a non-empty
 * remainder; the result is archive_dir joined with the log number. Example:
 *   000000010000000200000013 -> <archive_dir>/00000002
 *
 * For every other type, or when the name is too short to have that layout,
 * the result is archive_dir itself.
 *
 * The name is sliced by position instead of being parsed with sscanf("%s"),
 * so no fixed-size scratch buffer can be overrun by a long name.
 * NOTE(review): assumes names of these types contain no whitespace --
 * confirm against the Is*FileName() checks.
 */
void
get_archive_subdir(char *archive_subdir, const char *archive_dir, const char *wal_file_name, xlogFileType type)
{
	enum
	{
		TLI_CHARS = 8,			/* leading timeline part of the name */
		LOG_CHARS = 8			/* log number part, used as subdir name */
	};

	if (type == SEGMENT || type == PARTIAL_SEGMENT || type == BACKUP_HISTORY_FILE)
	{
		/* require at least one character after the timeline and log parts */
		if (strlen(wal_file_name) > TLI_CHARS + LOG_CHARS)
		{
			char		log[LOG_CHARS + 1];

			memcpy(log, wal_file_name + TLI_CHARS, LOG_CHARS);
			log[LOG_CHARS] = '\0';

			join_path_components(archive_subdir, archive_dir, log);
			return;
		}
	}

	/* for all other files just use root directory of WAL archive */
	snprintf(archive_subdir, MAXPGPATH, "%s", archive_dir);
}

/*
 * Build the list of WAL archive subdirectories needed by a push batch.
 *
 * Every SEGMENT, PARTIAL_SEGMENT and BACKUP_HISTORY_FILE entry of
 * batch_files is mapped to its subdirectory via get_archive_subdir().
 * A path is appended only if it differs from the most recently appended
 * one, so duplicates are suppressed only when equal paths are adjacent;
 * batch_files is not sorted here and the order of the status files is
 * relied upon instead.
 *
 * Returns a parray of strings allocated with pgut_strdup(), or NULL when
 * the batch contains no file of the types listed above.
 */
parray*
setup_archive_subdirs(parray *batch_files, const char *archive_dir)
{
	parray	   *result = NULL;
	char	   *last_added = NULL;
	int			idx;

	for (idx = 0; idx < parray_num(batch_files); idx++)
	{
		char		path[MAXPGPATH];
		WALSegno   *entry = (WALSegno *) parray_get(batch_files, idx);

		/* only these file types are placed into subdirectories */
		if (entry->type != SEGMENT &&
			entry->type != PARTIAL_SEGMENT &&
			entry->type != BACKUP_HISTORY_FILE)
			continue;

		/* the result array is created lazily, on the first matching file */
		if (result == NULL)
			result = parray_new();

		get_archive_subdir(path, archive_dir, entry->name, entry->type);

		/* skip a path equal to the one appended last */
		if (last_added != NULL && strcmp(last_added, path) == 0)
			continue;

		last_added = pgut_strdup(path);
		parray_append(result, last_added);
	}

	return result;
}
Loading