Skip to content

Commit

Permalink
Update backup compression options that call pg_basebackup.
Browse files Browse the repository at this point in the history
add `backup_compression=none` to create an uncompressed archive
that only supports `backup_compression_level=0`.
for pg >= 15:
update `zstd` compression level range from `-131072` to `22`
update `lz4` compression level range from `0` to `12`

For PG < 15:
Allow using `backup_compression_level=0` with `backup_compression=gzip` algorithm.

In configuration 'none' can be used as value and not identified as None value.
  • Loading branch information
didiermichel committed Sep 26, 2023
1 parent cb42b10 commit 5811ec6
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 57 deletions.
13 changes: 10 additions & 3 deletions barman/command_wrappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1010,9 +1010,16 @@ def _get_compression_args(self, version, compression):
compression_args.append(compress_arg)
# For clients < 15 we use the old style argument format
else:
compression_args.append("--%s" % compression.config.type)
if compression.config.level:
compression_args.append("--compress=%d" % compression.config.level)
if compression.config.type == "none":
compression_args.append("--compress=0")
else:
if compression.config.level is not None:
compression_args.append(
"--compress=%d" % compression.config.level
)
# --gzip must be positioned after --compress when compression level=0
# so `base.tar.gz` can be created. Otherwise `.gz` won't be added.
compression_args.append("--%s" % compression.config.type)
return compression_args


Expand Down
173 changes: 147 additions & 26 deletions barman/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,36 +467,41 @@ def get_pg_basebackup_compression(server):
server.config.backup_compression_location,
server.config.backup_compression_workers,
)

base_backup_compression_option = None
compression = None
if server.config.backup_compression == GZipCompression.name:
# Create PgBaseBackupCompressionOption
base_backup_compression_option = GZipPgBaseBackupCompressionOption(
pg_base_backup_cfg
)
compression = GZipCompression(unix_command_factory())
return PgBaseBackupCompression(
pg_base_backup_cfg, base_backup_compression_option, compression
)

if server.config.backup_compression == LZ4Compression.name:
base_backup_compression_option = LZ4PgBaseBackupCompressionOption(
pg_base_backup_cfg
)
compression = LZ4Compression(unix_command_factory())
return PgBaseBackupCompression(
pg_base_backup_cfg, base_backup_compression_option, compression
)

if server.config.backup_compression == ZSTDCompression.name:
base_backup_compression_option = ZSTDPgBaseBackupCompressionOption(
pg_base_backup_cfg
)
compression = ZSTDCompression(unix_command_factory())
return PgBaseBackupCompression(
pg_base_backup_cfg, base_backup_compression_option, compression

if server.config.backup_compression == NoneCompression.name:
base_backup_compression_option = NonePgBaseBackupCompressionOption(
pg_base_backup_cfg
)
# We got to the point where the compression is not handled
raise CompressionException(
"Barman does not support pg_basebackup compression: %s"
% server.config.backup_compression
compression = NoneCompression(unix_command_factory())

if base_backup_compression_option is None or compression is None:
# We got to the point where the compression is not handled
raise CompressionException(
"Barman does not support pg_basebackup compression: %s"
% server.config.backup_compression
)
return PgBaseBackupCompression(
pg_base_backup_cfg, base_backup_compression_option, compression
)


Expand Down Expand Up @@ -574,13 +579,32 @@ def validate(self, pg_server_version, remote_status):
issues = super(GZipPgBaseBackupCompressionOption, self).validate(
pg_server_version, remote_status
)
if self.config.level is not None and (
self.config.level < 1 or self.config.level > 9
levels = list(range(1, 10))
levels.append(-1)
if self.config.level is not None and remote_status[
"pg_basebackup_version"
] < Version("15"):
# version prior to 15 allowed gzip compression 0
levels.append(0)
if self.config.level not in levels:
issues.append(
"backup_compression_level %d unsupported by compression algorithm."
" %s expects a compression level between -1 and 9 (-1 will use default compression level)."
% (self.config.level, self.config.type)
)
if (
self.config.level is not None
and remote_status["pg_basebackup_version"] >= Version("15")
and self.config.level not in levels
):
issues.append(
"backup_compression_level %d unsupported by "
"pg_basebackup compression %s" % (self.config.level, self.config.type)
msg = (
"backup_compression_level %d unsupported by compression algorithm."
" %s expects a compression level between 1 and 9 (-1 will use default compression level)."
% (self.config.level, self.config.type)
)
if self.config.level == 0:
msg += "\nIf you need to create an uncompress archive, you should set `backup_compression = none`."
issues.append(msg)
if self.config.workers is not None:
issues.append(
"backup_compression_workers is not compatible with compression %s"
Expand Down Expand Up @@ -610,15 +634,16 @@ def validate(self, pg_server_version, remote_status):
)

if self.config.level is not None and (
self.config.level < 1 or self.config.level > 12
self.config.level < 0 or self.config.level > 12
):
issues.append(
"backup_compression_level %d unsupported by "
"pg_basebackup compression %s" % (self.config.level, self.config.type)
"backup_compression_level %d unsupported by compression algorithm."
" %s expects a compression level between 1 and 12 (0 will use default compression level)."
% (self.config.level, self.config.type)
)
if self.config.workers is not None:
issues.append(
"backup_compression_workers is not compatible with compression %s"
"backup_compression_workers is not compatible with compression %s."
% self.config.type
)
return issues
Expand All @@ -644,21 +669,50 @@ def validate(self, pg_server_version, remote_status):
"pg_basebackup 15 or greater" % self.config.type
)

# Minimal config level comes from zstd library `STD_minCLevel()` and is
# commonly set to -131072.
if self.config.level is not None and (
self.config.level < 1 or self.config.level > 22
self.config.level < -131072 or self.config.level > 22
):
issues.append(
"backup_compression_level %d unsupported by "
"pg_basebackup compression %s" % (self.config.level, self.config.type)
"backup_compression_level %d unsupported by compression algorithm."
" '%s' expects a compression level between -131072 and 22 (3 will use default compression level)."
% (self.config.level, self.config.type)
)
if self.config.workers is not None and (
type(self.config.workers) is not int or self.config.workers < 0
):
issues.append(
"backup_compression_workers should be a positive integer: '%s' is invalid"
"backup_compression_workers should be a positive integer: '%s' is invalid."
% self.config.workers
)
return issues


class NonePgBaseBackupCompressionOption(PgBaseBackupCompressionOption):
def validate(self, pg_server_version, remote_status):
"""
Validate none compression specific options.
:param pg_server_version int: the server for which the
compression options should be validated.
:param dict remote_status: the status of the pg_basebackup command
:return List: List of Issues (str) or empty list
"""
issues = super(NonePgBaseBackupCompressionOption, self).validate(
pg_server_version, remote_status
)

if self.config.level is not None and (self.config.level != 0):
issues.append(
"backup_compression %s only supports backup_compression_level 0."
% self.config.type
)
if self.config.workers is not None:
issues.append(
"backup_compression_workers is not compatible with compression '%s'."
% self.config.type
)
return issues


Expand Down Expand Up @@ -972,3 +1026,70 @@ def get_file_content(self, filename, archive):
)
else:
return out


class NoneCompression(Compression):
name = "none"
file_extension = "tar"

def __init__(self, command):
"""
:param command: barman.fs.UnixLocalCommand
"""
self.command = command

def uncompress(self, src, dst, exclude=None, include_args=None):
"""
:param src: source file path without compression extension
:param dst: destination path
:param exclude: list of filepath in the archive to exclude from the extraction
:param include_args: list of filepath in the archive to extract.
:return:
"""
if src is None or src == "":
raise ValueError("Source path should be a string")
if dst is None or dst == "":
raise ValueError("Destination path should be a string")
exclude = [] if exclude is None else exclude
exclude_args = []
for name in exclude:
exclude_args.append("--exclude")
exclude_args.append(name)
include_args = [] if include_args is None else include_args
args = ["-xf", src, "--directory", dst]
args.extend(exclude_args)
args.extend(include_args)
ret = self.command.cmd("tar", args=args)
out, err = self.command.get_last_output()
if ret != 0:
raise CommandFailedException(
"Error decompressing %s into %s: %s" % (src, dst, err)
)
else:
return self.command.get_last_output()

def get_file_content(self, filename, archive):
"""
:param filename: str file to search for in the archive (requires its full path within the archive)
:param archive: str archive path/name without extension
:return: string content
"""
full_archive_name = "%s.%s" % (archive, self.file_extension)
args = ["-xf", full_archive_name, "-O", filename, "--occurrence"]
ret = self.command.cmd("tar", args=args)
out, err = self.command.get_last_output()
if ret != 0:
if "Not found in archive" in err:
raise FileNotFoundException(
err + "archive name: %s" % full_archive_name
)
else:
raise CommandFailedException(
"Error reading %s into archive %s: (%s)"
% (filename, full_archive_name, err)
)
else:
return out
8 changes: 3 additions & 5 deletions barman/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
CREATE_SLOT_VALUES = ["manual", "auto"]

# Config values relating to pg_basebackup compression
BASEBACKUP_COMPRESSIONS = ["gzip", "lz4", "zstd"]
BASEBACKUP_COMPRESSIONS = ["gzip", "lz4", "zstd", "none"]


class CsvOption(set):
Expand Down Expand Up @@ -294,8 +294,6 @@ def parse_backup_compression(value):
"""
Parse a string to a valid backup_compression value.
Valid values are contained in compression.basebackup_compressions list
:param str value: backup_compression value
:raises ValueError: if the value is invalid
"""
Expand All @@ -304,7 +302,7 @@ def parse_backup_compression(value):
if value.lower() in BASEBACKUP_COMPRESSIONS:
return value.lower()
raise ValueError(
"Invalid value (must be one in: '%s')" % ("', '".join(BASEBACKUP_COMPRESSIONS))
"Invalid value '%s'(must be one in: %s)" % (value, BASEBACKUP_COMPRESSIONS)
)


Expand Down Expand Up @@ -862,7 +860,7 @@ def get(self, section, option, defaults=None, none_value=None):
return None
try:
value = self._config.get(section, option, raw=False, vars=defaults)
if value.lower() == "none":
if value == "None":
value = none_value
if value is not None:
value = self._QUOTE_RE.sub(lambda m: m.group(2), value)
Expand Down
10 changes: 8 additions & 2 deletions barman/recovery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@
RecoveryPreconditionException,
SnapshotBackupException,
)
from barman.compression import GZipCompression, LZ4Compression, ZSTDCompression
from barman.compression import (
GZipCompression,
LZ4Compression,
ZSTDCompression,
NoneCompression,
)
import barman.fs as fs
from barman.infofile import BackupInfo, LocalBackupInfo
from barman.utils import force_str, mkpath
Expand Down Expand Up @@ -1803,7 +1808,8 @@ def recovery_executor_factory(backup_manager, command, backup_info):
return TarballRecoveryExecutor(backup_manager, LZ4Compression(command))
if compression == ZSTDCompression.name:
return TarballRecoveryExecutor(backup_manager, ZSTDCompression(command))

if compression == NoneCompression.name:
return TarballRecoveryExecutor(backup_manager, NoneCompression(command))
raise AttributeError("Unexpected compression format: %s" % compression)


Expand Down
Loading

0 comments on commit 5811ec6

Please sign in to comment.