Commit 2f0f77f: Fix usage of progress listener

mlech-reef committed Sep 17, 2020
Parent: c1b51f9

Showing 10 changed files with 166 additions and 157 deletions.
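Every hunk below applies the same fix: a progress listener is no longer an optional None, and it is no longer entered as a context manager deep inside transfer helpers. The code that creates the listener now owns its with-block, and DoNothingProgressListener acts as a null object when no reporting is wanted, so callees never branch on None. A minimal sketch of the resulting pattern (illustrative, not the library's exact code):

class DoNothingProgressListener:
    """Null-object stand-in: same shape as a real listener, no output."""

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        pass

    def set_total_bytes(self, total_byte_count):
        pass

    def bytes_completed(self, byte_count):
        pass


def transfer(data, progress_listener):
    # Helpers report progress but never enter/exit the listener themselves.
    progress_listener.set_total_bytes(len(data))
    progress_listener.bytes_completed(len(data))


listener = DoNothingProgressListener()  # or a real reporter-backed listener
with listener:  # whoever created the listener opens and closes it, exactly once
    transfer(b'hello world', listener)

The null object keeps the callees free of None checks, while the single outer with-block guarantees the listener is opened and closed exactly once, even across retries.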
b2sdk/download_dest.py (3 changes: 1 addition & 2 deletions)

@@ -219,5 +219,4 @@ def write_file_and_report_progress_context(
             if range_ is not None:
                 total_bytes = range_[1] - range_[0] + 1
             self.progress_listener.set_total_bytes(total_bytes)
-            with self.progress_listener:
-                yield WritingStreamWithProgress(file_, self.progress_listener)
+            yield WritingStreamWithProgress(file_, self.progress_listener)
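write_file_and_report_progress_context is a generator-based context manager, so the old `with self.progress_listener:` wrapped the yield and closed the listener as soon as a single download finished, which clashes with callers that now enter the listener themselves. After the fix the wrapper only decorates the stream. A sketch of the corrected shape, with a hypothetical stand-in for WritingStreamWithProgress:

from contextlib import contextmanager

class WritingStreamWithProgressSketch:
    """Illustrative stand-in for b2sdk's WritingStreamWithProgress."""

    def __init__(self, stream, progress_listener):
        self.stream = stream
        self.progress_listener = progress_listener

    def write(self, data):
        self.progress_listener.bytes_completed(len(data))
        return self.stream.write(data)

@contextmanager
def write_context(file_, progress_listener, total_bytes):
    # After the fix: report through the listener, but do not enter/exit it
    # here -- its lifetime belongs to the caller that created it.
    progress_listener.set_total_bytes(total_bytes)
    yield WritingStreamWithProgressSketch(file_, progress_listener)

A caller can then hold one listener open across several such writes.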
b2sdk/sync/action.py (39 changes: 22 additions & 17 deletions)

@@ -13,6 +13,7 @@
 import logging
 import os
 from ..download_dest import DownloadDestLocalFile
+from ..progress import DoNothingProgressListener
 from ..raw_api import SRC_LAST_MODIFIED_MILLIS
 from ..transfer.outbound.upload_source import UploadSourceLocalFile
 from .report import SyncFileReporter
@@ -122,13 +123,15 @@ def do_action(self, bucket, reporter):
         if reporter:
             progress_listener = SyncFileReporter(reporter)
         else:
-            progress_listener = None
-        bucket.upload(
-            UploadSourceLocalFile(self.local_full_path),
-            self.b2_file_name,
-            file_info={SRC_LAST_MODIFIED_MILLIS: str(self.mod_time_millis)},
-            progress_listener=progress_listener
-        )
+            progress_listener = DoNothingProgressListener()
+
+        with progress_listener:
+            bucket.upload(
+                UploadSourceLocalFile(self.local_full_path),
+                self.b2_file_name,
+                file_info={SRC_LAST_MODIFIED_MILLIS: str(self.mod_time_millis)},
+                progress_listener=progress_listener
+            )

     def do_report(self, bucket, reporter):
         """
@@ -245,12 +248,13 @@ def do_action(self, bucket, reporter):
         if reporter:
             progress_listener = SyncFileReporter(reporter)
         else:
-            progress_listener = None
+            progress_listener = DoNothingProgressListener()

         # Download the file to a .tmp file
         download_path = self.local_full_path + '.b2.sync.tmp'
         download_dest = DownloadDestLocalFile(download_path)
-        bucket.download_file_by_id(self.file_id, download_dest, progress_listener)
+        with progress_listener:
+            bucket.download_file_by_id(self.file_id, download_dest, progress_listener)

         # Move the file into place
         try:
@@ -324,14 +328,15 @@ def do_action(self, bucket, reporter):
         if reporter:
             progress_listener = SyncFileReporter(reporter)
         else:
-            progress_listener = None
-
-        bucket.copy(
-            self.file_id,
-            self.dest_b2_file_name,
-            length=self.size,
-            progress_listener=progress_listener
-        )
+            progress_listener = DoNothingProgressListener()
+
+        with progress_listener:
+            bucket.copy(
+                self.file_id,
+                self.dest_b2_file_name,
+                length=self.size,
+                progress_listener=progress_listener
+            )

     def do_report(self, bucket, reporter):
         """
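All three sync actions (upload, download, copy) now share one shape: pick a reporter-backed listener or the do-nothing one, then run the transfer inside the listener's context. A condensed sketch of that shared shape; run_transfer is a hypothetical stand-in for the bucket.upload / download_file_by_id / copy calls above:

from b2sdk.progress import DoNothingProgressListener  # import added by this commit
from b2sdk.sync.report import SyncFileReporter  # location per the diff's relative import

def do_action_shape(reporter, run_transfer):
    # Shared shape of the do_action methods in this file
    if reporter:
        progress_listener = SyncFileReporter(reporter)
    else:
        progress_listener = DoNothingProgressListener()

    with progress_listener:
        run_transfer(progress_listener)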
b2sdk/sync/sync.py (2 changes: 0 additions & 2 deletions)

@@ -266,7 +266,6 @@ def make_folder_sync_actions(
                     reporter.update_total(1)
                 reporter.update_compare(1)

-            import time
             for action in self.make_file_sync_actions(
                 sync_type,
                 source_file,
@@ -278,7 +277,6 @@ def make_folder_sync_actions(
                 total_files += 1
                 total_bytes += action.get_bytes()
                 yield action
-                time.sleep(.02)

         if reporter is not None:
             if source_type == 'b2':
b2sdk/transfer/emerge/executor.py (41 changes: 20 additions & 21 deletions)

@@ -147,27 +147,26 @@ def execute_plan(self, emerge_plan):
            )
        file_id = unfinished_file.file_id

-        with self.progress_listener:
-            large_file_upload_state = LargeFileUploadState(self.progress_listener)
-
-            part_futures = []
-            for part_number, emerge_part in emerge_plan.enumerate_emerge_parts():
-                execution_step_factory = LargeFileEmergeExecutionStepFactory(
-                    self,
-                    emerge_part,
-                    part_number,
-                    file_id,
-                    large_file_upload_state,
-                    finished_parts=finished_parts,
-                )
-                execution_step = execution_step_factory.get_execution_step()
-                future = self._execute_step(execution_step)
-                part_futures.append(future)
-
-            # Collect the sha1 checksums of the parts as the uploads finish.
-            # If any of them raised an exception, that same exception will
-            # be raised here by result()
-            part_sha1_array = [interruptible_get_result(f)['contentSha1'] for f in part_futures]
+        large_file_upload_state = LargeFileUploadState(self.progress_listener)
+
+        part_futures = []
+        for part_number, emerge_part in emerge_plan.enumerate_emerge_parts():
+            execution_step_factory = LargeFileEmergeExecutionStepFactory(
+                self,
+                emerge_part,
+                part_number,
+                file_id,
+                large_file_upload_state,
+                finished_parts=finished_parts,
+            )
+            execution_step = execution_step_factory.get_execution_step()
+            future = self._execute_step(execution_step)
+            part_futures.append(future)
+
+        # Collect the sha1 checksums of the parts as the uploads finish.
+        # If any of them raised an exception, that same exception will
+        # be raised here by result()
+        part_sha1_array = [interruptible_get_result(f)['contentSha1'] for f in part_futures]

        # Finish the large file
        response = self.services.session.finish_large_file(file_id, part_sha1_array)
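With the outer `with self.progress_listener:` removed, execute_plan assumes its caller has already entered the listener; the body only dedents. LargeFileUploadState coordinates the per-part uploads that run as futures; its internals are not shown in this diff, so the sketch below is an assumption about the idea (thread-safe fan-in of part progress into one listener), not b2sdk's actual implementation:

import threading
from concurrent.futures import ThreadPoolExecutor

class LargeFileUploadStateSketch:
    """Assumed shape: serialize progress reports from concurrent part uploads."""

    def __init__(self, progress_listener):
        self.progress_listener = progress_listener
        self.lock = threading.Lock()
        self.bytes_so_far = 0

    def update_part_bytes(self, byte_count):
        with self.lock:
            self.bytes_so_far += byte_count
            self.progress_listener.bytes_completed(self.bytes_so_far)

def upload_one_part(part_bytes, state):
    # Hypothetical part upload: report the bytes as if they were sent.
    state.update_part_bytes(len(part_bytes))
    return {'contentSha1': 'stub'}

def upload_parts(parts, state, max_workers=4):
    # Fan out the parts; collecting each result() re-raises any failure,
    # mirroring interruptible_get_result() in the hunk above.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(upload_one_part, p, state) for p in parts]
        return [f.result()['contentSha1'] for f in futures]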
b2sdk/transfer/outbound/copy_manager.py (50 changes: 24 additions & 26 deletions)

@@ -151,31 +151,29 @@ def _copy_small_file(
        destination_bucket_id,
        progress_listener,
    ):
-        with progress_listener:
-            progress_listener.set_total_bytes(copy_source.get_content_length() or 0)
-
-            bytes_range = copy_source.get_bytes_range()
-
-            if content_type is None:
-                if file_info is not None:
-                    raise ValueError('File info can be set only when content type is set')
-                metadata_directive = MetadataDirectiveMode.COPY
-            else:
-                if file_info is None:
-                    raise ValueError('File info can be not set only when content type is not set')
-                metadata_directive = MetadataDirectiveMode.REPLACE
-
-            response = self.services.session.copy_file(
-                copy_source.file_id,
-                file_name,
-                bytes_range=bytes_range,
-                metadata_directive=metadata_directive,
-                content_type=content_type,
-                file_info=file_info,
-                destination_bucket_id=destination_bucket_id
-            )
-            file_info = FileVersionInfoFactory.from_api_response(response)
-            if progress_listener is not None:
-                progress_listener.bytes_completed(file_info.size)
+        progress_listener.set_total_bytes(copy_source.get_content_length() or 0)
+
+        bytes_range = copy_source.get_bytes_range()
+
+        if content_type is None:
+            if file_info is not None:
+                raise ValueError('File info can be set only when content type is set')
+            metadata_directive = MetadataDirectiveMode.COPY
+        else:
+            if file_info is None:
+                raise ValueError('File info can be not set only when content type is not set')
+            metadata_directive = MetadataDirectiveMode.REPLACE
+
+        response = self.services.session.copy_file(
+            copy_source.file_id,
+            file_name,
+            bytes_range=bytes_range,
+            metadata_directive=metadata_directive,
+            content_type=content_type,
+            file_info=file_info,
+            destination_bucket_id=destination_bucket_id
+        )
+        file_info = FileVersionInfoFactory.from_api_response(response)
+        progress_listener.bytes_completed(file_info.size)

        return file_info
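Besides losing its with-block (the caller owns the listener now), _copy_small_file drops the `if progress_listener is not None` guard, which the null-object change makes safe. The metadata rule it keeps enforcing reflects a B2 copy_file constraint: leave both content_type and file_info unset to COPY the source metadata, or set both to REPLACE it. A self-contained sketch of that rule, with a stand-in enum:

from enum import Enum, auto

class MetadataDirectiveMode(Enum):  # stand-in for the enum b2sdk imports
    COPY = auto()
    REPLACE = auto()

def choose_metadata_directive(content_type, file_info):
    # Both unset -> COPY source metadata; both set -> REPLACE; mixing is an error.
    if content_type is None:
        if file_info is not None:
            raise ValueError('File info can be set only when content type is set')
        return MetadataDirectiveMode.COPY
    if file_info is None:
        raise ValueError('File info can be not set only when content type is not set')
    return MetadataDirectiveMode.REPLACE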
b2sdk/transfer/outbound/upload_manager.py (56 changes: 27 additions & 29 deletions)

@@ -184,34 +184,32 @@ def _upload_small_file(
        content_length = upload_source.get_content_length()
        exception_info_list = []
        progress_listener.set_total_bytes(content_length)
-        with progress_listener:
-            for _ in range(self.MAX_UPLOAD_ATTEMPTS):
-                try:
-                    with upload_source.open() as file:
-                        input_stream = ReadingStreamWithProgress(
-                            file, progress_listener, length=content_length
-                        )
-                        if upload_source.is_sha1_known():
-                            content_sha1 = upload_source.get_content_sha1()
-                        else:
-                            input_stream = StreamWithHash(
-                                input_stream, stream_length=content_length
-                            )
-                            content_sha1 = HEX_DIGITS_AT_END
-                        # it is important that `len()` works on `input_stream`
-                        response = self.services.session.upload_file(
-                            bucket_id, file_name, len(input_stream), content_type, content_sha1,
-                            file_info, input_stream
-                        )
-                        if content_sha1 == HEX_DIGITS_AT_END:
-                            content_sha1 = input_stream.hash
-                        assert content_sha1 == response['contentSha1']
-                        return FileVersionInfoFactory.from_api_response(response)
-
-                except B2Error as e:
-                    if not e.should_retry_upload():
-                        raise
-                    exception_info_list.append(e)
-                    self.account_info.clear_bucket_upload_data(bucket_id)
+
+        for _ in range(self.MAX_UPLOAD_ATTEMPTS):
+            try:
+                with upload_source.open() as file:
+                    input_stream = ReadingStreamWithProgress(
+                        file, progress_listener, length=content_length
+                    )
+                    if upload_source.is_sha1_known():
+                        content_sha1 = upload_source.get_content_sha1()
+                    else:
+                        input_stream = StreamWithHash(input_stream, stream_length=content_length)
+                        content_sha1 = HEX_DIGITS_AT_END
+                    # it is important that `len()` works on `input_stream`
+                    response = self.services.session.upload_file(
+                        bucket_id, file_name, len(input_stream), content_type, content_sha1,
+                        file_info, input_stream
+                    )
+                    if content_sha1 == HEX_DIGITS_AT_END:
+                        content_sha1 = input_stream.hash
+                    assert content_sha1 == response['contentSha1']
+                    return FileVersionInfoFactory.from_api_response(response)
+
+            except B2Error as e:
+                if not e.should_retry_upload():
+                    raise
+                exception_info_list.append(e)
+                self.account_info.clear_bucket_upload_data(bucket_id)

        raise MaxRetriesExceeded(self.MAX_UPLOAD_ATTEMPTS, exception_info_list)
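The retry loop itself is unchanged, only dedented: each attempt reopens the source, a non-retryable B2Error propagates immediately, retryable ones are collected, and MaxRetriesExceeded reports them all once the attempts run out. Note that set_total_bytes is called once, before the loop, on a listener that now stays open across every attempt. A generic sketch of the retry shape (names here are illustrative, not b2sdk's API):

class RetryableError(Exception):
    """Stand-in for an error whose should_retry_upload() check is True."""

MAX_UPLOAD_ATTEMPTS = 5

def upload_with_retries(attempt_upload):
    exception_info_list = []
    for _ in range(MAX_UPLOAD_ATTEMPTS):
        try:
            return attempt_upload()  # success returns out of the loop
        except RetryableError as e:
            exception_info_list.append(e)  # keep every failure for the final report
    raise RuntimeError(
        'upload failed after %d attempts: %r' % (MAX_UPLOAD_ATTEMPTS, exception_info_list)
    )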
test/unit/v0/test_bucket.py (50 changes: 28 additions & 22 deletions)

@@ -424,8 +424,8 @@ def test_upload_bytes(self):

     def test_upload_bytes_progress(self):
         data = b'hello world'
-        progress_listener = StubProgressListener()
-        self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
+        with StubProgressListener() as progress_listener:
+            self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
         self.assertTrue(progress_listener.is_valid())

     def test_upload_local_file(self):
@@ -475,8 +475,8 @@ def test_upload_file_too_many_retryable_errors(self):

     def test_upload_large(self):
         data = self._make_data(self.simulator.MIN_PART_SIZE * 3)
-        progress_listener = StubProgressListener()
-        self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
+        with StubProgressListener() as progress_listener:
+            self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())

@@ -485,8 +485,8 @@ def test_upload_large_resume(self):
         data = self._make_data(part_size * 3)
         large_file_id = self._start_large_file('file1')
         self._upload_part(large_file_id, 1, data[:part_size])
-        progress_listener = StubProgressListener()
-        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
+        with StubProgressListener() as progress_listener:
+            file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
         self.assertEqual(large_file_id, file_info.id_)
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())
@@ -495,8 +495,8 @@ def test_upload_large_resume_no_parts(self):
         part_size = self.simulator.MIN_PART_SIZE
         data = self._make_data(part_size * 3)
         large_file_id = self._start_large_file('file1')
-        progress_listener = StubProgressListener()
-        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
+        with StubProgressListener() as progress_listener:
+            file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
         self.assertNotEqual(large_file_id, file_info.id_)  # it's not a match if there are no parts
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())
@@ -508,8 +508,8 @@ def test_upload_large_resume_all_parts_there(self):
         self._upload_part(large_file_id, 1, data[:part_size])
         self._upload_part(large_file_id, 2, data[part_size:2 * part_size])
         self._upload_part(large_file_id, 3, data[2 * part_size:])
-        progress_listener = StubProgressListener()
-        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
+        with StubProgressListener() as progress_listener:
+            file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
         self.assertEqual(large_file_id, file_info.id_)
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())
@@ -519,8 +519,8 @@ def test_upload_large_resume_part_does_not_match(self):
         data = self._make_data(part_size * 3)
         large_file_id = self._start_large_file('file1')
         self._upload_part(large_file_id, 3, data[:part_size])  # wrong part number for this data
-        progress_listener = StubProgressListener()
-        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
+        with StubProgressListener() as progress_listener:
+            file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
         self.assertNotEqual(large_file_id, file_info.id_)
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())
@@ -530,8 +530,8 @@ def test_upload_large_resume_wrong_part_size(self):
         data = self._make_data(part_size * 3)
         large_file_id = self._start_large_file('file1')
         self._upload_part(large_file_id, 1, data[:part_size + 1])  # one byte to much
-        progress_listener = StubProgressListener()
-        file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
+        with StubProgressListener() as progress_listener:
+            file_info = self.bucket.upload_bytes(data, 'file1', progress_listener=progress_listener)
         self.assertNotEqual(large_file_id, file_info.id_)
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())
@@ -541,10 +541,13 @@ def test_upload_large_resume_file_info(self):
         data = self._make_data(part_size * 3)
         large_file_id = self._start_large_file('file1', {'property': 'value1'})
         self._upload_part(large_file_id, 1, data[:part_size])
-        progress_listener = StubProgressListener()
-        file_info = self.bucket.upload_bytes(
-            data, 'file1', progress_listener=progress_listener, file_infos={'property': 'value1'}
-        )
+        with StubProgressListener() as progress_listener:
+            file_info = self.bucket.upload_bytes(
+                data,
+                'file1',
+                progress_listener=progress_listener,
+                file_infos={'property': 'value1'}
+            )
         self.assertEqual(large_file_id, file_info.id_)
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())
@@ -554,10 +557,13 @@ def test_upload_large_resume_file_info_does_not_match(self):
         data = self._make_data(part_size * 3)
         large_file_id = self._start_large_file('file1', {'property': 'value1'})
         self._upload_part(large_file_id, 1, data[:part_size])
-        progress_listener = StubProgressListener()
-        file_info = self.bucket.upload_bytes(
-            data, 'file1', progress_listener=progress_listener, file_infos={'property': 'value2'}
-        )
+        with StubProgressListener() as progress_listener:
+            file_info = self.bucket.upload_bytes(
+                data,
+                'file1',
+                progress_listener=progress_listener,
+                file_infos={'property': 'value2'}
+            )
         self.assertNotEqual(large_file_id, file_info.id_)
         self._check_file_contents('file1', data)
         self.assertTrue(progress_listener.is_valid())
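Every test change is the same substitution: enter the stub listener as a context manager around the upload and assert is_valid() after exit, so the close path is exercised too. A hedged sketch of what such a stub might record; the real StubProgressListener lives in the test helpers and may differ:

class StubProgressListenerSketch:
    """Illustrative stub: records callbacks so is_valid() can check them."""

    def __init__(self):
        self.total = None
        self.history = []
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.closed = True

    def set_total_bytes(self, total_byte_count):
        self.total = total_byte_count

    def bytes_completed(self, byte_count):
        # assume a running total is reported each time
        self.history.append(byte_count)

    def is_valid(self):
        # a total was announced, progress never went backwards,
        # and the last report reached the total
        if self.total is None or not self.history:
            return False
        return self.history == sorted(self.history) and self.history[-1] == self.total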
test/unit/v0/test_download_dest.py (12 changes: 6 additions & 6 deletions)

@@ -75,12 +75,12 @@ def test_write_and_set_mod_time_and_progress(self):
         with TempDir() as temp_dir:
             file_path = os.path.join(temp_dir, "test.txt")
             download_local_file = DownloadDestLocalFile(file_path)
-            progress_listener = ProgressListenerForTest()
-            download_dest = DownloadDestProgressWrapper(download_local_file, progress_listener)
-            with download_dest.make_file_context(
-                "file_id", "file_name", 100, "content_type", "sha1", {}, mod_time
-            ) as f:
-                f.write(b'hello world\n')
+            with ProgressListenerForTest() as progress_listener:
+                download_dest = DownloadDestProgressWrapper(download_local_file, progress_listener)
+                with download_dest.make_file_context(
+                    "file_id", "file_name", 100, "content_type", "sha1", {}, mod_time
+                ) as f:
+                    f.write(b'hello world\n')
             with open(file_path, 'rb') as f:
                 self.assertEqual(b'hello world\n', f.read())
             self.assertEqual(mod_time, int(os.path.getmtime(file_path) * 1000))
(Diffs for the remaining 2 changed files did not load.)