From 0536907e70753f6f55d7c5b5e050df476149fe25 Mon Sep 17 00:00:00 2001 From: Jellyfrog Date: Wed, 10 Jan 2024 02:42:46 +0100 Subject: [PATCH] Add zstd compression support to Barman Cloud --- barman/clients/cloud_backup.py | 7 +++ barman/clients/cloud_compression.py | 71 +++++++++++++++++++++++--- barman/clients/cloud_walarchive.py | 12 +++++ barman/cloud.py | 13 ++++- setup.py | 1 + tests/requirements_dev.txt | 1 + tests/test_barman_cloud_wal_archive.py | 30 +++++++++++ tests/test_cloud.py | 37 +++++++++++--- 8 files changed, 158 insertions(+), 14 deletions(-) diff --git a/barman/clients/cloud_backup.py b/barman/clients/cloud_backup.py index 7ec654b54..b7a5c0f16 100755 --- a/barman/clients/cloud_backup.py +++ b/barman/clients/cloud_backup.py @@ -279,6 +279,13 @@ def parse_arguments(args=None): const="snappy", dest="compression", ) + compression.add_argument( + "--zstd", + help="zstd-compress the backup while uploading to the cloud", + action="store_const", + const="zst", + dest="compression", + ) parser.add_argument( "-h", "--host", diff --git a/barman/clients/cloud_compression.py b/barman/clients/cloud_compression.py index 20fb97a4c..c17e428b3 100644 --- a/barman/clients/cloud_compression.py +++ b/barman/clients/cloud_compression.py @@ -33,6 +33,14 @@ def _try_import_snappy(): return snappy +def _try_import_zstd(): + try: + import zstandard + except ImportError: + raise SystemExit("Missing required python module: zstandard") + return zstandard + + class ChunkedCompressor(with_metaclass(ABCMeta, object)): """ Base class for all ChunkedCompressors @@ -92,13 +100,47 @@ def decompress(self, data): return self.decompressor.decompress(data) +class ZstdCompressor(ChunkedCompressor): + """ + A ChunkedCompressor implementation based on zstandard + """ + + def __init__(self): + zstd = _try_import_zstd() + self.compressor = zstd.ZstdCompressor() + self.decompressor = zstd.ZstdDecompressor().decompressobj( + read_across_frames=True + ) + + def add_chunk(self, data): + """ + Compresses the supplied data and returns all the compressed bytes. + + :param bytes data: The chunk of data to be compressed + :return: The compressed data + :rtype: bytes + """ + return self.compressor.compress(data) + + def decompress(self, data): + """ + Decompresses the supplied chunk of data and returns at least part of the + uncompressed data. + + :param bytes data: The chunk of data to be decompressed + :return: The decompressed data + :rtype: bytes + """ + return self.decompressor.decompress(data) + + def get_compressor(compression): """ Helper function which returns a ChunkedCompressor for the specified compression - algorithm. Currently only snappy is supported. The other compression algorithms + algorithm. Currently snappy and zstd is supported. The other compression algorithms supported by barman cloud use the decompression built into TarFile. - :param str compression: The compression algorithm to use. Can be set to snappy + :param str compression: The compression algorithm to use. Can be set to snappy, zstd or any compression supported by the TarFile mode string. :return: A ChunkedCompressor capable of compressing and decompressing using the specified compression. @@ -106,6 +148,8 @@ def get_compressor(compression): """ if compression == "snappy": return SnappyCompressor() + elif compression == "zstd" or compression == "zst": + return ZstdCompressor() return None @@ -115,7 +159,7 @@ def compress(wal_file, compression): compressed data. :param IOBase wal_file: A file-like object containing the WAL file data. :param str compression: The compression algorithm to apply. Can be one of: - bzip2, gzip, snappy. + bzip2, gzip, snappy, zstd. :return: The compressed data :rtype: BytesIO """ @@ -125,6 +169,12 @@ def compress(wal_file, compression): snappy.stream_compress(wal_file, in_mem_snappy) in_mem_snappy.seek(0) return in_mem_snappy + elif compression == "zstd": + in_mem_zstd = BytesIO() + zstd = _try_import_zstd() + zstd.ZstdCompressor().copy_stream(wal_file, in_mem_zstd) + in_mem_zstd.seek(0) + return in_mem_zstd elif compression == "gzip": # Create a BytesIO for in memory compression in_mem_gzip = BytesIO() @@ -150,12 +200,17 @@ def get_streaming_tar_mode(mode, compression): ignored so that barman-cloud can apply them itself. :param str mode: The file mode to use, either r or w. - :param str compression: The compression algorithm to use. Can be set to snappy + :param str compression: The compression algorithm to use. Can be set to snappy, zstd or any compression supported by the TarFile mode string. :return: The full filemode for a streaming tar file :rtype: str """ - if compression == "snappy" or compression is None: + if ( + compression == "snappy" + or compression == "zstd" + or compression == "zst" + or compression is None + ): return "%s|" % mode else: return "%s|%s" % (mode, compression) @@ -170,13 +225,17 @@ def decompress_to_file(blob, dest_file, compression): :param IOBase dest_file: A file-like object into which the uncompressed data should be written. :param str compression: The compression algorithm to apply. Can be one of: - bzip2, gzip, snappy. + bzip2, gzip, snappy, zstd. :rtype: None """ if compression == "snappy": snappy = _try_import_snappy() snappy.stream_decompress(blob, dest_file) return + elif compression == "zstd": + zstd = _try_import_zstd() + zstd.ZstdDecompressor().copy_stream(blob, dest_file) + return elif compression == "gzip": source_file = gzip.GzipFile(fileobj=blob, mode="rb") elif compression == "bzip2": diff --git a/barman/clients/cloud_walarchive.py b/barman/clients/cloud_walarchive.py index e69b00fef..4e140f6c9 100755 --- a/barman/clients/cloud_walarchive.py +++ b/barman/clients/cloud_walarchive.py @@ -155,6 +155,14 @@ def parse_arguments(args=None): const="snappy", dest="compression", ) + compression.add_argument( + "--zstd", + help="zstd-compress the WAL while uploading to the cloud " + "(requires optional zstandard library)", + action="store_const", + const="zstd", + dest="compression", + ) add_tag_argument( parser, name="tags", @@ -319,6 +327,10 @@ def retrieve_wal_name(self, wal_path): elif self.compression == "snappy": # add snappy extension return "%s.snappy" % wal_name + + elif self.compression == "zstd": + # add zstd extension + return "%s.zst" % wal_name else: raise ValueError("Unknown compression type: %s" % self.compression) diff --git a/barman/cloud.py b/barman/cloud.py index 2c4ac4506..d29e749bf 100644 --- a/barman/cloud.py +++ b/barman/cloud.py @@ -72,7 +72,12 @@ LOGGING_FORMAT = "%(asctime)s [%(process)s] %(levelname)s: %(message)s" # Allowed compression algorithms -ALLOWED_COMPRESSIONS = {".gz": "gzip", ".bz2": "bzip2", ".snappy": "snappy"} +ALLOWED_COMPRESSIONS = { + ".gz": "gzip", + ".bz2": "bzip2", + ".snappy": "snappy", + ".zst": "zstd", +} DEFAULT_DELIMITER = "/" @@ -193,7 +198,7 @@ def __init__( self.buffer = None self.counter = 0 self.compressor = None - # Some supported compressions (e.g. snappy) require CloudTarUploader to apply + # Some supported compressions (e.g. snappy, zstd) require CloudTarUploader to apply # compression manually rather than relying on the tar file. self.compressor = cloud_compression.get_compressor(compression) # If the compression is supported by tar then it will be added to the filemode @@ -366,6 +371,8 @@ def _build_dest_name(self, name, count=0): components.append(".bz2") elif self.compression == "snappy": components.append(".snappy") + elif self.compression == "zst": + components.append(".zst") return "".join(components) def _get_tar(self, name): @@ -2287,6 +2294,8 @@ def get_backup_files(self, backup_info, allow_missing=False): info.compression = "bzip2" elif ext == "tar.snappy": info.compression = "snappy" + elif ext == "tar.zst": + info.compression = "zstd" else: logging.warning("Skipping unknown extension: %s", ext) continue diff --git a/setup.py b/setup.py index 0da075f07..956af4c19 100755 --- a/setup.py +++ b/setup.py @@ -104,6 +104,7 @@ "azure": ["azure-identity", "azure-storage-blob"], "azure-snapshots": ["azure-identity", "azure-mgmt-compute"], "cloud": ["boto3"], + "zstd": ["zstandard"], "google": [ "google-cloud-storage", ], diff --git a/tests/requirements_dev.txt b/tests/requirements_dev.txt index bee6e3834..9740fb92d 100644 --- a/tests/requirements_dev.txt +++ b/tests/requirements_dev.txt @@ -2,6 +2,7 @@ .[cloud] .[azure] .[snappy] +.[zstd] .[google] pytest mock diff --git a/tests/test_barman_cloud_wal_archive.py b/tests/test_barman_cloud_wal_archive.py index aaecb9465..ed92f39e4 100644 --- a/tests/test_barman_cloud_wal_archive.py +++ b/tests/test_barman_cloud_wal_archive.py @@ -21,6 +21,7 @@ import logging import os import snappy +import zstandard as zstd import mock import pytest @@ -537,6 +538,23 @@ def test_retrieve_snappy_file_obj(self, tmpdir): open_file.read() ) == "something".encode("utf-8") + def test_retrieve_zstd_file_obj(self, tmpdir): + """ + Test the retrieve_file_obj method with a zstd file + """ + # Setup the WAL + source = tmpdir.join("wal_dir/000000080000ABFF000000C1") + source.write("something".encode("utf-8"), ensure=True) + # Create a simple CloudWalUploader obj + uploader = CloudWalUploader(mock.MagicMock(), "test-server", compression="zstd") + open_file = uploader.retrieve_file_obj(source.strpath) + # Check the in memory file received + assert open_file + # Decompress on the fly to check content + assert zstd.ZstdDecompressor().decompressobj( + read_across_frames=True + ).decompress(open_file.read()) == "something".encode("utf-8") + def test_retrieve_normal_file_name(self): """ Test the retrieve_wal_name method with an uncompressed file @@ -589,6 +607,18 @@ def test_retrieve_snappy_file_name(self): assert wal_final_name assert wal_final_name == "000000080000ABFF000000C1.snappy" + def test_retrieve_zstd_file_name(self): + """ + Test the retrieve_wal_name method with zstd compression + """ + # Create a fake source name + source = "wal_dir/000000080000ABFF000000C1" + uploader = CloudWalUploader(mock.MagicMock(), "test-server", compression="zstd") + wal_final_name = uploader.retrieve_wal_name(source) + # Check the file name received + assert wal_final_name + assert wal_final_name == "000000080000ABFF000000C1.zst" + @mock.patch("barman.cloud.CloudInterface") @mock.patch("barman.clients.cloud_walarchive.CloudWalUploader.retrieve_file_obj") def test_upload_wal(self, rfo_mock, cloud_interface_mock): diff --git a/tests/test_cloud.py b/tests/test_cloud.py index 48203ff6e..036ffee86 100644 --- a/tests/test_cloud.py +++ b/tests/test_cloud.py @@ -36,6 +36,7 @@ from mock.mock import MagicMock import pytest import snappy +import zstandard as zstd from barman.exceptions import BackupPreconditionException from barman.infofile import BackupInfo @@ -98,6 +99,9 @@ def _compression_helper(src, compression): if compression == "snappy": dest = BytesIO() snappy.stream_compress(src, dest) + elif compression == "zstd": + dest = BytesIO() + zstd.ZstdCompressor().copy_stream(src, dest) elif compression == "gzip": dest = BytesIO() with gzip.GzipFile(fileobj=dest, mode="wb") as gz: @@ -975,7 +979,7 @@ def test_delete_objects_partial_failure(self, boto_mock, caplog): ) in caplog.text @pytest.mark.skipif(sys.version_info < (3, 0), reason="Requires Python 3 or higher") - @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy")) + @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy", "zstd")) @mock.patch("barman.cloud_providers.aws_s3.boto3") def test_download_file(self, boto_mock, compression, tmpdir): """Verifies that cloud_interface.download_file decompresses correctly.""" @@ -1007,7 +1011,13 @@ def test_download_file(self, boto_mock, compression, tmpdir): @pytest.mark.parametrize( ("compression", "file_ext"), - ((None, ""), ("bzip2", ".bz2"), ("gzip", ".gz"), ("snappy", ".snappy")), + ( + (None, ""), + ("bzip2", ".bz2"), + ("gzip", ".gz"), + ("snappy", ".snappy"), + ("zstd", ".zst"), + ), ) @mock.patch("barman.cloud_providers.aws_s3.boto3") def test_extract_tar(self, boto_mock, compression, file_ext, tmpdir): @@ -2028,7 +2038,7 @@ def test_delete_objects_404_not_failure(self, container_client_mock, caplog): ) in caplog.text @pytest.mark.skipif(sys.version_info < (3, 0), reason="Requires Python 3 or higher") - @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy")) + @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy", "zstd")) @mock.patch.dict( os.environ, {"AZURE_STORAGE_CONNECTION_STRING": "connection_string"} ) @@ -2077,7 +2087,13 @@ def test_download_file(self, container_client_mock, compression, tmpdir): @pytest.mark.parametrize( ("compression", "file_ext"), - ((None, ""), ("bzip2", ".bz2"), ("gzip", ".gz"), ("snappy", ".snappy")), + ( + (None, ""), + ("bzip2", ".bz2"), + ("gzip", ".gz"), + ("snappy", ".snappy"), + ("zstd", ".zst"), + ), ) @mock.patch.dict( os.environ, {"AZURE_STORAGE_CONNECTION_STRING": "connection_string"} @@ -2564,6 +2580,9 @@ def test_download_file(self, gcs_storage_mock, open_mock): "snappy_compression": { "compression": "snappy", }, + "zstd_compression": { + "compression": "zstd", + }, } for test_name, test_case in test_cases.items(): with self.subTest(msg=test_name, compression=test_case["compression"]): @@ -3008,7 +3027,7 @@ def _verify_wal_is_in_catalog(self, wal_name, wal_path): suffix, ), ] - for suffix in ("", ".gz", ".bz2", ".snappy") + for suffix in ("", ".gz", ".bz2", ".snappy", ".zst") ] for spec in spec_group ], @@ -3347,7 +3366,7 @@ class TestCloudTarUploader(object): "compression", # The CloudTarUploader expects the short form compression args set by the # cloud_backup argument parser - (None, "bz2", "gz", "snappy"), + (None, "bz2", "gz", "snappy", "zstd"), ) @mock.patch("barman.cloud.CloudInterface") def test_add(self, mock_cloud_interface, compression, tmpdir): @@ -3393,6 +3412,12 @@ def test_add(self, mock_cloud_interface, compression, tmpdir): tar_fileobj = BytesIO() snappy.stream_decompress(uploaded_data, tar_fileobj) tar_fileobj.seek(0) + elif compression == "zstd": + tar_mode = "r|" + # We must manually decompress the zstd bytes before extracting + tar_fileobj = BytesIO() + zstd.ZstdDecompressor().copy_stream(uploaded_data, tar_fileobj) + tar_fileobj.seek(0) else: tar_mode = "r|%s" % compression with open_tar(fileobj=tar_fileobj, mode=tar_mode) as tf: