Add zstd compression support to Barman Cloud
Jellyfrog authored and gcalacoci committed Oct 28, 2024
1 parent 01e7230 commit 0536907
Showing 8 changed files with 158 additions and 14 deletions.
7 changes: 7 additions & 0 deletions barman/clients/cloud_backup.py
@@ -279,6 +279,13 @@ def parse_arguments(args=None):
const="snappy",
dest="compression",
)
compression.add_argument(
"--zstd",
help="zstd-compress the backup while uploading to the cloud",
action="store_const",
const="zst",
dest="compression",
)
parser.add_argument(
"-h",
"--host",
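For orientation, a minimal sketch of how the new switch surfaces through this parser (an editorial illustration, not part of the commit; the positional destination URL and server name follow the usual barman-cloud-backup invocation and are assumptions here):

from barman.clients.cloud_backup import parse_arguments

# --zstd is a store_const switch in the mutually exclusive compression group,
# so it records the short form "zst" rather than a boolean flag.
args = parse_arguments(["s3://my-bucket/backups", "my-server", "--zstd"])
assert args.compression == "zst"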
71 changes: 65 additions & 6 deletions barman/clients/cloud_compression.py
@@ -33,6 +33,14 @@ def _try_import_snappy():
return snappy


def _try_import_zstd():
try:
import zstandard
except ImportError:
raise SystemExit("Missing required python module: zstandard")
return zstandard


class ChunkedCompressor(with_metaclass(ABCMeta, object)):
"""
Base class for all ChunkedCompressors
@@ -92,20 +100,56 @@ def decompress(self, data):
return self.decompressor.decompress(data)


class ZstdCompressor(ChunkedCompressor):
"""
A ChunkedCompressor implementation based on zstandard
"""

def __init__(self):
zstd = _try_import_zstd()
self.compressor = zstd.ZstdCompressor()
self.decompressor = zstd.ZstdDecompressor().decompressobj(
read_across_frames=True
)

def add_chunk(self, data):
"""
Compresses the supplied data and returns all the compressed bytes.
:param bytes data: The chunk of data to be compressed
:return: The compressed data
:rtype: bytes
"""
return self.compressor.compress(data)

def decompress(self, data):
"""
Decompresses the supplied chunk of data and returns at least part of the
uncompressed data.
:param bytes data: The chunk of data to be decompressed
:return: The decompressed data
:rtype: bytes
"""
return self.decompressor.decompress(data)


def get_compressor(compression):
"""
Helper function which returns a ChunkedCompressor for the specified compression
algorithm. Currently only snappy is supported. The other compression algorithms
algorithm. Currently snappy and zstd are supported. The other compression algorithms
supported by barman cloud use the decompression built into TarFile.
:param str compression: The compression algorithm to use. Can be set to snappy
:param str compression: The compression algorithm to use. Can be set to snappy, zstd
or any compression supported by the TarFile mode string.
:return: A ChunkedCompressor capable of compressing and decompressing using the
specified compression.
:rtype: ChunkedCompressor
"""
if compression == "snappy":
return SnappyCompressor()
elif compression == "zstd" or compression == "zst":
return ZstdCompressor()
return None


@@ -115,7 +159,7 @@ def compress(wal_file, compression):
compressed data.
:param IOBase wal_file: A file-like object containing the WAL file data.
:param str compression: The compression algorithm to apply. Can be one of:
bzip2, gzip, snappy.
bzip2, gzip, snappy, zstd.
:return: The compressed data
:rtype: BytesIO
"""
@@ -125,6 +169,12 @@
snappy.stream_compress(wal_file, in_mem_snappy)
in_mem_snappy.seek(0)
return in_mem_snappy
elif compression == "zstd":
in_mem_zstd = BytesIO()
zstd = _try_import_zstd()
zstd.ZstdCompressor().copy_stream(wal_file, in_mem_zstd)
in_mem_zstd.seek(0)
return in_mem_zstd
elif compression == "gzip":
# Create a BytesIO for in memory compression
in_mem_gzip = BytesIO()
@@ -150,12 +200,17 @@ def get_streaming_tar_mode(mode, compression):
ignored so that barman-cloud can apply them itself.
:param str mode: The file mode to use, either r or w.
:param str compression: The compression algorithm to use. Can be set to snappy
:param str compression: The compression algorithm to use. Can be set to snappy, zstd
or any compression supported by the TarFile mode string.
:return: The full filemode for a streaming tar file
:rtype: str
"""
if compression == "snappy" or compression is None:
if (
compression == "snappy"
or compression == "zstd"
or compression == "zst"
or compression is None
):
return "%s|" % mode
else:
return "%s|%s" % (mode, compression)
@@ -170,13 +225,17 @@ def decompress_to_file(blob, dest_file, compression):
:param IOBase dest_file: A file-like object into which the uncompressed data
should be written.
:param str compression: The compression algorithm to apply. Can be one of:
bzip2, gzip, snappy.
bzip2, gzip, snappy, zstd.
:rtype: None
"""
if compression == "snappy":
snappy = _try_import_snappy()
snappy.stream_decompress(blob, dest_file)
return
elif compression == "zstd":
zstd = _try_import_zstd()
zstd.ZstdDecompressor().copy_stream(blob, dest_file)
return
elif compression == "gzip":
source_file = gzip.GzipFile(fileobj=blob, mode="rb")
elif compression == "bzip2":
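A round-trip sketch of the new chunked compressor (an editorial illustration, not part of the commit; it assumes barman is importable and the optional zstandard package is installed). Each add_chunk() call emits a complete zstd frame, which is presumably why the decompressor is created with read_across_frames=True:

from barman.clients.cloud_compression import get_compressor

compressor = get_compressor("zstd")                   # returns a ZstdCompressor
compressed = compressor.add_chunk(b"some WAL bytes")  # one complete zstd frame
assert compressor.decompress(compressed) == b"some WAL bytes"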
12 changes: 12 additions & 0 deletions barman/clients/cloud_walarchive.py
@@ -155,6 +155,14 @@ def parse_arguments(args=None):
const="snappy",
dest="compression",
)
compression.add_argument(
"--zstd",
help="zstd-compress the WAL while uploading to the cloud "
"(requires optional zstandard library)",
action="store_const",
const="zstd",
dest="compression",
)
add_tag_argument(
parser,
name="tags",
@@ -319,6 +327,10 @@ def retrieve_wal_name(self, wal_path):
elif self.compression == "snappy":
# add snappy extension
return "%s.snappy" % wal_name

elif self.compression == "zstd":
# add zstd extension
return "%s.zst" % wal_name
else:
raise ValueError("Unknown compression type: %s" % self.compression)

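The zstd WAL path can be exercised end to end with the compress() and decompress_to_file() helpers added above (an editorial in-memory sketch, not part of the commit, assuming the zstandard package is installed):

from io import BytesIO

from barman.clients.cloud_compression import compress, decompress_to_file

wal_data = b"fake WAL contents"
# compress() returns a rewound BytesIO holding the zstd-compressed stream
compressed = compress(BytesIO(wal_data), "zstd")

restored = BytesIO()
decompress_to_file(compressed, restored, "zstd")
assert restored.getvalue() == wal_data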
13 changes: 11 additions & 2 deletions barman/cloud.py
@@ -72,7 +72,12 @@
LOGGING_FORMAT = "%(asctime)s [%(process)s] %(levelname)s: %(message)s"

# Allowed compression algorithms
ALLOWED_COMPRESSIONS = {".gz": "gzip", ".bz2": "bzip2", ".snappy": "snappy"}
ALLOWED_COMPRESSIONS = {
".gz": "gzip",
".bz2": "bzip2",
".snappy": "snappy",
".zst": "zstd",
}

DEFAULT_DELIMITER = "/"

@@ -193,7 +198,7 @@ def __init__(
self.buffer = None
self.counter = 0
self.compressor = None
# Some supported compressions (e.g. snappy) require CloudTarUploader to apply
# Some supported compressions (e.g. snappy, zstd) require CloudTarUploader to apply
# compression manually rather than relying on the tar file.
self.compressor = cloud_compression.get_compressor(compression)
# If the compression is supported by tar then it will be added to the filemode
@@ -366,6 +371,8 @@ def _build_dest_name(self, name, count=0):
components.append(".bz2")
elif self.compression == "snappy":
components.append(".snappy")
elif self.compression == "zst":
components.append(".zst")
return "".join(components)

def _get_tar(self, name):
@@ -2287,6 +2294,8 @@ def get_backup_files(self, backup_info, allow_missing=False):
info.compression = "bzip2"
elif ext == "tar.snappy":
info.compression = "snappy"
elif ext == "tar.zst":
info.compression = "zstd"
else:
logging.warning("Skipping unknown extension: %s", ext)
continue
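Note the two spellings in play: barman-cloud-backup stores the short form "zst" (which _build_dest_name uses to append the .zst suffix), while barman-cloud-wal-archive stores "zstd". get_compressor() in cloud_compression.py accepts both, so either client ends up with the same ZstdCompressor. A quick check (editorial illustration, not part of the commit):

from barman.clients.cloud_compression import ZstdCompressor, get_compressor

assert isinstance(get_compressor("zstd"), ZstdCompressor)
assert isinstance(get_compressor("zst"), ZstdCompressor)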
1 change: 1 addition & 0 deletions setup.py
@@ -104,6 +104,7 @@
"azure": ["azure-identity", "azure-storage-blob"],
"azure-snapshots": ["azure-identity", "azure-mgmt-compute"],
"cloud": ["boto3"],
"zstd": ["zstandard"],
"google": [
"google-cloud-storage",
],
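With this extras entry the optional dependency can be installed alongside the cloud client, e.g. pip install barman[zstd], mirroring the existing snappy extra.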
1 change: 1 addition & 0 deletions tests/requirements_dev.txt
@@ -2,6 +2,7 @@
.[cloud]
.[azure]
.[snappy]
.[zstd]
.[google]
pytest
mock
30 changes: 30 additions & 0 deletions tests/test_barman_cloud_wal_archive.py
@@ -21,6 +21,7 @@
import logging
import os
import snappy
import zstandard as zstd

import mock
import pytest
@@ -537,6 +538,23 @@ def test_retrieve_snappy_file_obj(self, tmpdir):
open_file.read()
) == "something".encode("utf-8")

def test_retrieve_zstd_file_obj(self, tmpdir):
"""
Test the retrieve_file_obj method with a zstd file
"""
# Setup the WAL
source = tmpdir.join("wal_dir/000000080000ABFF000000C1")
source.write("something".encode("utf-8"), ensure=True)
# Create a simple CloudWalUploader obj
uploader = CloudWalUploader(mock.MagicMock(), "test-server", compression="zstd")
open_file = uploader.retrieve_file_obj(source.strpath)
# Check the in memory file received
assert open_file
# Decompress on the fly to check content
assert zstd.ZstdDecompressor().decompressobj(
read_across_frames=True
).decompress(open_file.read()) == "something".encode("utf-8")

def test_retrieve_normal_file_name(self):
"""
Test the retrieve_wal_name method with an uncompressed file
@@ -589,6 +607,18 @@ def test_retrieve_snappy_file_name(self):
assert wal_final_name
assert wal_final_name == "000000080000ABFF000000C1.snappy"

def test_retrieve_zstd_file_name(self):
"""
Test the retrieve_wal_name method with zstd compression
"""
# Create a fake source name
source = "wal_dir/000000080000ABFF000000C1"
uploader = CloudWalUploader(mock.MagicMock(), "test-server", compression="zstd")
wal_final_name = uploader.retrieve_wal_name(source)
# Check the file name received
assert wal_final_name
assert wal_final_name == "000000080000ABFF000000C1.zst"

@mock.patch("barman.cloud.CloudInterface")
@mock.patch("barman.clients.cloud_walarchive.CloudWalUploader.retrieve_file_obj")
def test_upload_wal(self, rfo_mock, cloud_interface_mock):
37 changes: 31 additions & 6 deletions tests/test_cloud.py
@@ -36,6 +36,7 @@
from mock.mock import MagicMock
import pytest
import snappy
import zstandard as zstd

from barman.exceptions import BackupPreconditionException
from barman.infofile import BackupInfo
@@ -98,6 +99,9 @@ def _compression_helper(src, compression):
if compression == "snappy":
dest = BytesIO()
snappy.stream_compress(src, dest)
elif compression == "zstd":
dest = BytesIO()
zstd.ZstdCompressor().copy_stream(src, dest)
elif compression == "gzip":
dest = BytesIO()
with gzip.GzipFile(fileobj=dest, mode="wb") as gz:
@@ -975,7 +979,7 @@ def test_delete_objects_partial_failure(self, boto_mock, caplog):
) in caplog.text

@pytest.mark.skipif(sys.version_info < (3, 0), reason="Requires Python 3 or higher")
@pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy"))
@pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy", "zstd"))
@mock.patch("barman.cloud_providers.aws_s3.boto3")
def test_download_file(self, boto_mock, compression, tmpdir):
"""Verifies that cloud_interface.download_file decompresses correctly."""
@@ -1007,7 +1011,13 @@ def test_download_file(self, boto_mock, compression, tmpdir):

@pytest.mark.parametrize(
("compression", "file_ext"),
((None, ""), ("bzip2", ".bz2"), ("gzip", ".gz"), ("snappy", ".snappy")),
(
(None, ""),
("bzip2", ".bz2"),
("gzip", ".gz"),
("snappy", ".snappy"),
("zstd", ".zst"),
),
)
@mock.patch("barman.cloud_providers.aws_s3.boto3")
def test_extract_tar(self, boto_mock, compression, file_ext, tmpdir):
@@ -2028,7 +2038,7 @@ def test_delete_objects_404_not_failure(self, container_client_mock, caplog):
) in caplog.text

@pytest.mark.skipif(sys.version_info < (3, 0), reason="Requires Python 3 or higher")
@pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy"))
@pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy", "zstd"))
@mock.patch.dict(
os.environ, {"AZURE_STORAGE_CONNECTION_STRING": "connection_string"}
)
@@ -2077,7 +2087,13 @@ def test_download_file(self, container_client_mock, compression, tmpdir):

@pytest.mark.parametrize(
("compression", "file_ext"),
((None, ""), ("bzip2", ".bz2"), ("gzip", ".gz"), ("snappy", ".snappy")),
(
(None, ""),
("bzip2", ".bz2"),
("gzip", ".gz"),
("snappy", ".snappy"),
("zstd", ".zst"),
),
)
@mock.patch.dict(
os.environ, {"AZURE_STORAGE_CONNECTION_STRING": "connection_string"}
@@ -2564,6 +2580,9 @@ def test_download_file(self, gcs_storage_mock, open_mock):
"snappy_compression": {
"compression": "snappy",
},
"zstd_compression": {
"compression": "zstd",
},
}
for test_name, test_case in test_cases.items():
with self.subTest(msg=test_name, compression=test_case["compression"]):
@@ -3008,7 +3027,7 @@ def _verify_wal_is_in_catalog(self, wal_name, wal_path):
suffix,
),
]
for suffix in ("", ".gz", ".bz2", ".snappy")
for suffix in ("", ".gz", ".bz2", ".snappy", ".zst")
]
for spec in spec_group
],
@@ -3347,7 +3366,7 @@ class TestCloudTarUploader(object):
"compression",
# The CloudTarUploader expects the short form compression args set by the
# cloud_backup argument parser
(None, "bz2", "gz", "snappy"),
(None, "bz2", "gz", "snappy", "zstd"),
)
@mock.patch("barman.cloud.CloudInterface")
def test_add(self, mock_cloud_interface, compression, tmpdir):
@@ -3393,6 +3412,12 @@ def test_add(self, mock_cloud_interface, compression, tmpdir):
tar_fileobj = BytesIO()
snappy.stream_decompress(uploaded_data, tar_fileobj)
tar_fileobj.seek(0)
elif compression == "zstd":
tar_mode = "r|"
# We must manually decompress the zstd bytes before extracting
tar_fileobj = BytesIO()
zstd.ZstdDecompressor().copy_stream(uploaded_data, tar_fileobj)
tar_fileobj.seek(0)
else:
tar_mode = "r|%s" % compression
with open_tar(fileobj=tar_fileobj, mode=tar_mode) as tf:
