Skip to content

Commit

Permalink
A very hacky implementation of managed transfers for s3
Browse files Browse the repository at this point in the history
  • Loading branch information
mikewallace1979 committed Aug 25, 2023
1 parent d8ae97c commit 8bd0ebb
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 2 deletions.
5 changes: 5 additions & 0 deletions barman/clients/cloud_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ def parse_arguments(args=None):
help="The name of the AWS region containing the EC2 VM and storage volumes "
"defined by the --snapshot-instance and --snapshot-disk arguments.",
)
s3_arguments.add_argument(
"--managed-transfer",
help="Use a boto3 managed transfer to upload the backup data",
action="store_true",
)
azure_arguments.add_argument(
"--encryption-scope",
help="The name of an encryption scope defined in the Azure Blob Storage "
Expand Down
8 changes: 6 additions & 2 deletions barman/cloud_providers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ def _update_kwargs(kwargs, config, args):


def _make_s3_cloud_interface(config, cloud_interface_kwargs):
from barman.cloud_providers.aws_s3 import S3CloudInterface
from barman.cloud_providers.aws_s3 import S3CloudInterface, S3CloudInterface2

s3_cls = S3CloudInterface
if "managed_transfer" in config and config.managed_transfer:
s3_cls = S3CloudInterface2

cloud_interface_kwargs.update(
{
Expand All @@ -64,7 +68,7 @@ def _make_s3_cloud_interface(config, cloud_interface_kwargs):
'Encryption type must be "aws:kms" if SSE KMS Key ID is specified'
)
cloud_interface_kwargs["sse_kms_key_id"] = config.sse_kms_key_id
return S3CloudInterface(**cloud_interface_kwargs)
return s3_cls(**cloud_interface_kwargs)


def _get_azure_credential(credential_type):
Expand Down
128 changes: 128 additions & 0 deletions barman/cloud_providers/aws_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,134 @@ def delete_under_prefix(self, prefix):
raise CloudProviderError()


class S3CloudInterface2(S3CloudInterface):
    """
    S3 cloud interface which uploads files via boto3 managed transfers.

    Unlike S3CloudInterface, this class does not drive the S3 multipart API
    itself: each file is handed to boto3 in a single upload_fileobj call and
    boto3 decides internally whether to use a multipart upload.
    """

    # This implementation uses boto3 managed transfers so we do not do our own
    # chunking. A file is uploaded in a single chunk and boto3 will use the
    # multipart API for us.
    MAX_CHUNKS_PER_FILE = 1

    # Since there is only one chunk the minimum chunk size is the same as the
    # maximum archive size (1 TiB).
    MIN_CHUNK_SIZE = 1 << 40

    def __init__(
        self,
        url,
        encryption=None,
        jobs=2,
        profile_name=None,
        endpoint_url=None,
        tags=None,
        delete_batch_size=None,
        read_timeout=None,
        sse_kms_key_id=None,
    ):
        """
        Create a new S3 interface given the S3 destination url and the profile
        name.

        :param str url: Full URL of the cloud destination/source
        :param str|None encryption: Encryption type string
        :param int jobs: How many sub-processes to use for asynchronous
          uploading. Accepted for interface compatibility but currently forced
          to 1, because concurrency is delegated to the managed transfer (see
          max_concurrency in upload_fileobj).
        :param str profile_name: Amazon auth profile identifier
        :param str endpoint_url: override default endpoint detection strategy
          with this one
        :param List[tuple]|None tags: k,v tuples to be applied as tags to any
          uploaded object
        :param int|None delete_batch_size: the maximum number of objects to be
          deleted in a single request
        :param int|None read_timeout: the time in seconds until a timeout is
          raised when waiting to read from a connection
        :param str|None sse_kms_key_id: the AWS KMS key ID that should be used
          for encrypting uploaded data in S3
        """
        super(S3CloudInterface2, self).__init__(
            url=url,
            encryption=encryption,
            # TODO: honour the requested number of jobs. Upload parallelism is
            # currently provided by the managed transfer itself, so only one
            # worker process is used regardless of the requested value.
            jobs=1,
            profile_name=profile_name,
            endpoint_url=endpoint_url,
            tags=tags,
            delete_batch_size=delete_batch_size,
            read_timeout=read_timeout,
            sse_kms_key_id=sse_kms_key_id,
        )

    def create_multipart_upload(self, key):
        """
        A noop because we will upload complete files via a single boto3 call.

        :param key: The key to use in the cloud service
        :return: An empty upload handle, since boto3 manages any multipart
          state internally
        :rtype: list
        """
        return []

    def upload_fileobj(self, fileobj, key, override_tags=None):
        """
        Upload the content of a file-like object to a cloud key using a boto3
        managed transfer.

        :param fileobj IOBase: File-like object to upload
        :param str key: The key to identify the uploaded object
        :param List[tuple] override_tags: List of k,v tuples which should override any
          tags already defined in the cloud interface
        """
        extra_args = self._extra_upload_args.copy()
        tags = override_tags or self.tags
        if tags is not None:
            extra_args["Tagging"] = urlencode(tags)
        # Use boto3's default multipart threshold and part size. The previous
        # hard-coded values (1-byte chunksize/threshold and a 1 MiB/s
        # max_bandwidth cap) were debugging leftovers which would severely
        # throttle backup uploads.
        transfer_config = boto3.s3.transfer.TransferConfig(
            max_concurrency=10,
        )
        self.s3.meta.client.upload_fileobj(
            Fileobj=fileobj,
            Bucket=self.bucket_name,
            Key=key,
            Config=transfer_config,
            ExtraArgs=extra_args,
        )

    def _upload_part(self, upload_metadata, key, body, part_number):
        """
        Upload a complete file as a single "part".

        Despite the name, this uploads the whole object via upload_fileobj;
        upload_metadata is ignored because boto3 tracks any multipart state
        internally.

        :param dict upload_metadata: The multipart upload handle (unused)
        :param str key: The key to use in the cloud service
        :param object body: A stream-like object to upload
        :param int part_number: Part number, starting from 1
        :return: The part handle
        :rtype: dict[str, None|str]
        """
        self.upload_fileobj(body, key)
        return {
            "PartNumber": part_number,
        }

    def _complete_multipart_upload(self, upload_metadata, key, parts):
        """
        A noop because we will upload complete files via a single boto3 call.

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        :param parts: The list of parts composing the multipart upload
        """
        pass

    def _abort_multipart_upload(self, upload_metadata, key):
        """
        A noop because we will upload complete files via a single boto3 call.

        If a "multipart" upload needs aborting there is nothing to do.

        :param dict upload_metadata: The multipart upload handle
        :param str key: The key to use in the cloud service
        """
        pass


class AwsCloudSnapshotInterface(CloudSnapshotInterface):
"""
Implementation of CloudSnapshotInterface for EBS snapshots as implemented in AWS
Expand Down

0 comments on commit 8bd0ebb

Please sign in to comment.