Skip to content

Commit

Permalink
Merge pull request #450 from Backblaze/python3.12-compat
Browse files Browse the repository at this point in the history
Python 3.12 compatibility and fix closing of progress_listeners
  • Loading branch information
mpnowacki-reef authored Nov 26, 2023
2 parents 15db41e + a271987 commit c1ebafd
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 147 deletions.
59 changes: 34 additions & 25 deletions b2sdk/sync/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
######################################################################
from __future__ import annotations

import contextlib
import logging
import os
from abc import ABCMeta, abstractmethod
Expand Down Expand Up @@ -159,14 +160,17 @@ def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
# The upload will be incremental, calculate the large_file_sha1
large_file_sha1 = self._upload_source.get_content_sha1()

bucket.concatenate(
sources,
self.b2_file_name,
progress_listener=progress_listener,
file_info=file_info,
encryption=encryption,
large_file_sha1=large_file_sha1,
)
with contextlib.ExitStack() as exit_stack:
if progress_listener:
exit_stack.enter_context(progress_listener)
bucket.concatenate(
sources,
self.b2_file_name,
progress_listener=progress_listener,
file_info=file_info,
encryption=encryption,
large_file_sha1=large_file_sha1,
)

def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None:
"""
Expand Down Expand Up @@ -315,13 +319,15 @@ def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
bucket=bucket,
file_version=self.source_path.selected_version,
)

downloaded_file = bucket.download_file_by_id(
self.source_path.selected_version.id_,
progress_listener=progress_listener,
encryption=encryption,
)
downloaded_file.save_to(download_path)
with contextlib.ExitStack() as exit_stack:
if progress_listener:
exit_stack.enter_context(progress_listener)
downloaded_file = bucket.download_file_by_id(
self.source_path.selected_version.id_,
progress_listener=progress_listener,
encryption=encryption,
)
downloaded_file.save_to(download_path)

# Move the file into place
with suppress(OSError):
Expand Down Expand Up @@ -404,16 +410,19 @@ def do_action(self, bucket: Bucket, reporter: ProgressReport) -> None:
dest_b2_file_name=self.dest_b2_file_name,
)

bucket.copy(
self.source_path.selected_version.id_,
self.dest_b2_file_name,
length=self.source_path.size,
progress_listener=progress_listener,
destination_encryption=destination_encryption,
source_encryption=source_encryption,
source_file_info=self.source_path.selected_version.file_info,
source_content_type=self.source_path.selected_version.content_type,
)
with contextlib.ExitStack() as exit_stack:
if progress_listener:
exit_stack.enter_context(progress_listener)
bucket.copy(
self.source_path.selected_version.id_,
self.dest_b2_file_name,
length=self.source_path.size,
progress_listener=progress_listener,
destination_encryption=destination_encryption,
source_encryption=source_encryption,
source_file_info=self.source_path.selected_version.file_info,
source_content_type=self.source_path.selected_version.content_type,
)

def do_report(self, bucket: Bucket, reporter: ProgressReport) -> None:
"""
Expand Down
43 changes: 21 additions & 22 deletions b2sdk/transfer/emerge/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,28 +208,27 @@ def execute_plan(self, emerge_plan: StreamingEmergePlan):
)
file_id = unfinished_file.file_id

with self.progress_listener:
large_file_upload_state = LargeFileUploadState(self.progress_listener)

part_futures = []
for part_number, emerge_part in emerge_plan.enumerate_emerge_parts():
execution_step_factory = LargeFileEmergeExecutionStepFactory(
self,
emerge_part,
part_number,
file_id,
large_file_upload_state,
finished_parts=finished_parts,
# it already knows encryption from BaseMergeExecution being passed as self
)
execution_step = execution_step_factory.get_execution_step()
future = self._execute_step(execution_step)
part_futures.append(future)

# Collect the sha1 checksums of the parts as the uploads finish.
# If any of them raised an exception, that same exception will
# be raised here by result()
part_sha1_array = [f.result()['contentSha1'] for f in part_futures]
large_file_upload_state = LargeFileUploadState(self.progress_listener)

part_futures = []
for part_number, emerge_part in emerge_plan.enumerate_emerge_parts():
execution_step_factory = LargeFileEmergeExecutionStepFactory(
self,
emerge_part,
part_number,
file_id,
large_file_upload_state,
finished_parts=finished_parts,
# it already knows encryption from BaseMergeExecution being passed as self
)
execution_step = execution_step_factory.get_execution_step()
future = self._execute_step(execution_step)
part_futures.append(future)

# Collect the sha1 checksums of the parts as the uploads finish.
# If any of them raised an exception, that same exception will
# be raised here by result()
part_sha1_array = [f.result()['contentSha1'] for f in part_futures]

