From ed8abc3f91131791c8ff6694e838faa5cbd6a2bd Mon Sep 17 00:00:00 2001 From: Carolina Ayumi Matumoto Date: Mon, 26 Apr 2021 16:10:44 -0300 Subject: [PATCH 1/3] [DLB-1204] - Adding multipart upload to aioradio s3 functions --- HISTORY.rst | 5 +++ aioradio/aws/s3.py | 83 +++++++++++++++++++++++++++++++++++++++ aioradio/tests/s3_test.py | 29 +++++++++++++- setup.py | 2 +- 4 files changed, 117 insertions(+), 2 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index f752b8d..a09bd22 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,11 @@ History ======= +v0.13.5 (2021-04-22) +----------------------- + +* Add functions (create_multipart_upload, upload_part, complete_multipart_upload, abort_multipart_upload) to s3 multupart upload. + v0.13.4 (2021-04-22) ----------------------- diff --git a/aioradio/aws/s3.py b/aioradio/aws/s3.py index f51ccb1..f4da8e8 100644 --- a/aioradio/aws/s3.py +++ b/aioradio/aws/s3.py @@ -137,3 +137,86 @@ async def delete_s3_object(bucket: str, s3_prefix: str) -> Dict[str, Any]: response = await S3['client']['obj'].delete_object(Bucket=bucket, Key=s3_prefix) return response + +@AWS_SERVICE.active +async def create_multipart_upload(bucket: str, s3_key: str) -> Dict[str, Any]: + """Create multipart upload from s3. + + Args: + bucket (str): s3 bucket + s3_key (str): s3 prefix + + Returns: + Dict[str, Any]: response of operation + """ + + response = await S3['client']['obj'].create_multipart_upload(Bucket=bucket, Key=s3_key) + + return response + +@AWS_SERVICE.active +async def upload_part(bucket: str, s3_key: str, part: str, part_number: int, upload_id: str) -> Dict[str, Any]: + """Upload a single part to s3. + + Args: + bucket (str): s3 bucket + s3_key (str): s3 prefix + part (str): the content to be uploaded + part_number (int): part number + upload_id (str): multipart upload Id + + Returns: + Dict[str, Any]: response of operation + """ + + response = await S3['client']['obj'].upload_part( + Bucket=bucket, + Key=s3_key, + Body=part, + PartNumber=part_number, + UploadId=upload_id) + + return response + +@AWS_SERVICE.active +async def complete_multipart_upload(bucket: str, s3_key: str, parts: str, upload_id: str) -> Dict[str, Any]: + """Complete multipart upload to s3. + + Args: + bucket (str): s3 bucket + s3_key (str): s3 prefix + parts (str): array with all parts info + upload_id (str): multipart upload Id + + Returns: + Dict[str, Any]: response of operation + """ + + response = await S3['client']['obj'].complete_multipart_upload( + Bucket=bucket, + Key=s3_key, + UploadId=upload_id, + MultipartUpload={'Parts': parts}) + + return response + +@AWS_SERVICE.active +async def abort_multipart_upload(bucket: str, s3_key: str, upload_id: str) -> Dict[str, Any]: + """Complete multipart upload to s3. + + Args: + bucket (str): s3 bucket + s3_key (str): s3 prefix + upload_id (str): multipart upload Id + + Returns: + Dict[str, Any]: response of operation + """ + + response = await S3['client']['obj'].abort_multipart_upload( + Bucket=bucket, + Key=s3_key, + UploadId=upload_id, + MultipartUpload={'Parts': parts}) + + return response diff --git a/aioradio/tests/s3_test.py b/aioradio/tests/s3_test.py index e5a4969..97c7b62 100644 --- a/aioradio/tests/s3_test.py +++ b/aioradio/tests/s3_test.py @@ -8,7 +8,8 @@ from aioradio.aws.s3 import (delete_s3_object, download_file, get_object, get_s3_file_attributes, list_s3_objects, - upload_file) + upload_file, create_multipart_upload, upload_part, + complete_multipart_upload, abort_multipart_upload) LOG = logging.getLogger(__name__) @@ -76,3 +77,29 @@ async def test_get_file_attributes(): result = await get_s3_file_attributes(bucket=S3_BUCKET, s3_key=f'{S3_PREFIX}/hello_world.txt') assert result['ContentLength'] == 22 + +async def test_multipart_upload(): + """"Test a success case of multipart upload""" + + filename = 'hello_world.txt' + s3_key = f'{S3_PREFIX}/{filename}' + multipart_upload = await create_multipart_upload(bucket=S3_BUCKET s3_key=s3_key) + upload_id = multipart_upload["UploadId"] + assert upload_id is not None + + part_number=1 + part_result = await upload_part( + bucket=S3_BUCKET, + s3_key=s3_key, + part='Hello World/n', + part_number=part_number, + uploadId=upload_id) + + parts = [{'ETag': part['ETag'], 'PartNumber': part_number}] + + await complete_multipart_upload( + bucket=S3_BUCKET, + s3_key=s3_key, + parts=parts, + upload_id=upload_id + ) diff --git a/setup.py b/setup.py index 9549ac8..c981715 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ long_description = fileobj.read() setup(name='aioradio', - version='0.13.4', + version='0.13.5', description='Generic asynchronous i/o python utilities for AWS services (SQS, S3, DynamoDB, Secrets Manager), Redis, MSSQL (pyodbc), JIRA and more', long_description=long_description, long_description_content_type="text/markdown", From 0b8a2a70f9a9aeee90a9b359071a87828c9a02c3 Mon Sep 17 00:00:00 2001 From: Carolina Ayumi Matumoto Date: Tue, 27 Apr 2021 11:15:45 -0300 Subject: [PATCH 2/3] [DLB-1204] - Add S3 tests --- HISTORY.rst | 4 +-- aioradio/aws/s3.py | 29 +++++++++++++++---- aioradio/tests/s3_test.py | 59 +++++++++++++++++++++++++++++++++------ 3 files changed, 76 insertions(+), 16 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index a09bd22..702eae7 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,10 +3,10 @@ History ======= -v0.13.5 (2021-04-22) +v0.13.5 (2021-04-27) ----------------------- -* Add functions (create_multipart_upload, upload_part, complete_multipart_upload, abort_multipart_upload) to s3 multupart upload. +* Add functions (create_multipart_upload, upload_part, complete_multipart_upload, abort_multipart_upload, list_parts) to s3 multupart upload. v0.13.4 (2021-04-22) ----------------------- diff --git a/aioradio/aws/s3.py b/aioradio/aws/s3.py index f4da8e8..0373a10 100644 --- a/aioradio/aws/s3.py +++ b/aioradio/aws/s3.py @@ -179,13 +179,33 @@ async def upload_part(bucket: str, s3_key: str, part: str, part_number: int, upl return response @AWS_SERVICE.active -async def complete_multipart_upload(bucket: str, s3_key: str, parts: str, upload_id: str) -> Dict[str, Any]: +async def list_parts(bucket: str, s3_key: str, upload_id: str) -> Dict[str, Any]: + """List parts already uploaded to s3. + + Args: + bucket (str): s3 bucket + s3_key (str): s3 prefix + upload_id (str): multipart upload Id + + Returns: + Dict[str, Any]: response of operation + """ + + response = await S3['client']['obj'].list_parts( + Bucket=bucket, + Key=s3_key, + UploadId=upload_id) + + return response + +@AWS_SERVICE.active +async def complete_multipart_upload(bucket: str, s3_key: str, parts: List, upload_id: str) -> Dict[str, Any]: """Complete multipart upload to s3. Args: bucket (str): s3 bucket s3_key (str): s3 prefix - parts (str): array with all parts info + parts (List): all parts info upload_id (str): multipart upload Id Returns: @@ -202,7 +222,7 @@ async def complete_multipart_upload(bucket: str, s3_key: str, parts: str, upload @AWS_SERVICE.active async def abort_multipart_upload(bucket: str, s3_key: str, upload_id: str) -> Dict[str, Any]: - """Complete multipart upload to s3. + """Abort multipart upload. Args: bucket (str): s3 bucket @@ -216,7 +236,6 @@ async def abort_multipart_upload(bucket: str, s3_key: str, upload_id: str) -> Di response = await S3['client']['obj'].abort_multipart_upload( Bucket=bucket, Key=s3_key, - UploadId=upload_id, - MultipartUpload={'Parts': parts}) + UploadId=upload_id) return response diff --git a/aioradio/tests/s3_test.py b/aioradio/tests/s3_test.py index 97c7b62..3adc8e3 100644 --- a/aioradio/tests/s3_test.py +++ b/aioradio/tests/s3_test.py @@ -6,10 +6,11 @@ import pytest -from aioradio.aws.s3 import (delete_s3_object, download_file, get_object, - get_s3_file_attributes, list_s3_objects, - upload_file, create_multipart_upload, upload_part, - complete_multipart_upload, abort_multipart_upload) +from aioradio.aws.s3 import (abort_multipart_upload, complete_multipart_upload, + create_multipart_upload, delete_s3_object, + download_file, get_object, get_s3_file_attributes, + list_parts, list_s3_objects, upload_file, + upload_part) LOG = logging.getLogger(__name__) @@ -79,27 +80,67 @@ async def test_get_file_attributes(): assert result['ContentLength'] == 22 async def test_multipart_upload(): - """"Test a success case of multipart upload""" + """"Test a success case of multipart upload.""" - filename = 'hello_world.txt' + filename = 'multipart_upload_hello_world.txt' s3_key = f'{S3_PREFIX}/{filename}' - multipart_upload = await create_multipart_upload(bucket=S3_BUCKET s3_key=s3_key) + + # First delete the file from s3 if it exists and verify it is gone from s3 + await delete_s3_object(bucket=S3_BUCKET, s3_prefix=s3_key) + assert s3_key not in await list_s3_objects(bucket=S3_BUCKET, s3_prefix=S3_PREFIX) + + # Create the multipart upload + multipart_upload = await create_multipart_upload(bucket=S3_BUCKET, s3_key=s3_key) upload_id = multipart_upload["UploadId"] assert upload_id is not None + # Upload a part part_number=1 part_result = await upload_part( bucket=S3_BUCKET, s3_key=s3_key, part='Hello World/n', part_number=part_number, - uploadId=upload_id) + upload_id=upload_id) + parts = [{'ETag': part_result['ETag'], 'PartNumber': part_number}] - parts = [{'ETag': part['ETag'], 'PartNumber': part_number}] + # Verify if the part was listed + uploaded_parts = await list_parts( + bucket=S3_BUCKET, + s3_key=s3_key, + upload_id=upload_id) + assert len(uploaded_parts['Parts'])==part_number + # If everythin looks fine, complete the multipart upload await complete_multipart_upload( bucket=S3_BUCKET, s3_key=s3_key, parts=parts, upload_id=upload_id ) + # Confirm the file now exists in s3 + assert s3_key in await list_s3_objects(bucket=S3_BUCKET, s3_prefix=S3_PREFIX) + +async def test_abort_multipart_upload(): + """"Test aborting a multipart upload.""" + + filename = 'multipart_upload_hello_world.txt' + s3_key = f'{S3_PREFIX}/{filename}' + + # First delete the file from s3 if it exists and verify it is gone from s3 + await delete_s3_object(bucket=S3_BUCKET, s3_prefix=s3_key) + assert s3_key not in await list_s3_objects(bucket=S3_BUCKET, s3_prefix=S3_PREFIX) + + # Create multipart upload, so we can abort it + multipart_upload = await create_multipart_upload(bucket=S3_BUCKET, s3_key=s3_key) + upload_id = multipart_upload["UploadId"] + assert upload_id is not None + + await abort_multipart_upload( + bucket=S3_BUCKET, + s3_key=s3_key, + upload_id=upload_id + ) + + # Verify if the proccess was successfully aborted + assert s3_key not in await list_s3_objects(bucket=S3_BUCKET, s3_prefix=S3_PREFIX) From 0a306bf2ae250b2263a62d89a5dfe1b198118350 Mon Sep 17 00:00:00 2001 From: Carolina Ayumi Matumoto Date: Tue, 27 Apr 2021 14:55:05 -0300 Subject: [PATCH 3/3] [DLB-1204] - FIx typo --- HISTORY.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/HISTORY.rst b/HISTORY.rst index 702eae7..ebc28b0 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -6,7 +6,7 @@ History v0.13.5 (2021-04-27) ----------------------- -* Add functions (create_multipart_upload, upload_part, complete_multipart_upload, abort_multipart_upload, list_parts) to s3 multupart upload. +* Add functions (create_multipart_upload, upload_part, complete_multipart_upload, abort_multipart_upload, list_parts) to s3 multipart upload. v0.13.4 (2021-04-22) -----------------------