A very hacky implementation of managed transfers for s3
Adds a new option `--managed-transfer` to `barman-cloud-backup` which,
if used with the `aws-s3` cloud provider, causes each backup object
to be uploaded in a single call to boto3 - boto3 then manages the
transfer itself, using the multipart API where necessary.
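
For illustration, a hypothetical invocation (the destination URL and
server name below are placeholders):

    barman-cloud-backup --cloud-provider aws-s3 --managed-transfer \
        s3://my-backup-bucket/barman pg-primary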

This allows backup uploads to use a TransferConfig object, through
which settings such as bandwidth limits can be applied. This commit
adds such a TransferConfig with a hard-coded bandwidth limit of 1MB/s.
That is not of much practical use, but the aim at this point is just
to see something working.
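
As a minimal sketch of the underlying boto3 mechanism (the bucket and
key names are placeholders; the 1MB/s figure mirrors the hard-coded
limit in this commit):

    import boto3
    from boto3.s3.transfer import TransferConfig

    # Cap the managed transfer at roughly 1MB/s; boto3 switches to the
    # multipart API itself when the object is large enough.
    config = TransferConfig(max_bandwidth=1024 * 1024)
    s3 = boto3.client("s3")
    with open("data_0000.tar", "rb") as fileobj:
        s3.upload_fileobj(fileobj, "my-bucket", "backups/data_0000.tar", Config=config)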

The implementation works by creating a specialisation of
S3CloudInterface in which `MAX_CHUNKS_PER_FILE` is set to `1` and
`MIN_CHUNK_SIZE` is set to the `MAX_ARCHIVE_SIZE` value. This means
each backup object (e.g. data_0000.tar) is uploaded in a single call
to _upload_part, which now just delegates to upload_fileobj.
upload_fileobj is updated to add a TransferConfig, which could later
carry options supplied by the user.

The remaining multipart methods are turned into no-ops which just
return whatever is expected by the existing backup upload code that
relies on CloudInterface, as sketched below.
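
A hypothetical sketch of that call pattern (the method names come from
this commit; the driving loop itself is an assumption, not the actual
barman upload code):

    def upload_backup_object(cloud_interface, key, fileobjs):
        # With MAX_CHUNKS_PER_FILE = 1 each backup object arrives as a single
        # "part", so _upload_part delegates straight to upload_fileobj and the
        # create/complete multipart calls have nothing left to do.
        upload_metadata = cloud_interface.create_multipart_upload(key)
        parts = []
        for part_number, fileobj in enumerate(fileobjs, start=1):
            parts.append(
                cloud_interface._upload_part(upload_metadata, key, fileobj, part_number)
            )
        cloud_interface._complete_multipart_upload(upload_metadata, key, parts)
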
mikewallace1979 committed Aug 25, 2023
1 parent d8ae97c commit 392edcb
Showing 3 changed files with 139 additions and 2 deletions.
5 changes: 5 additions & 0 deletions barman/clients/cloud_backup.py
@@ -395,6 +395,11 @@ def parse_arguments(args=None):
help="The name of the AWS region containing the EC2 VM and storage volumes "
"defined by the --snapshot-instance and --snapshot-disk arguments.",
)
s3_arguments.add_argument(
"--managed-transfer",
help="Use a boto3 managed transfer to upload the backup data",
action="store_true",
)
azure_arguments.add_argument(
"--encryption-scope",
help="The name of an encryption scope defined in the Azure Blob Storage "
8 changes: 6 additions & 2 deletions barman/cloud_providers/__init__.py
@@ -43,7 +43,11 @@ def _update_kwargs(kwargs, config, args):


def _make_s3_cloud_interface(config, cloud_interface_kwargs):
    from barman.cloud_providers.aws_s3 import S3CloudInterface
    from barman.cloud_providers.aws_s3 import S3CloudInterface, S3CloudInterface2

    s3_cls = S3CloudInterface
    if "managed_transfer" in config and config.managed_transfer:
        s3_cls = S3CloudInterface2

    cloud_interface_kwargs.update(
        {
@@ -64,7 +68,7 @@ def _make_s3_cloud_interface(config, cloud_interface_kwargs):
                'Encryption type must be "aws:kms" if SSE KMS Key ID is specified'
            )
        cloud_interface_kwargs["sse_kms_key_id"] = config.sse_kms_key_id
    return S3CloudInterface(**cloud_interface_kwargs)
    return s3_cls(**cloud_interface_kwargs)


def _get_azure_credential(credential_type):
128 changes: 128 additions & 0 deletions barman/cloud_providers/aws_s3.py
@@ -454,6 +454,134 @@ def delete_under_prefix(self, prefix):
raise CloudProviderError()


class S3CloudInterface2(S3CloudInterface):
    # This implementation uses boto3 managed transfers so we do not do our own chunking.
    # A file is uploaded in a single chunk and boto3 will use the multipart API for us.
    MAX_CHUNKS_PER_FILE = 1

    # Since there is only one chunk, the min chunk size is the same as the max archive size
    MIN_CHUNK_SIZE = 1 << 40

    def __init__(
        self,
        url,
        encryption=None,
        jobs=2,
        profile_name=None,
        endpoint_url=None,
        tags=None,
        delete_batch_size=None,
        read_timeout=None,
        sse_kms_key_id=None,
    ):
        """
        Create a new S3 interface given the S3 destination url and the profile
        name
        :param str url: Full URL of the cloud destination/source
        :param str|None encryption: Encryption type string
        :param int jobs: How many sub-processes to use for asynchronous
          uploading, defaults to 2.
        :param str profile_name: Amazon auth profile identifier
        :param str endpoint_url: override default endpoint detection strategy
          with this one
        :param int|None delete_batch_size: the maximum number of objects to be
          deleted in a single request
        :param int|None read_timeout: the time in seconds until a timeout is
          raised when waiting to read from a connection
        :param str|None sse_kms_key_id: the AWS KMS key ID that should be used
          for encrypting uploaded data in S3
        """
        super(S3CloudInterface2, self).__init__(
            url=url,
            encryption=encryption,
            jobs=1,  # TODO: this is a hack
            profile_name=profile_name,
            endpoint_url=endpoint_url,
            tags=tags,
            delete_batch_size=delete_batch_size,
            read_timeout=read_timeout,
            sse_kms_key_id=sse_kms_key_id,
        )

    def create_multipart_upload(self, key):
        """
        A noop because we will upload complete files via a single boto3 call.
        :param key: The key to use in the cloud service
        :return: The multipart upload handle (here an empty list, since no real
          multipart upload is created)
        :rtype: list
        """
        return []

    def upload_fileobj(self, fileobj, key, override_tags=None):
        """
        Upload the content of a file-like object to a cloud key using a boto3
        managed transfer.
        :param fileobj IOBase: File-like object to upload
        :param str key: The key to identify the uploaded object
        :param List[tuple] override_tags: List of k,v tuples which should override any
          tags already defined in the cloud interface
        """
        extra_args = self._extra_upload_args.copy()
        tags = override_tags or self.tags
        if tags is not None:
            extra_args["Tagging"] = urlencode(tags)
        transfer_config = boto3.s3.transfer.TransferConfig(
            multipart_chunksize=1,
            multipart_threshold=1,
            max_concurrency=10,
            max_bandwidth=1048576,
        )
        self.s3.meta.client.upload_fileobj(
            Fileobj=fileobj,
            Bucket=self.bucket_name,
            Key=key,
            Config=transfer_config,
            ExtraArgs=extra_args,
        )

    def _upload_part(self, upload_metadata, key, body, part_number):
        """
        Upload a complete file
        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        :param object body: A stream-like object to upload
        :param int part_number: Part number, starting from 1
        :return: The part handle
        :rtype: dict[str, None|str]
        """
        self.upload_fileobj(body, key)
        return {
            "PartNumber": part_number,
        }

    def _complete_multipart_upload(self, upload_metadata, key, parts):
        """
        A noop because we will upload complete files via a single boto3 call.
        Finish a certain multipart upload
        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        :param parts: The list of parts composing the multipart upload
        """
        pass

    def _abort_multipart_upload(self, upload_metadata, key):
        """
        A noop because we will upload complete files via a single boto3 call.
        If a "multipart" upload needs aborting there is nothing to do.
        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        """
        pass


class AwsCloudSnapshotInterface(CloudSnapshotInterface):
    """
    Implementation of CloudSnapshotInterface for EBS snapshots as implemented in AWS