BAR-113: Add --min-chunk-size option to barman-cloud-backup #849

Merged
9 changes: 9 additions & 0 deletions barman/clients/cloud_backup.py
@@ -158,6 +158,7 @@ def main(args=None):
"server_name": config.server_name,
"compression": config.compression,
"max_archive_size": config.max_archive_size,
"min_chunk_size": config.min_chunk_size,
"cloud_interface": cloud_interface,
}
if __is_hook_script():
@@ -313,6 +314,14 @@ def parse_arguments(args=None):
"(default: 100GB)",
default="100GB",
)
parser.add_argument(
"--min-chunk-size",
type=check_size,
help="minimum size of an individual chunk when uploading to cloud storage "
"(default: 5MB for aws-s3, 64KB for azure-blob-storage, not applicable for "
"google-cloud-storage)",
default=None, # Defer to the cloud interface if nothing is specified
)
parser.add_argument(
"-d",
"--dbname",
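Reviewer note: the new option reuses the existing check_size parser (see type=check_size above), so sizes can be given with human-readable suffixes, and the None default defers the decision to the cloud interface. Below is a minimal, illustrative sketch of that kind of size parsing, consistent with the expectations in the tests later in this diff (which treat MB and GB as binary multiples); it is not barman's actual check_size implementation.

import re

# Illustrative only: suffixes are treated as binary multiples, matching the test
# expectations below (e.g. "50MB" -> 50 << 20, "100GB" -> 100 << 30).
_UNITS = {"KB": 1 << 10, "MB": 1 << 20, "GB": 1 << 30, "TB": 1 << 40}

def parse_size(value):
    """Convert a human-readable size such as "50MB" into a number of bytes."""
    match = re.fullmatch(r"(\d+)\s*([KMGT]B)?", value.strip(), re.IGNORECASE)
    if match is None:
        raise ValueError("invalid size: %s" % value)
    number, unit = match.groups()
    return int(number) * (_UNITS[unit.upper()] if unit else 1)

assert parse_size("50MB") == 50 << 20
assert parse_size("100GB") == 100 << 30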
41 changes: 31 additions & 10 deletions barman/cloud.py
@@ -171,7 +171,7 @@ class CloudTarUploader(object):
NamedTemporaryFile, delete=False, prefix="barman-upload-", suffix=".part"
)

def __init__(self, cloud_interface, key, compression=None, chunk_size=None):
def __init__(self, cloud_interface, key, chunk_size, compression=None):
"""
A tar archive that resides on cloud storage

@@ -182,11 +182,8 @@ def __init__(self, cloud_interface, key, compression=None, chunk_size=None):
"""
self.cloud_interface = cloud_interface
self.key = key
self.chunk_size = chunk_size
self.upload_metadata = None
if chunk_size is None:
self.chunk_size = cloud_interface.MIN_CHUNK_SIZE
else:
self.chunk_size = max(chunk_size, cloud_interface.MIN_CHUNK_SIZE)
self.buffer = None
self.counter = 0
self.compressor = None
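Reviewer note: with this change CloudTarUploader no longer falls back to cloud_interface.MIN_CHUNK_SIZE itself; callers must now pass a chunk_size, and the clamping happens once in CloudUploadController (see below). A small stand-in illustration of the behavioural difference, not the real class:

# Before: a None or too-small chunk_size was clamped against MIN_CHUNK_SIZE inside
# CloudTarUploader.  After: the uploader simply stores whatever the caller computed.
MIN_CHUNK_SIZE = 5 << 20  # illustrative S3-like minimum

def old_chunk_size(chunk_size):
    return MIN_CHUNK_SIZE if chunk_size is None else max(chunk_size, MIN_CHUNK_SIZE)

def new_chunk_size(chunk_size):
    return chunk_size  # clamping now happens in CloudUploadController

assert old_chunk_size(None) == 5 << 20
assert new_chunk_size(50 << 20) == 50 << 20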
@@ -253,14 +250,22 @@ def close(self):


class CloudUploadController(object):
def __init__(self, cloud_interface, key_prefix, max_archive_size, compression):
def __init__(
self,
cloud_interface,
key_prefix,
max_archive_size,
compression,
min_chunk_size=None,
):
"""
        Create a new controller that uploads the backup to cloud storage

:param CloudInterface cloud_interface: cloud interface instance
:param str|None key_prefix: path inside the bucket
:param int max_archive_size: the maximum size of an archive
:param str|None compression: required compression
:param int|None min_chunk_size: the minimum size of a single upload part
"""

self.cloud_interface = cloud_interface
@@ -275,10 +280,19 @@ def __init__(self, cloud_interface, key_prefix, max_archive_size, compression):
pretty_size(self.cloud_interface.MAX_ARCHIVE_SIZE),
)
self.max_archive_size = self.cloud_interface.MAX_ARCHIVE_SIZE
# We aim to a maximum of MAX_CHUNKS_PER_FILE / 2 chinks per file
self.chunk_size = 2 * int(
        # We aim for a maximum of MAX_CHUNKS_PER_FILE / 2 chunks per file
calculated_chunk_size = 2 * int(
max_archive_size / self.cloud_interface.MAX_CHUNKS_PER_FILE
)
# Use whichever is higher - the calculated chunk_size, the requested
# min_chunk_size or the cloud interface MIN_CHUNK_SIZE.
possible_min_chunk_sizes = [
calculated_chunk_size,
cloud_interface.MIN_CHUNK_SIZE,
]
if min_chunk_size is not None:
possible_min_chunk_sizes.append(min_chunk_size)
self.chunk_size = max(possible_min_chunk_sizes)
self.compression = compression
self.tar_list = {}
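Reviewer note: the chunk-size selection above is easiest to follow with concrete numbers. The values here are arbitrary and only illustrate the max() behaviour (an S3-like interface, a 1 GiB archive limit, and a requested 50 MiB minimum):

# Illustrative values only: S3-like MIN_CHUNK_SIZE of 5 MiB, 10000 chunks per file,
# a 1 GiB max_archive_size and a requested --min-chunk-size of 50 MiB.
MIN_CHUNK_SIZE = 5 << 20
MAX_CHUNKS_PER_FILE = 10000
max_archive_size = 1 << 30
requested_min_chunk_size = 50 << 20

# Mirrors the calculation in CloudUploadController.__init__ above
calculated_chunk_size = 2 * int(max_archive_size / MAX_CHUNKS_PER_FILE)  # ~210 KiB
chunk_size = max([calculated_chunk_size, MIN_CHUNK_SIZE, requested_min_chunk_size])
assert chunk_size == 50 << 20  # the requested minimum wins in this example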

@@ -322,8 +336,8 @@ def _get_tar(self, name):
CloudTarUploader(
cloud_interface=self.cloud_interface,
key=os.path.join(self.key_prefix, self._build_dest_name(name)),
compression=self.compression,
chunk_size=self.chunk_size,
compression=self.compression,
)
]
# If the current uploading file size is over DEFAULT_MAX_TAR_SIZE
@@ -337,8 +351,8 @@ def _get_tar(self, name):
self.key_prefix,
self._build_dest_name(name, len(self.tar_list[name])),
),
compression=self.compression,
chunk_size=self.chunk_size,
compression=self.compression,
)
self.tar_list[name].append(uploader)
return uploader.tar
@@ -1469,6 +1483,7 @@ def __init__(
postgres,
compression=None,
backup_name=None,
min_chunk_size=None,
):
"""
Base constructor.
@@ -1482,6 +1497,7 @@ def __init__(
:param str compression: Compression algorithm to use
:param str|None backup_name: A friendly name which can be used to reference
this backup in the future.
:param int min_chunk_size: the minimum size of a single upload part
"""
super(CloudBackupUploader, self).__init__(
server_name,
@@ -1492,6 +1508,7 @@ def __init__(

self.compression = compression
self.max_archive_size = max_archive_size
self.min_chunk_size = min_chunk_size

# Object properties set at backup time
self.controller = None
@@ -1531,6 +1548,7 @@ def _create_upload_controller(self, backup_id):
key_prefix,
self.max_archive_size,
self.compression,
self.min_chunk_size,
)

def _backup_data_files(
@@ -1726,6 +1744,7 @@ def __init__(
backup_dir,
backup_id,
compression=None,
min_chunk_size=None,
):
"""
Create the cloud storage upload client for a backup in the specified
@@ -1739,13 +1758,15 @@ def __init__(
be uploaded
:param str backup_id: The id of the backup to upload
:param str compression: Compression algorithm to use
:param int min_chunk_size: the minimum size of a single upload part
"""
super(CloudBackupUploaderBarman, self).__init__(
server_name,
cloud_interface,
max_archive_size,
compression=compression,
postgres=None,
            min_chunk_size=min_chunk_size,
)
self.backup_dir = backup_dir
self.backup_id = backup_id
35 changes: 31 additions & 4 deletions tests/test_barman_cloud_backup.py
@@ -33,6 +33,15 @@ class TestCloudBackup(object):
Test that we get the intended behaviour when called directly
"""

@pytest.mark.parametrize(
("barman_cloud_args", "expected_max_archive_size", "expected_min_chunk_size"),
(
([], 100 << 30, None),
(["--max-archive-size=10GB"], 10 << 30, None),
(["--min-chunk-size=50MB"], 100 << 30, 50 << 20),
(["--max-archive-size=10GB", "--min-chunk-size=50MB"], 10 << 30, 50 << 20),
),
)
@mock.patch.dict(
os.environ, {"AZURE_STORAGE_CONNECTION_STRING": "connection_string"}
)
@@ -46,17 +55,21 @@ def test_uses_postgres_backup_uploader(
postgres_connection,
_rmtree_mock,
_tempfile_mock,
barman_cloud_args,
expected_max_archive_size,
expected_min_chunk_size,
):
uploader = uploader_mock.return_value
cloud_backup.main(["cloud_storage_url", "test_server"])
cloud_backup.main(["cloud_storage_url", "test_server"] + barman_cloud_args)
postgres_connection.assert_called_once()
cloud_interface_mock.assert_called_once()
uploader_mock.assert_called_once_with(
server_name="test_server",
compression=None,
backup_name=None,
postgres=postgres_connection.return_value,
max_archive_size=107374182400,
max_archive_size=expected_max_archive_size,
min_chunk_size=expected_min_chunk_size,
cloud_interface=cloud_interface_mock.return_value,
)
uploader.backup.assert_called_once()
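Reviewer note: the shifted integer literals in the parametrized expectations above are just a compact way of writing binary multiples, e.g.:

# Binary multiples written as shifts, matching the expected values above.
assert 100 << 30 == 100 * 1024 ** 3  # 100 GiB, the default --max-archive-size
assert 10 << 30 == 10 * 1024 ** 3    # 10 GiB
assert 50 << 20 == 50 * 1024 ** 2    # 50 MiB, the requested --min-chunk-size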
@@ -410,6 +423,15 @@ class TestCloudBackupHookScript(object):
Test that we get the intended behaviour when called as a hook script
"""

@pytest.mark.parametrize(
("barman_cloud_args", "expected_max_archive_size", "expected_min_chunk_size"),
(
([], 100 << 30, None),
(["--max-archive-size=10GB"], 10 << 30, None),
(["--min-chunk-size=50MB"], 100 << 30, 50 << 20),
(["--max-archive-size=10GB", "--min-chunk-size=50MB"], 10 << 30, 50 << 20),
),
)
@mock.patch.dict(
os.environ,
{
@@ -429,14 +451,18 @@ def test_uses_barman_backup_uploader_when_running_as_hook(
cloud_interface_mock,
rmtree_mock,
tempfile_mock,
barman_cloud_args,
expected_max_archive_size,
expected_min_chunk_size,
):
uploader = uploader_mock.return_value
cloud_backup.main(["cloud_storage_url", "test_server"])
cloud_backup.main(["cloud_storage_url", "test_server"] + barman_cloud_args)
cloud_interface_mock.assert_called_once()
uploader_mock.assert_called_once_with(
server_name="test_server",
compression=None,
max_archive_size=107374182400,
max_archive_size=expected_max_archive_size,
min_chunk_size=expected_min_chunk_size,
cloud_interface=cloud_interface_mock.return_value,
backup_dir=EXAMPLE_BACKUP_DIR,
backup_id=EXAMPLE_BACKUP_ID,
@@ -470,6 +496,7 @@ def test_uses_barman_backup_uploader_when_running_as_retry_hook(
server_name="test_server",
compression=None,
max_archive_size=107374182400,
min_chunk_size=None,
cloud_interface=cloud_interface_mock.return_value,
backup_dir=EXAMPLE_BACKUP_DIR,
backup_id=EXAMPLE_BACKUP_ID,
93 changes: 91 additions & 2 deletions tests/test_cloud.py
@@ -53,6 +53,7 @@
CloudBackupUploaderBarman,
CloudProviderError,
CloudTarUploader,
CloudUploadController,
CloudUploadingError,
FileUploadStatistics,
DEFAULT_DELIMITER,
@@ -3352,15 +3353,17 @@ def test_add(self, mock_cloud_interface, compression, tmpdir):
the bytes passed to async_upload_part represent a valid tar file.
"""
# GIVEN a cloud interface
mock_cloud_interface.MIN_CHUNK_SIZE = 5 << 20
# AND a source directory containing one file
src_file = "arbitrary_file_name"
content = "arbitrary strong representing file content"
key = "arbitrary/path/in/the/cloud"
with open(os.path.join(str(tmpdir), src_file), "w") as f:
f.write(content)
# AND a CloudTarUploader using the configured compression
uploader = CloudTarUploader(mock_cloud_interface, key, compression=compression)
chunk_size = 5 << 20
uploader = CloudTarUploader(
mock_cloud_interface, key, chunk_size=chunk_size, compression=compression
)

# WHEN the file is added to the tar uploader
uploader.tar.add(
@@ -3394,6 +3397,92 @@ def test_add(self, mock_cloud_interface, compression, tmpdir):
tf.extractall(path=dest_path)
with open(os.path.join(dest_path, src_file), "r") as result:
assert result.read() == content
# AND the supplied chunk_size was set
assert uploader.chunk_size == chunk_size


class TestCloudUploadController(object):
"""Tests for the CloudUploadController class."""

@pytest.mark.parametrize(
("max_archive_size_arg", "max_archive_size_property"),
        ((100, 1000), (1000, 100)),
)
@mock.patch("barman.cloud.CloudInterface")
def test_init_max_archive_size(
self, mock_cloud_interface, max_archive_size_arg, max_archive_size_property
):
"""Test creation of CloudUploadController with max_archive_size values."""
# GIVEN a mock cloud interface with the specified MAX_ARCHIVE_SIZE value
# and an arbitrary MIN_CHUNK_SIZE value
mock_cloud_interface.MAX_ARCHIVE_SIZE = max_archive_size_property
mock_cloud_interface.MIN_CHUNK_SIZE = 5 << 20

# WHEN a CloudUploadController is created with the requested max_archive_size
controller = CloudUploadController(
mock_cloud_interface, "prefix", max_archive_size_arg, None
)

# THEN the max_archive_size is set to the lower of requested max_archive_size
# and the cloud interface MAX_ARCHIVE_SIZE
assert controller.max_archive_size == min(
max_archive_size_arg, max_archive_size_property
)

@pytest.mark.parametrize(
(
"min_chunk_size_arg",
"min_chunk_size_property",
"max_archive_size",
"expected_chunk_size",
),
(
# When the supplied min_chunk_size is larger than
# CloudInterface.MIN_CHUNK_SIZE and larger than the chunk size calculated
# from max_archive_size and CloudInterface.MAX_CHUNKS_PER_FILE then we
# expect CloudUploadController.chunk_size to be min_chunk_size.
(10 << 20, 5 << 20, 1 << 30, 10 << 20),
# When CloudInterface.MIN_CHUNK_SIZE is larger than the supplied
# min_chunk_size and larger than the chunk size calculated from
# max_archive_size and CloudInterface.MAX_CHUNKS_PER_FILE then we
# expect CloudUploadController.chunk_size to be
# CloudInterface.MIN_CHUNK_SIZE.
(5 << 20, 10 << 20, 1 << 30, 10 << 20),
# When the chunk size calculated from max_archive_size and
# CloudInterface.MAX_CHUNKS_PER_FILE is larger than the supplied
# min_chunk_size and CloudInterface.MIN_CHUNK_SIZE then we
# expect CloudUploadController.chunk_size to be the calculated
# value.
(5 << 10, 5 << 10, 1 << 30, 214748),
),
)
@mock.patch("barman.cloud.CloudInterface")
def test_init_min_chunk_size(
self,
mock_cloud_interface,
min_chunk_size_arg,
min_chunk_size_property,
max_archive_size,
expected_chunk_size,
):
"""Test creation of CloudUploadController with max_archive_size values."""
# GIVEN a CloudInterface with a specified MIN_CHUNK_SIZE and MAX_ARCHIVE_SIZE
# and a fixed MAX_CHUNKS_PER_FILE value of 10000
mock_cloud_interface.MIN_CHUNK_SIZE = min_chunk_size_property
mock_cloud_interface.MAX_ARCHIVE_SIZE = max_archive_size
mock_cloud_interface.MAX_CHUNKS_PER_FILE = 10000

# WHEN a CloudUploadController is created with the requested min_chunk_size
controller = CloudUploadController(
mock_cloud_interface,
"prefix",
max_archive_size,
None,
min_chunk_size=min_chunk_size_arg,
)

# THEN the chunk_size is set to the expected value
assert controller.chunk_size == expected_chunk_size
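Reviewer note: the expected value in the third case can be reproduced directly from the controller's calculation (a quick sanity check, not part of the test suite):

# Third parametrized case: both minimums (5 KiB) are below the calculated chunk
# size, so the calculated value is used.
calculated = 2 * int((1 << 30) / 10000)
assert calculated == 214748
assert max([calculated, 5 << 10, 5 << 10]) == 214748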


class TestCloudBackupUploader(object):