Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bucket to bucket sync #103

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
* Add support for Python 3.9
* Add support for bucket-to-bucket sync

### Removed
* Drop Python 2 and Python 3.4 support :tada:
Expand Down
2 changes: 1 addition & 1 deletion b2sdk/b2http.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ class B2Http(object):
"""

# timeout for HTTP GET/POST requests
TIMEOUT = 130
TIMEOUT = 900 # 15 minutes as server-side copy can take time
mlech-reef marked this conversation as resolved.
Show resolved Hide resolved

def __init__(self, requests_module=None, install_clock_skew_hook=True):
"""
Expand Down
5 changes: 3 additions & 2 deletions b2sdk/bucket.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,11 @@ def copy(
"""

copy_source = CopySource(file_id, offset=offset, length=length)
if length is None:
if not length:
# TODO: it feels like this should be checked on lower level - eg. RawApi
validate_b2_file_name(new_file_name)
return self.api.services.upload_manager.copy_file(
progress_listener = progress_listener or DoNothingProgressListener()
return self.api.services.copy_manager.copy_file(
copy_source,
new_file_name,
content_type=content_type,
Expand Down
81 changes: 79 additions & 2 deletions b2sdk/sync/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,15 @@ def do_action(self, bucket, reporter):
:type bucket: b2sdk.bucket.Bucket
:param reporter: a place to report errors
"""
if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None
bucket.upload(
UploadSourceLocalFile(self.local_full_path),
self.b2_file_name,
file_info={SRC_LAST_MODIFIED_MILLIS: str(self.mod_time_millis)},
progress_listener=SyncFileReporter(reporter)
progress_listener=progress_listener
)

def do_report(self, bucket, reporter):
Expand Down Expand Up @@ -238,10 +242,15 @@ def do_action(self, bucket, reporter):
if not os.path.isdir(parent_dir):
raise Exception('could not create directory %s' % (parent_dir,))

if reporter:
progress_listener = SyncFileReporter(reporter)
else:
progress_listener = None

# Download the file to a .tmp file
download_path = self.local_full_path + '.b2.sync.tmp'
download_dest = DownloadDestLocalFile(download_path)
bucket.download_file_by_id(self.file_id, download_dest, SyncFileReporter(reporter))
bucket.download_file_by_id(self.file_id, download_dest, progress_listener)

# Move the file into place
try:
Expand All @@ -267,6 +276,74 @@ def __str__(self):
)


class B2CopyAction(AbstractAction):
    """
    Sync action that copies one remote file to a destination file name
    by calling ``bucket.copy``.
    """

    def __init__(
        self, relative_name, b2_file_name, file_id, dest_b2_file_name, mod_time_millis, size
    ):
        """
        Store everything needed to perform and describe the copy.

        :param str relative_name: a relative file name
        :param str b2_file_name: a name of a remote file
        :param str file_id: a file ID
        :param str dest_b2_file_name: a name of a destination remote file
        :param int mod_time_millis: file modification time in milliseconds
        :param int size: a file size
        """
        # What is being copied.
        self.file_id = file_id
        self.b2_file_name = b2_file_name
        self.relative_name = relative_name
        self.size = size
        self.mod_time_millis = mod_time_millis
        # Where it is being copied to.
        self.dest_b2_file_name = dest_b2_file_name

    def get_bytes(self):
        """
        Return the size of the file handled by this action.

        :rtype: int
        """
        return self.size

    def do_action(self, bucket, reporter):
        """
        Copy the file and return only once the copy call has finished.

        :param bucket: a Bucket object
        :type bucket: b2sdk.bucket.Bucket
        :param reporter: a place to report errors
        """
        # Progress is only tracked when a reporter was supplied.
        progress_listener = SyncFileReporter(reporter) if reporter else None

        bucket.copy(
            self.file_id,
            self.dest_b2_file_name,
            length=self.size,
            progress_listener=progress_listener,
        )

    def do_report(self, bucket, reporter):
        """
        Tell the reporter that the copy of this file has completed.

        :param bucket: a Bucket object
        :type bucket: b2sdk.bucket.Bucket
        :param reporter: a place to report errors
        """
        reporter.print_completion('copy ' + self.relative_name)

    def __str__(self):
        details = (
            self.b2_file_name,
            self.file_id,
            self.dest_b2_file_name,
            self.mod_time_millis,
        )
        return 'b2_copy(%s, %s, %s, %d)' % details


class B2DeleteAction(AbstractAction):
def __init__(self, relative_name, b2_file_name, file_id, note):
"""
Expand Down
5 changes: 3 additions & 2 deletions b2sdk/sync/folder.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def ensure_present(self):
if not os.path.exists(self.root):
try:
os.mkdir(self.root)
except:
except OSError:
mlech-reef marked this conversation as resolved.
Show resolved Hide resolved
raise Exception('unable to create directory %s' % (self.root,))
elif not os.path.isdir(self.root):
raise Exception('%s is not a directory' % (self.root,))
Expand Down Expand Up @@ -232,7 +232,8 @@ def _walk_relative_paths(cls, local_dir, b2_dir, reporter, policies_manager):
continue

if policies_manager.exclude_all_symlinks and os.path.islink(local_path):
reporter.symlink_skipped(local_path)
if reporter is not None:
reporter.symlink_skipped(local_path)
continue

if os.path.isdir(local_path):
Expand Down
56 changes: 55 additions & 1 deletion b2sdk/sync/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import logging

from ..exception import DestFileNewer
from .action import LocalDeleteAction, B2DeleteAction, B2DownloadAction, B2HideAction, B2UploadAction
from .action import LocalDeleteAction, B2CopyAction, B2DeleteAction, B2DownloadAction, B2HideAction, B2UploadAction
from .exception import InvalidArgument

ONE_DAY_IN_MS = 24 * 60 * 60 * 1000
Expand Down Expand Up @@ -305,6 +305,60 @@ class DownAndKeepDaysPolicy(DownPolicy):
pass


class CopyPolicy(AbstractFileSyncPolicy):
    """
    Policy for a server-side copy: both the source and the destination
    use the ``b2://`` prefix.
    """
    SOURCE_PREFIX = 'b2://'
    DESTINATION_PREFIX = 'b2://'

    def _make_transfer_action(self):
        """
        Build the copy action for the latest version of the source file.
        """
        # Values are gathered in the same order the action expects them.
        relative_name = self._source_file.name
        source_path = self._source_folder.make_full_path(relative_name)
        source_file_id = self._source_file.latest_version().id_
        dest_path = self._dest_folder.make_full_path(relative_name)
        mod_time_millis = self._get_source_mod_time()
        size = self._source_file.latest_version().size
        return B2CopyAction(
            relative_name,
            source_path,
            source_file_id,
            dest_path,
            mod_time_millis,
            size,
        )


class CopyAndDeletePolicy(CopyPolicy):
    """
    Server-side copy policy used when the delete flag is SET.
    """

    def _get_hide_delete_actions(self):
        """
        Yield the parent policy's hide/delete actions, then the actions
        produced by ``make_b2_delete_actions``.
        """
        yield from super()._get_hide_delete_actions()
        yield from make_b2_delete_actions(
            self._source_file,
            self._dest_file,
            self._dest_folder,
            self._transferred,
        )


class CopyAndKeepDaysPolicy(CopyPolicy):
    """
    Server-side copy policy used when the keepDays flag is SET.
    """

    def _get_hide_delete_actions(self):
        """
        Yield the parent policy's hide/delete actions, then the actions
        produced by ``make_b2_keep_days_actions``.
        """
        yield from super()._get_hide_delete_actions()
        yield from make_b2_keep_days_actions(
            self._source_file,
            self._dest_file,
            self._dest_folder,
            self._transferred,
            self._keep_days,
            self._now_millis,
        )


def make_b2_delete_note(version, index, transferred):
"""
Create a note message for delete action.
Expand Down
22 changes: 16 additions & 6 deletions b2sdk/sync/policy_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
#
######################################################################

from .policy import DownAndDeletePolicy, DownAndKeepDaysPolicy, DownPolicy
from .policy import UpAndDeletePolicy, UpAndKeepDaysPolicy, UpPolicy
from .policy import CopyAndDeletePolicy, CopyAndKeepDaysPolicy, CopyPolicy, \
DownAndDeletePolicy, DownAndKeepDaysPolicy, DownPolicy, UpAndDeletePolicy, \
UpAndKeepDaysPolicy, UpPolicy


class SyncPolicyManager(object):
Expand Down Expand Up @@ -87,10 +88,19 @@ def get_policy_class(self, sync_type, delete, keep_days):
return DownAndKeepDaysPolicy
else:
return DownPolicy
assert False, 'invalid sync type: %s, keep_days: %s, delete: %s' % (
sync_type,
keep_days,
delete,
elif sync_type == 'b2-to-b2':
if delete:
return CopyAndDeletePolicy
elif keep_days:
return CopyAndKeepDaysPolicy
else:
return CopyPolicy
raise NotImplementedError(
'invalid sync type: %s, keep_days: %s, delete: %s' % (
sync_type,
keep_days,
delete,
)
)


Expand Down
54 changes: 39 additions & 15 deletions b2sdk/sync/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
logger = logging.getLogger(__name__)


class SyncReport(object):
class SyncReport:
"""
Handle reporting progress for syncing.

Expand All @@ -45,8 +45,8 @@ def __init__(self, stdout, no_progress):
self.stdout = stdout
self.no_progress = no_progress
self.start_time = time.time()
self.local_file_count = 0
self.local_done = False
self.total_count = 0
ppolewicz marked this conversation as resolved.
Show resolved Hide resolved
self.total_done = False
self.compare_done = False
self.compare_count = 0
self.total_transfer_files = 0 # set in end_compare()
Expand Down Expand Up @@ -109,9 +109,9 @@ def _update_progress(self):
self._last_update_time = now
time_delta = time.time() - self.start_time
rate = 0 if time_delta == 0 else int(self.transfer_bytes / time_delta)
if not self.local_done:
if not self.total_done:
message = ' count: %d files compare: %d files updated: %d files %s %s' % (
self.local_file_count,
self.total_count,
self.compare_count,
self.transfer_files,
format_and_scale_number(self.transfer_bytes, 'B'),
Expand All @@ -120,15 +120,15 @@ def _update_progress(self):
elif not self.compare_done:
message = ' compare: %d/%d files updated: %d files %s %s' % (
self.compare_count,
self.local_file_count,
self.total_count,
self.transfer_files,
format_and_scale_number(self.transfer_bytes, 'B'),
format_and_scale_number(rate, 'B/s')
) # yapf: disable
else:
message = ' compare: %d/%d files updated: %d/%d files %s %s' % (
self.compare_count,
self.local_file_count,
self.total_count,
self.transfer_files,
self.total_transfer_files,
format_and_scale_fraction(self.transfer_bytes, self.total_transfer_bytes, 'B'),
Expand Down Expand Up @@ -169,23 +169,23 @@ def _print_line(self, line, newline):
self.current_line = line
self.stdout.flush()

def update_local(self, delta):
def update_total(self, delta):
"""
Report that more local files have been found.
Report that more files have been found for comparison.

:param delta: number of files found since the last check
:type delta: int
"""
with self.lock:
self.local_file_count += delta
self.total_count += delta
self._update_progress()

def end_local(self):
def end_total(self):
"""
Local file count is done. Can proceed to step 2.
Total files count is done. Can proceed to step 2.
"""
with self.lock:
self.local_done = True
self.total_done = True
self._update_progress()

def update_compare(self, delta):
Expand Down Expand Up @@ -251,6 +251,30 @@ def local_permission_error(self, path):
def symlink_skipped(self, path):
pass

@property
def local_file_count(self):
# TODO: Deprecated. Should be removed in v2
mlech-reef marked this conversation as resolved.
Show resolved Hide resolved
return self.total_count

@local_file_count.setter
def local_file_count(self, value):
# TODO: Deprecated. Should be removed in v2
self.total_count = value

@property
def local_done(self):
# TODO: Deprecated. Should be removed in v2
return self.total_done

@local_done.setter
def local_done(self, value):
# TODO: Deprecated. Should be removed in v2
self.total_done = value

# TODO: Deprecated. Should be removed in v2
update_local = update_total
end_local = end_total


class SyncFileReporter(AbstractProgressListener):
"""
Expand Down Expand Up @@ -300,13 +324,13 @@ def sample_sync_report_run():
sync_report = SyncReport(sys.stdout, False)

for i in range(20):
sync_report.update_local(1)
sync_report.update_total(1)
time.sleep(0.2)
if i == 10:
sync_report.print_completion('transferred: a.txt')
if i % 2 == 0:
sync_report.update_compare(1)
sync_report.end_local()
sync_report.end_total()

for i in range(10):
sync_report.update_compare(1)
Expand Down
Loading