# Finish the large file
response = self.services.session.finish_large_file(file_id, part_sha1_array)
Expand Down
83 changes: 40 additions & 43 deletions b2sdk/transfer/outbound/copy_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from b2sdk.exception import AlreadyFailed, CopyArgumentsMismatch, SSECKeyIdMismatchInCopy
from b2sdk.file_lock import FileRetentionSetting, LegalHold
from b2sdk.http_constants import SSE_C_KEY_ID_FILE_INFO_KEY_NAME
from b2sdk.progress import AbstractProgressListener
from b2sdk.raw_api import MetadataDirectiveMode
from b2sdk.transfer.transfer_manager import TransferManager
from b2sdk.utils.thread_pool import ThreadPoolMixin
Expand Down Expand Up @@ -144,54 +145,50 @@ def _copy_small_file(
content_type,
file_info,
destination_bucket_id,
progress_listener,
progress_listener: AbstractProgressListener,
destination_encryption: EncryptionSetting | None,
source_encryption: EncryptionSetting | None,
legal_hold: LegalHold | None = None,
file_retention: FileRetentionSetting | None = None,
):
with progress_listener:
progress_listener.set_total_bytes(copy_source.get_content_length() or 0)

bytes_range = copy_source.get_bytes_range()

if content_type is None:
if file_info is not None:
raise CopyArgumentsMismatch(
'File info can be set only when content type is set'
)
metadata_directive = MetadataDirectiveMode.COPY
else:
if file_info is None:
raise CopyArgumentsMismatch(
'File info can be not set only when content type is not set'
)
metadata_directive = MetadataDirectiveMode.REPLACE
metadata_directive, file_info, content_type = self.establish_sse_c_file_metadata(
metadata_directive=metadata_directive,
destination_file_info=file_info,
destination_content_type=content_type,
destination_server_side_encryption=destination_encryption,
source_server_side_encryption=source_encryption,
source_file_info=copy_source.source_file_info,
source_content_type=copy_source.source_content_type,
)
response = self.services.session.copy_file(
copy_source.file_id,
file_name,
bytes_range=bytes_range,
metadata_directive=metadata_directive,
content_type=content_type,
file_info=file_info,
destination_bucket_id=destination_bucket_id,
destination_server_side_encryption=destination_encryption,
source_server_side_encryption=source_encryption,
legal_hold=legal_hold,
file_retention=file_retention,
)
file_version = self.services.api.file_version_factory.from_api_response(response)
if progress_listener is not None:
progress_listener.bytes_completed(file_version.size)
progress_listener.set_total_bytes(copy_source.get_content_length() or 0)

bytes_range = copy_source.get_bytes_range()

if content_type is None:
if file_info is not None:
raise CopyArgumentsMismatch('File info can be set only when content type is set')
metadata_directive = MetadataDirectiveMode.COPY
else:
if file_info is None:
raise CopyArgumentsMismatch(
'File info can be not set only when content type is not set'
)
metadata_directive = MetadataDirectiveMode.REPLACE
metadata_directive, file_info, content_type = self.establish_sse_c_file_metadata(
metadata_directive=metadata_directive,
destination_file_info=file_info,
destination_content_type=content_type,
destination_server_side_encryption=destination_encryption,
source_server_side_encryption=source_encryption,
source_file_info=copy_source.source_file_info,
source_content_type=copy_source.source_content_type,
)
response = self.services.session.copy_file(
copy_source.file_id,
file_name,
bytes_range=bytes_range,
metadata_directive=metadata_directive,
content_type=content_type,
file_info=file_info,
destination_bucket_id=destination_bucket_id,
destination_server_side_encryption=destination_encryption,
source_server_side_encryption=source_encryption,
legal_hold=legal_hold,
file_retention=file_retention,
)
file_version = self.services.api.file_version_factory.from_api_response(response)
progress_listener.bytes_completed(file_version.size)

return file_version

