From 32cd40e93458ed359000b21456a75226ac85c539 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Nowacki?= Date: Fri, 24 Nov 2023 13:33:22 +0100 Subject: [PATCH 1/3] setuptools and packaging now need to specifically added to requirements.txt --- b2sdk/version_utils.py | 6 +++--- requirements.txt | 2 ++ 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/b2sdk/version_utils.py b/b2sdk/version_utils.py index e36d53dbb..b67a0d06a 100644 --- a/b2sdk/version_utils.py +++ b/b2sdk/version_utils.py @@ -14,7 +14,7 @@ from abc import ABCMeta, abstractmethod from functools import wraps -from pkg_resources import parse_version +from packaging.version import parse from b2sdk.version import VERSION @@ -28,7 +28,7 @@ def __init__(self, changed_version, cutoff_version=None, reason='', current_vers """ if current_version is None: # this is for tests only current_version = VERSION # TODO autodetect by going up the qualname tree and trying getattr(part, '__version__') - self.current_version = parse_version(current_version) #: current version + self.current_version = parse(current_version) #: current version self.reason = reason self.changed_version = self._parse_if_not_none( @@ -42,7 +42,7 @@ def __init__(self, changed_version, cutoff_version=None, reason='', current_vers def _parse_if_not_none(cls, version): if version is None: return None - return parse_version(version) + return parse(version) @abstractmethod def __call__(self, func): diff --git a/requirements.txt b/requirements.txt index e99bd83ef..7092e1928 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ logfury>=1.0.1,<2.0.0 requests>=2.9.1,<3.0.0 tqdm>=4.5.0,<5.0.0 typing-extensions>=4.7.1; python_version < '3.12' +setuptools>=60.0 +packaging>=23.0 From 240ed35bb55e06ea51f4ba424e207676aace3524 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Nowacki?= Date: Fri, 24 Nov 2023 16:19:55 +0100 Subject: [PATCH 2/3] fix closing of passed progress listeners internally --- b2sdk/sync/action.py | 59 ++++++++++------- b2sdk/transfer/emerge/executor.py | 43 ++++++------ b2sdk/transfer/outbound/copy_manager.py | 81 +++++++++++------------ b2sdk/transfer/outbound/upload_manager.py | 77 ++++++++++----------- changelog.d/+progress_listeners.fixed.md | 1 + test/unit/bucket/test_bucket.py | 8 +-- test/unit/v0/test_bucket.py | 7 +- 7 files changed, 133 insertions(+), 143 deletions(-) create mode 100644 changelog.d/+progress_listeners.fixed.md diff --git a/b2sdk/sync/action.py b/b2sdk/sync/action.py index 718f0fdda..592dc50f5 100644 --- a/b2sdk/sync/action.py +++ b/b2sdk/sync/action.py @@ -9,6 +9,7 @@ ###################################################################### from __future__ import annotations +import contextlib import logging import os from abc import ABCMeta, abstractmethod @@ -159,14 +160,17 @@ def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None: # The upload will be incremental, calculate the large_file_sha1 large_file_sha1 = self._upload_source.get_content_sha1() - bucket.concatenate( - sources, - self.b2_file_name, - progress_listener=progress_listener, - file_info=file_info, - encryption=encryption, - large_file_sha1=large_file_sha1, - ) + with contextlib.ExitStack() as exit_stack: + if progress_listener: + exit_stack.enter_context(progress_listener) + bucket.concatenate( + sources, + self.b2_file_name, + progress_listener=progress_listener, + file_info=file_info, + encryption=encryption, + large_file_sha1=large_file_sha1, + ) def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None: """ @@ -315,13 +319,15 @@ def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None: bucket=bucket, file_version=self.source_path.selected_version, ) - - downloaded_file = bucket.download_file_by_id( - self.source_path.selected_version.id_, - progress_listener=progress_listener, - encryption=encryption, - ) - downloaded_file.save_to(download_path) + with contextlib.ExitStack() as exit_stack: + if progress_listener: + exit_stack.enter_context(progress_listener) + downloaded_file = bucket.download_file_by_id( + self.source_path.selected_version.id_, + progress_listener=progress_listener, + encryption=encryption, + ) + downloaded_file.save_to(download_path) # Move the file into place with suppress(OSError): @@ -404,16 +410,19 @@ def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None: dest_b2_file_name=self.dest_b2_file_name, ) - bucket.copy( - self.source_path.selected_version.id_, - self.dest_b2_file_name, - length=self.source_path.size, - progress_listener=progress_listener, - destination_encryption=destination_encryption, - source_encryption=source_encryption, - source_file_info=self.source_path.selected_version.file_info, - source_content_type=self.source_path.selected_version.content_type, - ) + with contextlib.ExitStack() as exit_stack: + if progress_listener: + exit_stack.enter_context(progress_listener) + bucket.copy( + self.source_path.selected_version.id_, + self.dest_b2_file_name, + length=self.source_path.size, + progress_listener=progress_listener, + destination_encryption=destination_encryption, + source_encryption=source_encryption, + source_file_info=self.source_path.selected_version.file_info, + source_content_type=self.source_path.selected_version.content_type, + ) def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None: """ diff --git a/b2sdk/transfer/emerge/executor.py b/b2sdk/transfer/emerge/executor.py index bed0fcff3..7654076d1 100644 --- a/b2sdk/transfer/emerge/executor.py +++ b/b2sdk/transfer/emerge/executor.py @@ -208,28 +208,27 @@ def execute_plan(self, emerge_plan: StreamingEmergePlan): ) file_id = unfinished_file.file_id - with self.progress_listener: - large_file_upload_state = LargeFileUploadState(self.progress_listener) - - part_futures = [] - for part_number, emerge_part in emerge_plan.enumerate_emerge_parts(): - execution_step_factory = LargeFileEmergeExecutionStepFactory( - self, - emerge_part, - part_number, - file_id, - large_file_upload_state, - finished_parts=finished_parts, - # it already knows encryption from BaseMergeExecution being passed as self - ) - execution_step = execution_step_factory.get_execution_step() - future = self._execute_step(execution_step) - part_futures.append(future) - - # Collect the sha1 checksums of the parts as the uploads finish. - # If any of them raised an exception, that same exception will - # be raised here by result() - part_sha1_array = [f.result()['contentSha1'] for f in part_futures] + large_file_upload_state = LargeFileUploadState(self.progress_listener) + + part_futures = [] + for part_number, emerge_part in emerge_plan.enumerate_emerge_parts(): + execution_step_factory = LargeFileEmergeExecutionStepFactory( + self, + emerge_part, + part_number, + file_id, + large_file_upload_state, + finished_parts=finished_parts, + # it already knows encryption from BaseMergeExecution being passed as self + ) + execution_step = execution_step_factory.get_execution_step() + future = self._execute_step(execution_step) + part_futures.append(future) + + # Collect the sha1 checksums of the parts as the uploads finish. + # If any of them raised an exception, that same exception will + # be raised here by result() + part_sha1_array = [f.result()['contentSha1'] for f in part_futures] # Finish the large file response = self.services.session.finish_large_file(file_id, part_sha1_array) diff --git a/b2sdk/transfer/outbound/copy_manager.py b/b2sdk/transfer/outbound/copy_manager.py index c62a3e9ba..0c754fff3 100644 --- a/b2sdk/transfer/outbound/copy_manager.py +++ b/b2sdk/transfer/outbound/copy_manager.py @@ -150,48 +150,45 @@ def _copy_small_file( legal_hold: LegalHold | None = None, file_retention: FileRetentionSetting | None = None, ): - with progress_listener: - progress_listener.set_total_bytes(copy_source.get_content_length() or 0) - - bytes_range = copy_source.get_bytes_range() - - if content_type is None: - if file_info is not None: - raise CopyArgumentsMismatch( - 'File info can be set only when content type is set' - ) - metadata_directive = MetadataDirectiveMode.COPY - else: - if file_info is None: - raise CopyArgumentsMismatch( - 'File info can be not set only when content type is not set' - ) - metadata_directive = MetadataDirectiveMode.REPLACE - metadata_directive, file_info, content_type = self.establish_sse_c_file_metadata( - metadata_directive=metadata_directive, - destination_file_info=file_info, - destination_content_type=content_type, - destination_server_side_encryption=destination_encryption, - source_server_side_encryption=source_encryption, - source_file_info=copy_source.source_file_info, - source_content_type=copy_source.source_content_type, - ) - response = self.services.session.copy_file( - copy_source.file_id, - file_name, - bytes_range=bytes_range, - metadata_directive=metadata_directive, - content_type=content_type, - file_info=file_info, - destination_bucket_id=destination_bucket_id, - destination_server_side_encryption=destination_encryption, - source_server_side_encryption=source_encryption, - legal_hold=legal_hold, - file_retention=file_retention, - ) - file_version = self.services.api.file_version_factory.from_api_response(response) - if progress_listener is not None: - progress_listener.bytes_completed(file_version.size) + progress_listener.set_total_bytes(copy_source.get_content_length() or 0) + + bytes_range = copy_source.get_bytes_range() + + if content_type is None: + if file_info is not None: + raise CopyArgumentsMismatch('File info can be set only when content type is set') + metadata_directive = MetadataDirectiveMode.COPY + else: + if file_info is None: + raise CopyArgumentsMismatch( + 'File info can be not set only when content type is not set' + ) + metadata_directive = MetadataDirectiveMode.REPLACE + metadata_directive, file_info, content_type = self.establish_sse_c_file_metadata( + metadata_directive=metadata_directive, + destination_file_info=file_info, + destination_content_type=content_type, + destination_server_side_encryption=destination_encryption, + source_server_side_encryption=source_encryption, + source_file_info=copy_source.source_file_info, + source_content_type=copy_source.source_content_type, + ) + response = self.services.session.copy_file( + copy_source.file_id, + file_name, + bytes_range=bytes_range, + metadata_directive=metadata_directive, + content_type=content_type, + file_info=file_info, + destination_bucket_id=destination_bucket_id, + destination_server_side_encryption=destination_encryption, + source_server_side_encryption=source_encryption, + legal_hold=legal_hold, + file_retention=file_retention, + ) + file_version = self.services.api.file_version_factory.from_api_response(response) + if progress_listener is not None: + progress_listener.bytes_completed(file_version.size) return file_version diff --git a/b2sdk/transfer/outbound/upload_manager.py b/b2sdk/transfer/outbound/upload_manager.py index 370eb1f41..2d25838cc 100644 --- a/b2sdk/transfer/outbound/upload_manager.py +++ b/b2sdk/transfer/outbound/upload_manager.py @@ -202,46 +202,41 @@ def _upload_small_file( content_length = upload_source.get_content_length() exception_info_list = [] progress_listener.set_total_bytes(content_length) - with progress_listener: - for _ in range(self.MAX_UPLOAD_ATTEMPTS): - try: - with upload_source.open() as file: - input_stream = ReadingStreamWithProgress( - file, progress_listener, length=content_length - ) - if upload_source.is_sha1_known(): - content_sha1 = upload_source.get_content_sha1() - else: - input_stream = StreamWithHash( - input_stream, stream_length=content_length - ) - content_sha1 = HEX_DIGITS_AT_END - # it is important that `len()` works on `input_stream` - response = self.services.session.upload_file( - bucket_id, - file_name, - len(input_stream), - content_type, - content_sha1, - file_info, - input_stream, - server_side_encryption=encryption, # todo: client side encryption - file_retention=file_retention, - legal_hold=legal_hold, - custom_upload_timestamp=custom_upload_timestamp, - ) - if content_sha1 == HEX_DIGITS_AT_END: - content_sha1 = input_stream.hash - assert content_sha1 == 'do_not_verify' or content_sha1 == response[ - 'contentSha1'], '{} != {}'.format( - content_sha1, response['contentSha1'] - ) - return self.services.api.file_version_factory.from_api_response(response) - - except B2Error as e: - if not e.should_retry_upload(): - raise - exception_info_list.append(e) - self.account_info.clear_bucket_upload_data(bucket_id) + for _ in range(self.MAX_UPLOAD_ATTEMPTS): + try: + with upload_source.open() as file: + input_stream = ReadingStreamWithProgress( + file, progress_listener, length=content_length + ) + if upload_source.is_sha1_known(): + content_sha1 = upload_source.get_content_sha1() + else: + input_stream = StreamWithHash(input_stream, stream_length=content_length) + content_sha1 = HEX_DIGITS_AT_END + # it is important that `len()` works on `input_stream` + response = self.services.session.upload_file( + bucket_id, + file_name, + len(input_stream), + content_type, + content_sha1, + file_info, + input_stream, + server_side_encryption=encryption, # todo: client side encryption + file_retention=file_retention, + legal_hold=legal_hold, + custom_upload_timestamp=custom_upload_timestamp, + ) + if content_sha1 == HEX_DIGITS_AT_END: + content_sha1 = input_stream.hash + assert content_sha1 == 'do_not_verify' or content_sha1 == response[ + 'contentSha1'], '{} != {}'.format(content_sha1, response['contentSha1']) + return self.services.api.file_version_factory.from_api_response(response) + + except B2Error as e: + if not e.should_retry_upload(): + raise + exception_info_list.append(e) + self.account_info.clear_bucket_upload_data(bucket_id) raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_info_list) diff --git a/changelog.d/+progress_listeners.fixed.md b/changelog.d/+progress_listeners.fixed.md new file mode 100644 index 000000000..b1a239f20 --- /dev/null +++ b/changelog.d/+progress_listeners.fixed.md @@ -0,0 +1 @@ +Fix closing of passed progress listeners in `Bucket.upload` and `Bucket.copy` \ No newline at end of file diff --git a/test/unit/bucket/test_bucket.py b/test/unit/bucket/test_bucket.py index d8bc73138..e77cec4d7 100644 --- a/test/unit/bucket/test_bucket.py +++ b/test/unit/bucket/test_bucket.py @@ -189,9 +189,7 @@ def is_valid(self, **kwargs): valid, _ = self.is_valid_reason(**kwargs) return valid - def is_valid_reason( - self, check_closed=True, check_progress=True, check_monotonic_progress=False - ): + def is_valid_reason(self, check_progress=True, check_monotonic_progress=False): progress_end = -1 if self.history[progress_end] == 'closed': progress_end = -2 @@ -206,8 +204,6 @@ def is_valid_reason( prev = val if self.total != self.last_byte_count: return False, 'total different than last_byte_count' - if check_closed and self.history[-1] != 'closed': - return False, 'no "closed" at the end of history' if check_progress and len(self.history[1:progress_end]) < 2: return False, 'progress in history has less than 2 entries' return True, '' @@ -1995,7 +1991,6 @@ def _verify(self, expected_result, check_progress_listener=True): self._assert_downloaded_data(expected_result) if check_progress_listener: valid, reason = self.progress_listener.is_valid_reason( - check_closed=False, check_progress=False, check_monotonic_progress=True, ) @@ -2613,7 +2608,6 @@ def _verify(self, expected_result, check_progress_listener=True): self._assert_downloaded_data(expected_result) if check_progress_listener: valid, reason = self.progress_listener.is_valid_reason( - check_closed=False, check_progress=False, check_monotonic_progress=True, ) diff --git a/test/unit/v0/test_bucket.py b/test/unit/v0/test_bucket.py index ff2aee33b..fd8754e2c 100644 --- a/test/unit/v0/test_bucket.py +++ b/test/unit/v0/test_bucket.py @@ -127,9 +127,7 @@ def is_valid(self, **kwargs): valid, _ = self.is_valid_reason(**kwargs) return valid - def is_valid_reason( - self, check_closed=True, check_progress=True, check_monotonic_progress=False - ): + def is_valid_reason(self, check_progress=True, check_monotonic_progress=False): progress_end = -1 if self.history[progress_end] == 'closed': progress_end = -2 @@ -144,8 +142,6 @@ def is_valid_reason( prev = val if self.total != self.last_byte_count: return False, 'total different than last_byte_count' - if check_closed and self.history[-1] != 'closed': - return False, 'no "closed" at the end of history' if check_progress and len(self.history[1:progress_end]) < 2: return False, 'progress in history has less than 2 entries' return True, '' @@ -1075,7 +1071,6 @@ def _verify(self, expected_result, check_progress_listener=True): assert self.download_dest.get_bytes_written() == expected_result.encode() if check_progress_listener: valid, reason = self.progress_listener.is_valid_reason( - check_closed=False, check_progress=False, check_monotonic_progress=True, ) From a27198769339b3364f0196b90cc7debe550da944 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Nowacki?= Date: Sun, 26 Nov 2023 21:02:07 +0100 Subject: [PATCH 3/3] post review fixes --- b2sdk/transfer/outbound/copy_manager.py | 6 +++--- changelog.d/+setuptools.changed.md | 1 + requirements.txt | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) create mode 100644 changelog.d/+setuptools.changed.md diff --git a/b2sdk/transfer/outbound/copy_manager.py b/b2sdk/transfer/outbound/copy_manager.py index 0c754fff3..0714ed805 100644 --- a/b2sdk/transfer/outbound/copy_manager.py +++ b/b2sdk/transfer/outbound/copy_manager.py @@ -15,6 +15,7 @@ from b2sdk.exception import AlreadyFailed, CopyArgumentsMismatch, SSECKeyIdMismatchInCopy from b2sdk.file_lock import FileRetentionSetting, LegalHold from b2sdk.http_constants import SSE_C_KEY_ID_FILE_INFO_KEY_NAME +from b2sdk.progress import AbstractProgressListener from b2sdk.raw_api import MetadataDirectiveMode from b2sdk.transfer.transfer_manager import TransferManager from b2sdk.utils.thread_pool import ThreadPoolMixin @@ -144,7 +145,7 @@ def _copy_small_file( content_type, file_info, destination_bucket_id, - progress_listener, + progress_listener: AbstractProgressListener, destination_encryption: EncryptionSetting | None, source_encryption: EncryptionSetting | None, legal_hold: LegalHold | None = None, @@ -187,8 +188,7 @@ def _copy_small_file( file_retention=file_retention, ) file_version = self.services.api.file_version_factory.from_api_response(response) - if progress_listener is not None: - progress_listener.bytes_completed(file_version.size) + progress_listener.bytes_completed(file_version.size) return file_version diff --git a/changelog.d/+setuptools.changed.md b/changelog.d/+setuptools.changed.md new file mode 100644 index 000000000..9b1dab25f --- /dev/null +++ b/changelog.d/+setuptools.changed.md @@ -0,0 +1 @@ +Add dependency on `setuptools` and `packaging` as they are not shipped by cpython 3.12 and are used in production code. \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 7092e1928..62561d9f5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,4 @@ requests>=2.9.1,<3.0.0 tqdm>=4.5.0,<5.0.0 typing-extensions>=4.7.1; python_version < '3.12' setuptools>=60.0 -packaging>=23.0 +packaging>=21.0