Skip to content

Commit

Permalink
Merge pull request #165 from Backblaze/b2b-sync
Browse files Browse the repository at this point in the history
Add support for bucket to bucket sync of the latest versions of files
  • Loading branch information
mlech-reef authored Nov 3, 2020
2 parents cfd813c + a7429eb commit e30256e
Show file tree
Hide file tree
Showing 33 changed files with 1,275 additions and 1,065 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
* Add support for Python 3.9
* Support for bucket to bucket sync

### Removed
* Drop Python 2 and Python 3.4 support :tada:
Expand Down
2 changes: 1 addition & 1 deletion b2sdk/b2http.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class B2Http(object):
"""

# timeout for HTTP GET/POST requests
TIMEOUT = 130
TIMEOUT = 900 # 15 minutes as server-side copy can take time

def __init__(self, requests_module=None, install_clock_skew_hook=True):
"""
Expand Down
5 changes: 3 additions & 2 deletions b2sdk/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,11 @@ def copy(
"""

copy_source = CopySource(file_id, offset=offset, length=length)
if length is None:
if not length:
# TODO: it feels like this should be checked on lower level - eg. RawApi
validate_b2_file_name(new_file_name)
return self.api.services.upload_manager.copy_file(
progress_listener = progress_listener or DoNothingProgressListener()
return self.api.services.copy_manager.copy_file(
copy_source,
new_file_name,
content_type=content_type,
Expand Down
81 changes: 79 additions & 2 deletions b2sdk/sync/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,15 @@ def do_action(self, bucket, reporter):
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None
bucket.upload(
UploadSourceLocalFile(self.local_full_path),
self.b2_file_name,
file_info={SRC_LAST_MODIFIED_MILLIS: str(self.mod_time_millis)},
progress_listener=SyncFileReporter(reporter)
progress_listener=progress_listener
)

def do_report(self, bucket, reporter):
Expand Down Expand Up @@ -238,10 +242,15 @@ def do_action(self, bucket, reporter):
if not os.path.isdir(parent_dir):
raise Exception('could not create directory %s' % (parent_dir,))

if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None

# Download the file to a .tmp file
download_path = self.local_full_path + '.b2.sync.tmp'
download_dest = DownloadDestLocalFile(download_path)
bucket.download_file_by_id(self.file_id, download_dest, SyncFileReporter(reporter))
bucket.download_file_by_id(self.file_id, download_dest, progress_listener)

# Move the file into place
try:
Expand All @@ -267,6 +276,74 @@ def __str__(self):
)


class B2CopyAction(AbstractAction):
"""
File copying action.
"""

def __init__(
self, relative_name, b2_file_name, file_id, dest_b2_file_name, mod_time_millis, size
):
"""
:param str relative_name: a relative file name
:param str b2_file_name: a name of a remote file
:param str file_id: a file ID
:param str dest_b2_file_name: a name of a destination remote file
:param int mod_time_millis: file modification time in milliseconds
:param int size: a file size
"""
self.relative_name = relative_name
self.b2_file_name = b2_file_name
self.file_id = file_id
self.dest_b2_file_name = dest_b2_file_name
self.mod_time_millis = mod_time_millis
self.size = size

def get_bytes(self):
"""
Return file size.
:rtype: int
"""
return self.size

def do_action(self, bucket, reporter):
"""
Perform the copying action, returning only after the action is completed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None

bucket.copy(
self.file_id,
self.dest_b2_file_name,
length=self.size,
progress_listener=progress_listener
)

def do_report(self, bucket, reporter):
"""
Report the copying action performed.
:param bucket: a Bucket object
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
reporter.print_completion('copy ' + self.relative_name)

def __str__(self):
return (
'b2_copy(%s, %s, %s, %d)' %
(self.b2_file_name, self.file_id, self.dest_b2_file_name, self.mod_time_millis)
)


class B2DeleteAction(AbstractAction):
def __init__(self, relative_name, b2_file_name, file_id, note):
"""
Expand Down
5 changes: 3 additions & 2 deletions b2sdk/sync/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def ensure_present(self):
if not os.path.exists(self.root):
try:
os.mkdir(self.root)
except:
except OSError:
raise Exception('unable to create directory %s' % (self.root,))
elif not os.path.isdir(self.root):
raise Exception('%s is not a directory' % (self.root,))
Expand Down Expand Up @@ -232,7 +232,8 @@ def _walk_relative_paths(cls, local_dir, b2_dir, reporter, policies_manager):
continue

if policies_manager.exclude_all_symlinks and os.path.islink(local_path):
reporter.symlink_skipped(local_path)
if reporter is not None:
reporter.symlink_skipped(local_path)
continue

if os.path.isdir(local_path):
Expand Down
56 changes: 55 additions & 1 deletion b2sdk/sync/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging

from ..exception import DestFileNewer
from .action import LocalDeleteAction, B2DeleteAction, B2DownloadAction, B2HideAction, B2UploadAction
from .action import LocalDeleteAction, B2CopyAction, B2DeleteAction, B2DownloadAction, B2HideAction, B2UploadAction
from .exception import InvalidArgument

ONE_DAY_IN_MS = 24 * 60 * 60 * 1000
Expand Down Expand Up @@ -305,6 +305,60 @@ class DownAndKeepDaysPolicy(DownPolicy):
pass


class CopyPolicy(AbstractFileSyncPolicy):
"""
File is copied (server-side).
"""
DESTINATION_PREFIX = 'b2://'
SOURCE_PREFIX = 'b2://'

def _make_transfer_action(self):
return B2CopyAction(
self._source_file.name,
self._source_folder.make_full_path(self._source_file.name),
self._source_file.latest_version().id_,
self._dest_folder.make_full_path(self._source_file.name),
self._get_source_mod_time(),
self._source_file.latest_version().size,
)


class CopyAndDeletePolicy(CopyPolicy):
"""
File is copied (server-side) and the delete flag is SET.
"""

def _get_hide_delete_actions(self):
for action in super()._get_hide_delete_actions():
yield action
for action in make_b2_delete_actions(
self._source_file,
self._dest_file,
self._dest_folder,
self._transferred,
):
yield action


class CopyAndKeepDaysPolicy(CopyPolicy):
"""
File is copied (server-side) and the keepDays flag is SET.
"""

def _get_hide_delete_actions(self):
for action in super()._get_hide_delete_actions():
yield action
for action in make_b2_keep_days_actions(
self._source_file,
self._dest_file,
self._dest_folder,
self._transferred,
self._keep_days,
self._now_millis,
):
yield action


def make_b2_delete_note(version, index, transferred):
"""
Create a note message for delete action.
Expand Down
22 changes: 16 additions & 6 deletions b2sdk/sync/policy_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
#
######################################################################

from .policy import DownAndDeletePolicy, DownAndKeepDaysPolicy, DownPolicy
from .policy import UpAndDeletePolicy, UpAndKeepDaysPolicy, UpPolicy
from .policy import CopyAndDeletePolicy, CopyAndKeepDaysPolicy, CopyPolicy, \
DownAndDeletePolicy, DownAndKeepDaysPolicy, DownPolicy, UpAndDeletePolicy, \
UpAndKeepDaysPolicy, UpPolicy


class SyncPolicyManager(object):
Expand Down Expand Up @@ -87,10 +88,19 @@ def get_policy_class(self, sync_type, delete, keep_days):
return DownAndKeepDaysPolicy
else:
return DownPolicy
assert False, 'invalid sync type: %s, keep_days: %s, delete: %s' % (
sync_type,
keep_days,
delete,
elif sync_type == 'b2-to-b2':
if delete:
return CopyAndDeletePolicy
elif keep_days:
return CopyAndKeepDaysPolicy
else:
return CopyPolicy
raise NotImplementedError(
'invalid sync type: %s, keep_days: %s, delete: %s' % (
sync_type,
keep_days,
delete,
)
)


Expand Down
54 changes: 39 additions & 15 deletions b2sdk/sync/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
logger = logging.getLogger(__name__)


class SyncReport(object):
class SyncReport:
"""
Handle reporting progress for syncing.
Expand All @@ -45,8 +45,8 @@ def __init__(self, stdout, no_progress):
self.stdout = stdout
self.no_progress = no_progress
self.start_time = time.time()
self.local_file_count = 0
self.local_done = False
self.total_count = 0
self.total_done = False
self.compare_done = False
self.compare_count = 0
self.total_transfer_files = 0 # set in end_compare()
Expand Down Expand Up @@ -109,9 +109,9 @@ def _update_progress(self):
self._last_update_time = now
time_delta = time.time() - self.start_time
rate = 0 if time_delta == 0 else int(self.transfer_bytes / time_delta)
if not self.local_done:
if not self.total_done:
message = ' count: %d files compare: %d files updated: %d files %s %s' % (
self.local_file_count,
self.total_count,
self.compare_count,
self.transfer_files,
format_and_scale_number(self.transfer_bytes, 'B'),
Expand All @@ -120,15 +120,15 @@ def _update_progress(self):
elif not self.compare_done:
message = ' compare: %d/%d files updated: %d files %s %s' % (
self.compare_count,
self.local_file_count,
self.total_count,
self.transfer_files,
format_and_scale_number(self.transfer_bytes, 'B'),
format_and_scale_number(rate, 'B/s')
) # yapf: disable
else:
message = ' compare: %d/%d files updated: %d/%d files %s %s' % (
self.compare_count,
self.local_file_count,
self.total_count,
self.transfer_files,
self.total_transfer_files,
format_and_scale_fraction(self.transfer_bytes, self.total_transfer_bytes, 'B'),
Expand Down Expand Up @@ -169,23 +169,23 @@ def _print_line(self, line, newline):
self.current_line = line
self.stdout.flush()

def update_local(self, delta):
def update_total(self, delta):
"""
Report that more local files have been found.
Report that more files have been found for comparison.
:param delta: number of files found since the last check
:type delta: int
"""
with self.lock:
self.local_file_count += delta
self.total_count += delta
self._update_progress()

def end_local(self):
def end_total(self):
"""
Local file count is done. Can proceed to step 2.
Total files count is done. Can proceed to step 2.
"""
with self.lock:
self.local_done = True
self.total_done = True
self._update_progress()

def update_compare(self, delta):
Expand Down Expand Up @@ -251,6 +251,30 @@ def local_permission_error(self, path):
def symlink_skipped(self, path):
pass

@property
def local_file_count(self):
# TODO: Deprecated. Should be removed in v2
return self.total_count

@local_file_count.setter
def local_file_count(self, value):
# TODO: Deprecated. Should be removed in v2
self.total_count = value

@property
def local_done(self):
# TODO: Deprecated. Should be removed in v2
return self.total_done

@local_done.setter
def local_done(self, value):
# TODO: Deprecated. Should be removed in v2
self.total_done = value

# TODO: Deprecated. Should be removed in v2
update_local = update_total
end_local = end_total


class SyncFileReporter(AbstractProgressListener):
"""
Expand Down Expand Up @@ -300,13 +324,13 @@ def sample_sync_report_run():
sync_report = SyncReport(sys.stdout, False)

for i in range(20):
sync_report.update_local(1)
sync_report.update_total(1)
time.sleep(0.2)
if i == 10:
sync_report.print_completion('transferred: a.txt')
if i % 2 == 0:
sync_report.update_compare(1)
sync_report.end_local()
sync_report.end_total()

for i in range(10):
sync_report.update_compare(1)
Expand Down
Loading

0 comments on commit e30256e

Please sign in to comment.