Expand Down
77 changes: 36 additions & 41 deletions b2sdk/transfer/outbound/upload_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,46 +202,41 @@ def _upload_small_file(
content_length = upload_source.get_content_length()
exception_info_list = []
progress_listener.set_total_bytes(content_length)
with progress_listener:
for _ in range(self.MAX_UPLOAD_ATTEMPTS):
try:
with upload_source.open() as file:
input_stream = ReadingStreamWithProgress(
file, progress_listener, length=content_length
)
if upload_source.is_sha1_known():
content_sha1 = upload_source.get_content_sha1()
else:
input_stream = StreamWithHash(
input_stream, stream_length=content_length
)
content_sha1 = HEX_DIGITS_AT_END
# it is important that `len()` works on `input_stream`
response = self.services.session.upload_file(
bucket_id,
file_name,
len(input_stream),
content_type,
content_sha1,
file_info,
input_stream,
server_side_encryption=encryption, # todo: client side encryption
file_retention=file_retention,
legal_hold=legal_hold,
custom_upload_timestamp=custom_upload_timestamp,
)
if content_sha1 == HEX_DIGITS_AT_END:
content_sha1 = input_stream.hash
assert content_sha1 == 'do_not_verify' or content_sha1 == response[
'contentSha1'], '{} != {}'.format(
content_sha1, response['contentSha1']
)
return self.services.api.file_version_factory.from_api_response(response)

except B2Error as e:
if not e.should_retry_upload():
raise
exception_info_list.append(e)
self.account_info.clear_bucket_upload_data(bucket_id)
for _ in range(self.MAX_UPLOAD_ATTEMPTS):
try:
with upload_source.open() as file:
input_stream = ReadingStreamWithProgress(
file, progress_listener, length=content_length
)
if upload_source.is_sha1_known():
content_sha1 = upload_source.get_content_sha1()
else:
input_stream = StreamWithHash(input_stream, stream_length=content_length)
content_sha1 = HEX_DIGITS_AT_END
# it is important that `len()` works on `input_stream`
response = self.services.session.upload_file(
bucket_id,
file_name,
len(input_stream),
content_type,
content_sha1,
file_info,
input_stream,
server_side_encryption=encryption, # todo: client side encryption
file_retention=file_retention,
legal_hold=legal_hold,
custom_upload_timestamp=custom_upload_timestamp,
)
if content_sha1 == HEX_DIGITS_AT_END:
content_sha1 = input_stream.hash
assert content_sha1 == 'do_not_verify' or content_sha1 == response[
'contentSha1'], '{} != {}'.format(content_sha1, response['contentSha1'])
return self.services.api.file_version_factory.from_api_response(response)

except B2Error as e:
if not e.should_retry_upload():
raise
exception_info_list.append(e)
self.account_info.clear_bucket_upload_data(bucket_id)

raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_info_list)
6 changes: 3 additions & 3 deletions b2sdk/version_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from abc import ABCMeta, abstractmethod
from functools import wraps

from pkg_resources import parse_version
from packaging.version import parse

from b2sdk.version import VERSION

Expand All @@ -28,7 +28,7 @@ def __init__(self, changed_version, cutoff_version=None, reason='', current_vers
"""
if current_version is None: # this is for tests only
current_version = VERSION # TODO autodetect by going up the qualname tree and trying getattr(part, '__version__')
self.current_version = parse_version(current_version) #: current version
self.current_version = parse(current_version) #: current version
self.reason = reason

self.changed_version = self._parse_if_not_none(
Expand All @@ -42,7 +42,7 @@ def __init__(self, changed_version, cutoff_version=None, reason='', current_vers
def _parse_if_not_none(cls, version):
if version is None:
return None
return parse_version(version)
return parse(version)

@abstractmethod
def __call__(self, func):
Expand Down
1 change: 1 addition & 0 deletions changelog.d/+progress_listeners.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix closing of passed progress listeners in `Bucket.upload` and `Bucket.copy`
1 change: 1 addition & 0 deletions changelog.d/+setuptools.changed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add dependency on `setuptools` and `packaging` as they are not shipped with CPython 3.12 and are used in production code.
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ logfury>=1.0.1,<2.0.0
requests>=2.9.1,<3.0.0
tqdm>=4.5.0,<5.0.0
typing-extensions>=4.7.1; python_version < '3.12'
setuptools>=60.0
packaging>=21.0
8 changes: 1 addition & 7 deletions test/unit/bucket/test_bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,7 @@ def is_valid(self, **kwargs):
valid, _ = self.is_valid_reason(**kwargs)
return valid

def is_valid_reason(
self, check_closed=True, check_progress=True, check_monotonic_progress=False
):
def is_valid_reason(self, check_progress=True, check_monotonic_progress=False):
progress_end = -1
if self.history[progress_end] == 'closed':
progress_end = -2
Expand All @@ -206,8 +204,6 @@ def is_valid_reason(
prev = val
if self.total != self.last_byte_count:
return False, 'total different than last_byte_count'
if check_closed and self.history[-1] != 'closed':
return False, 'no "closed" at the end of history'
if check_progress and len(self.history[1:progress_end]) < 2:
return False, 'progress in history has less than 2 entries'
return True, ''
Expand Down Expand Up @@ -1995,7 +1991,6 @@ def _verify(self, expected_result, check_progress_listener=True):
self._assert_downloaded_data(expected_result)
if check_progress_listener:
valid, reason = self.progress_listener.is_valid_reason(
check_closed=False,
check_progress=False,
check_monotonic_progress=True,
)
Expand Down Expand Up @@ -2613,7 +2608,6 @@ def _verify(self, expected_result, check_progress_listener=True):
self._assert_downloaded_data(expected_result)
if check_progress_listener:
valid, reason = self.progress_listener.is_valid_reason(
check_closed=False,
check_progress=False,
check_monotonic_progress=True,
)
Expand Down
Loading

0 comments on commit c1ebafd

Please sign in to comment.