diff --git a/barman/clients/cloud_backup.py b/barman/clients/cloud_backup.py
index 4470dcbb8..ff6f1ca95 100755
--- a/barman/clients/cloud_backup.py
+++ b/barman/clients/cloud_backup.py
@@ -395,6 +395,11 @@ def parse_arguments(args=None):
         help="The name of the AWS region containing the EC2 VM and storage volumes "
         "defined by the --snapshot-instance and --snapshot-disk arguments.",
     )
+    s3_arguments.add_argument(
+        "--managed-transfer",
+        help="Use a boto3 managed transfer to upload the backup data",
+        action="store_true",
+    )
     azure_arguments.add_argument(
         "--encryption-scope",
         help="The name of an encryption scope defined in the Azure Blob Storage "
diff --git a/barman/cloud_providers/__init__.py b/barman/cloud_providers/__init__.py
index 9e2279b52..b318af9f6 100644
--- a/barman/cloud_providers/__init__.py
+++ b/barman/cloud_providers/__init__.py
@@ -43,7 +43,11 @@ def _update_kwargs(kwargs, config, args):
 
 
 def _make_s3_cloud_interface(config, cloud_interface_kwargs):
-    from barman.cloud_providers.aws_s3 import S3CloudInterface
+    from barman.cloud_providers.aws_s3 import S3CloudInterface, S3CloudInterface2
+
+    s3_cls = S3CloudInterface
+    if "managed_transfer" in config and config.managed_transfer:
+        s3_cls = S3CloudInterface2
 
     cloud_interface_kwargs.update(
         {
@@ -64,7 +68,7 @@ def _make_s3_cloud_interface(config, cloud_interface_kwargs):
                 'Encryption type must be "aws:kms" if SSE KMS Key ID is specified'
             )
         cloud_interface_kwargs["sse_kms_key_id"] = config.sse_kms_key_id
-    return S3CloudInterface(**cloud_interface_kwargs)
+    return s3_cls(**cloud_interface_kwargs)
 
 
 def _get_azure_credential(credential_type):
diff --git a/barman/cloud_providers/aws_s3.py b/barman/cloud_providers/aws_s3.py
index 2e0960879..f4a1aadb7 100644
--- a/barman/cloud_providers/aws_s3.py
+++ b/barman/cloud_providers/aws_s3.py
@@ -454,6 +454,134 @@ def delete_under_prefix(self, prefix):
             raise CloudProviderError()
 
 
+class S3CloudInterface2(S3CloudInterface):
+    # This implementation uses boto3 managed transfers so we do not do our own chunking.
+    # A file is uploaded in a single chunk and boto3 will use the multipart API for us.
+    MAX_CHUNKS_PER_FILE = 1
+
+    # Since there is only one chunk, the minimum chunk size is the same as the maximum archive size
+    MIN_CHUNK_SIZE = 1 << 40
+
+    def __init__(
+        self,
+        url,
+        encryption=None,
+        jobs=2,
+        profile_name=None,
+        endpoint_url=None,
+        tags=None,
+        delete_batch_size=None,
+        read_timeout=None,
+        sse_kms_key_id=None,
+    ):
+        """
+        Create a new S3 interface given the S3 destination url and the profile
+        name
+
+        :param str url: Full URL of the cloud destination/source
+        :param str|None encryption: Encryption type string
+        :param int jobs: How many sub-processes to use for asynchronous
+          uploading, defaults to 2.
+        :param str profile_name: Amazon auth profile identifier
+        :param str endpoint_url: override default endpoint detection strategy
+          with this one
+        :param int|None delete_batch_size: the maximum number of objects to be
+          deleted in a single request
+        :param int|None read_timeout: the time in seconds until a timeout is
+          raised when waiting to read from a connection
+        :param str|None sse_kms_key_id: the AWS KMS key ID that should be used
+          for encrypting uploaded data in S3
+        """
+        super(S3CloudInterface2, self).__init__(
+            url=url,
+            encryption=encryption,
+            jobs=1,  # TODO: forcing a single upload job here is a hack
+            profile_name=profile_name,
+            endpoint_url=endpoint_url,
+            tags=tags,
+            delete_batch_size=delete_batch_size,
+            read_timeout=read_timeout,
+            sse_kms_key_id=sse_kms_key_id,
+        )
+
+    def create_multipart_upload(self, key):
+        """
+        A noop because we will upload complete files via a single boto3 call.
+
+        :param key: The key to use in the cloud service
+        :return: The multipart upload handle
+        :rtype: list
+        """
+        return []
+
+    def upload_fileobj(self, fileobj, key, override_tags=None):
+        """
+        Upload the content of a file-like object to a cloud key using a boto3
+        managed transfer.
+
+        :param IOBase fileobj: File-like object to upload
+        :param str key: The key to identify the uploaded object
+        :param List[tuple] override_tags: List of k,v tuples which should override any
+          tags already defined in the cloud interface
+        """
+        extra_args = self._extra_upload_args.copy()
+        tags = override_tags or self.tags
+        if tags is not None:
+            extra_args["Tagging"] = urlencode(tags)
+        transfer_config = boto3.s3.transfer.TransferConfig(
+            multipart_chunksize=1,
+            multipart_threshold=1,
+            max_concurrency=10,
+            max_bandwidth=1048576,
+        )
+        self.s3.meta.client.upload_fileobj(
+            Fileobj=fileobj,
+            Bucket=self.bucket_name,
+            Key=key,
+            Config=transfer_config,
+            ExtraArgs=extra_args,
+        )
+
+    def _upload_part(self, upload_metadata, key, body, part_number):
+        """
+        Upload a complete file
+
+        :param dict upload_metadata: The multipart upload handle
+        :param str key: The key to use in the cloud service
+        :param object body: A stream-like object to upload
+        :param int part_number: Part number, starting from 1
+        :return: The part handle
+        :rtype: dict[str, None|str]
+        """
+        self.upload_fileobj(body, key)
+        return {
+            "PartNumber": part_number,
+        }
+
+    def _complete_multipart_upload(self, upload_metadata, key, parts):
+        """
+        A noop because we will upload complete files via a single boto3 call.
+
+        Finish a certain multipart upload
+
+        :param dict upload_metadata: The multipart upload handle
+        :param str key: The key to use in the cloud service
+        :param parts: The list of parts composing the multipart upload
+        """
+        pass
+
+    def _abort_multipart_upload(self, upload_metadata, key):
+        """
+        A noop because we will upload complete files via a single boto3 call.
+
+        If a "multipart" upload needs aborting there is nothing to do.
+
+        :param dict upload_metadata: The multipart upload handle
+        :param str key: The key to use in the cloud service
+        """
+        pass
+
+
 class AwsCloudSnapshotInterface(CloudSnapshotInterface):
     """
     Implementation of CloudSnapshotInterface for EBS snapshots as implemented in AWS
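For context, the class added above delegates chunking and the multipart API entirely to boto3's
managed-transfer machinery. The standalone sketch below shows that pattern in isolation; the bucket
name, object key, local file name and TransferConfig numbers are illustrative assumptions for the
example only, not the values used by the patch.

    # Minimal sketch of a boto3 managed transfer (illustrative names and sizes).
    import boto3
    from boto3.s3.transfer import TransferConfig

    client = boto3.client("s3")

    # boto3 decides when to switch to the multipart API and how to split the
    # stream into parts, so the caller does no chunking of its own.
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,  # use multipart above 8 MiB
        multipart_chunksize=8 * 1024 * 1024,  # size of each uploaded part
        max_concurrency=10,                   # parallel part uploads
    )

    with open("base.tar", "rb") as fileobj:
        client.upload_fileobj(
            Fileobj=fileobj,
            Bucket="my-backup-bucket",           # assumed bucket name
            Key="barman/main-db/base/base.tar",  # assumed object key
            Config=config,
        )

With the patch applied, this code path is selected by passing --managed-transfer to
barman-cloud-backup, which makes _make_s3_cloud_interface return S3CloudInterface2 instead of
S3CloudInterface.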