diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cc21f907a..10cc36130 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: - name: Run linters run: nox -vs lint - name: Validate new changelog entries - if: contains(github.event.pull_request.labels.*.name, '-changelog') == false + if: (contains(github.event.pull_request.labels.*.name, '-changelog') == false) && (github.event.pull_request.base.ref != '') run: if [ -z "$(git diff --diff-filter=A --name-only origin/${{ github.event.pull_request.base.ref }} changelog.d)" ]; then echo no changelog item added; exit 1; fi diff --git a/b2/console_tool.py b/b2/console_tool.py index 05bc97963..f802ca29a 100644 --- a/b2/console_tool.py +++ b/b2/console_tool.py @@ -13,6 +13,7 @@ import argparse import base64 +import contextlib import csv import dataclasses import datetime @@ -84,6 +85,7 @@ ScanPoliciesManager, Synchronizer, SyncReport, + TqdmProgressListener, UploadMode, current_time_millis, get_included_sources, @@ -683,7 +685,31 @@ def _set_threads_from_args(self, args): self.api.services.download_manager.set_thread_pool_size(threads) -class Command(Described): +class _TqdmCloser: + """ + On OSX using Tqdm with b2sdk causes semaphore leaks. This fix is located here and not in b2sdk, because after this + cleanup Tqdm might not work properly, therefore it's best to do it when exiting a python process. + """ + + def __init__(self, progress_listener): + self.progress_listener = progress_listener + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if sys.platform != "darwin" or os.environ.get('B2_TEST_DISABLE_TQDM_CLOSER'): + return + try: + from multiprocessing.synchronize import SemLock + tqdm_lock = self.progress_listener.tqdm.get_lock() + if tqdm_lock.mp_lock._semlock.name is not None: + SemLock._cleanup(tqdm_lock.mp_lock._semlock.name) + except Exception as ex: + logger.debug('Error encountered during Tqdm cleanup', exc_info=ex) + + +class Command(Described, metaclass=ABCMeta): # Set to True for commands that receive sensitive information in arguments FORBID_LOGGING_ARGUMENTS = False @@ -701,6 +727,14 @@ def __init__(self, console_tool): self.stdout = console_tool.stdout self.stderr = console_tool.stderr self.quiet = False + self.exit_stack = contextlib.ExitStack() + + def make_progress_listener(self, file_name: str, quiet: bool): + progress_listener = make_progress_listener(file_name, quiet) + self.exit_stack.enter_context(progress_listener) + if isinstance(progress_listener, TqdmProgressListener): + self.exit_stack.enter_context(_TqdmCloser(progress_listener)) + return progress_listener @classmethod def name_and_alias(cls): @@ -792,7 +826,12 @@ def create_parser( def run(self, args): self.quiet = args.quiet - return 0 + with self.exit_stack: + return self._run(args) + + @abstractmethod + def _run(self, args) -> int: + ... @classmethod def _setup_parser(cls, parser): @@ -947,8 +986,7 @@ class B2(Command): def name_and_alias(cls): return NAME, None - def run(self, args): - super().run(args) + def _run(self, args): # Commands could be named via name or alias, so we fetch # the command from args assigned during parser preparation. return args.command_class @@ -1003,8 +1041,7 @@ def _setup_parser(cls, parser): parser.add_argument('applicationKey', nargs='?') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): # Handle internal options for testing inside Backblaze. # These are not documented in the usage string. realm = self._get_realm(args) @@ -1091,8 +1128,7 @@ def _setup_parser(cls, parser): parser.add_argument('bucketName').completer = bucket_name_completer super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): bucket = self.api.get_bucket_by_name(args.bucketName) for file_version in bucket.list_unfinished_large_files(): bucket.cancel_large_file(file_version.file_id) @@ -1118,8 +1154,7 @@ def _setup_parser(cls, parser): parser.add_argument('fileId') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): self.api.cancel_large_file(args.fileId) self._print(args.fileId, 'canceled') return 0 @@ -1141,8 +1176,7 @@ class ClearAccount(Command): REQUIRES_AUTH = False - def run(self, args): - super().run(args) + def _run(self, args): self.api.account_info.clear() return 0 @@ -1205,8 +1239,7 @@ def _setup_parser(cls, parser): super()._setup_parser(parser) # add parameters from the mixins - def run(self, args): - super().run(args) + def _run(self, args): file_infos = None if args.info: file_infos = self._parse_file_infos(args.info) @@ -1326,8 +1359,7 @@ def _setup_parser(cls, parser): super()._setup_parser(parser) # add parameters from the mixins - def run(self, args): - super().run(args) + def _run(self, args): encryption_setting = self._get_default_sse_setting(args) bucket = self.api.create_bucket( args.bucketName, @@ -1382,8 +1414,7 @@ def _setup_parser(cls, parser): capabilities.add_argument('--allCapabilities', action='store_true') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): # Translate the bucket name into a bucketId if args.bucket is None: bucket_id_or_none = None @@ -1420,8 +1451,7 @@ def _setup_parser(cls, parser): parser.add_argument('bucketName').completer = bucket_name_completer super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): bucket = self.api.get_bucket_by_name(args.bucketName) self.api.delete_bucket(bucket) return 0 @@ -1452,8 +1482,7 @@ def _setup_parser(cls, parser): super()._setup_parser(parser) parser.add_argument('--bypassGovernance', action='store_true', default=False) - def run(self, args): - super().run(args) + def _run(self, args): file_name = self._get_file_name_from_args(args) file_info = self.api.delete_file_version(args.fileId, file_name, args.bypassGovernance) self._print_json(file_info) @@ -1475,15 +1504,19 @@ def _setup_parser(cls, parser): parser.add_argument('applicationKeyId') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): application_key = self.api.delete_key_by_id(application_key_id=args.applicationKeyId) self._print(application_key.id_) return 0 class DownloadCommand( - ProgressMixin, SourceSseMixin, WriteBufferSizeMixin, SkipHashVerificationMixin, Command + ProgressMixin, + SourceSseMixin, + WriteBufferSizeMixin, + SkipHashVerificationMixin, + Command, + metaclass=ABCMeta ): """ helper methods for returning results from download commands """ @@ -1589,9 +1622,8 @@ class DownloadFileBase( - **readFiles** """ - def run(self, args): - super().run(args) - progress_listener = make_progress_listener( + def _run(self, args): + progress_listener = self.make_progress_listener( args.localFileName, args.noProgress or args.quiet ) encryption_setting = self._get_source_sse_setting(args) @@ -1660,10 +1692,11 @@ class Cat(B2URIFileArgMixin, DownloadCommand): - **readFiles** """ - def run(self, args): - super().run(args) + def _run(self, args): target_filename = '-' - progress_listener = make_progress_listener(target_filename, args.noProgress or args.quiet) + progress_listener = self.make_progress_listener( + target_filename, args.noProgress or args.quiet + ) encryption_setting = self._get_source_sse_setting(args) file_request = self.api.download_file_by_uri( args.B2_URI, progress_listener=progress_listener, encryption=encryption_setting @@ -1680,8 +1713,7 @@ class GetAccountInfo(Command): the current application keys has. """ - def run(self, args): - super().run(args) + def _run(self, args): account_info = self.api.account_info data = dict( accountId=account_info.get_account_id(), @@ -1735,8 +1767,7 @@ def _setup_parser(cls, parser): parser.add_argument('bucketName').completer = bucket_name_completer super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): # This always wants up-to-date info, so it does not use # the bucket cache. for b in self.api.list_buckets(args.bucketName): @@ -1774,8 +1805,7 @@ class FileInfoBase(Command): - **readFiles** """ - def run(self, args): - super().run(args) + def _run(self, args): b2_uri = self.get_b2_uri_from_arg(args) file_version = self.api.get_file_info_by_uri(b2_uri) self._print_json(file_version) @@ -1818,8 +1848,7 @@ def _setup_parser(cls, parser): parser.add_argument('bucketName').completer = bucket_name_completer super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): bucket = self.api.get_bucket_by_name(args.bucketName) auth_token = bucket.get_download_authorization( file_name_prefix=args.prefix, valid_duration_in_seconds=args.duration @@ -1854,8 +1883,7 @@ def _setup_parser(cls, parser): parser.add_argument('fileName').completer = file_name_completer super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): bucket = self.api.get_bucket_by_name(args.bucketName) auth_token = bucket.get_download_authorization( file_name_prefix=args.fileName, valid_duration_in_seconds=args.duration @@ -1882,8 +1910,7 @@ def _setup_parser(cls, parser): parser.add_argument('fileName').completer = file_name_completer super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): bucket = self.api.get_bucket_by_name(args.bucketName) file_info = bucket.hide_file(args.fileName) self._print_json(file_info) @@ -1915,8 +1942,7 @@ def _setup_parser(cls, parser): parser.add_argument('--json', action='store_true') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): buckets = self.api.list_buckets() if args.json: self._print_json(list(buckets)) @@ -1962,8 +1988,7 @@ def __init__(self, console_tool): super().__init__(console_tool) self.bucket_id_to_bucket_name = None - def run(self, args): - super().run(args) + def _run(self, args): for key in self.api.list_keys(): self.print_key(key, args.long) @@ -2027,8 +2052,7 @@ def _setup_parser(cls, parser): parser.add_argument('largeFileId') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): for part in self.api.list_parts(args.largeFileId): self._print('%5d %9d %s' % (part.part_number, part.content_length, part.content_sha1)) return 0 @@ -2050,8 +2074,7 @@ def _setup_parser(cls, parser): parser.add_argument('bucketName').completer = bucket_name_completer super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): bucket = self.api.get_bucket_by_name(args.bucketName) for unfinished in bucket.list_unfinished_large_files(): file_info_text = ' '.join( @@ -2184,8 +2207,7 @@ def _setup_parser(cls, parser): parser.add_argument('--replication', action='store_true') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): if args.json: i = -1 for i, (file_version, _) in enumerate(self._get_ls_generator(args)): @@ -2412,8 +2434,7 @@ def _setup_parser(cls, parser): parser.add_argument('--failFast', action='store_true') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): if args.dryRun: self._print_files(args) return 0 @@ -2454,8 +2475,7 @@ class GetUrlBase(Command): it is public. """ - def run(self, args): - super().run(args) + def _run(self, args): b2_uri = self.get_b2_uri_from_arg(args) self._print(self.api.get_download_url_by_uri(b2_uri)) return 0 @@ -2689,8 +2709,7 @@ def _setup_parser(cls, parser): del_keep_group.add_argument('--delete', action='store_true') del_keep_group.add_argument('--keepDays', type=float, metavar='DAYS') - def run(self, args): - super().run(args) + def _run(self, args): policies_manager = self.get_policies_manager_from_args(args) if args.threads is not None: @@ -2903,8 +2922,7 @@ def _setup_parser(cls, parser): super()._setup_parser(parser) # add parameters from the mixins and the parent class - def run(self, args): - super().run(args) + def _run(self, args): if args.defaultRetentionMode is not None: if args.defaultRetentionMode == 'none': default_retention = NO_RETENTION_BUCKET_SETTING @@ -3003,8 +3021,7 @@ def _setup_parser(cls, parser): super()._setup_parser(parser) # add parameters from the mixins - def run(self, args): - super().run(args) + def _run(self, args): self._set_threads_from_args(args) upload_kwargs = self.get_execute_kwargs(args) file_info = self.execute_operation(**upload_kwargs) @@ -3053,7 +3070,7 @@ def get_execute_kwargs(self, args) -> dict: "min_part_size": args.minPartSize, "progress_listener": - make_progress_listener(args.localFilePath, args.noProgress or args.quiet), + self.make_progress_listener(args.localFilePath, args.noProgress or args.quiet), "sha1_sum": args.sha1, "threads": @@ -3261,8 +3278,7 @@ def _setup_parser(cls, parser): super()._setup_parser(parser) parser.add_argument('legalHold', choices=(LegalHold.ON.value, LegalHold.OFF.value)) - def run(self, args): - super().run(args) + def _run(self, args): file_name = self._get_file_name_from_args(args) legal_hold = LegalHold(args.legalHold) self.api.update_file_legal_hold(args.fileId, file_name, legal_hold) @@ -3312,8 +3328,7 @@ def _setup_parser(cls, parser): ) parser.add_argument('--bypassGovernance', action='store_true', default=False) - def run(self, args): - super().run(args) + def _run(self, args): file_name = self._get_file_name_from_args(args) if args.retentionMode == 'none': @@ -3371,8 +3386,7 @@ def _setup_parser(cls, parser): help='if given, also replicates files uploaded prior to creation of the replication rule' ) - def run(self, args): - super().run(args) + def _run(self, args): if args.destination_profile is None: destination_api = self.api else: @@ -3398,8 +3412,7 @@ def _setup_parser(cls, parser): parser.add_argument('source', metavar='SOURCE_BUCKET_NAME') parser.add_argument('rule_name', metavar='REPLICATION_RULE_NAME') - def run(self, args): - super().run(args) + def _run(self, args): bucket = self.api.get_bucket_by_name(args.source).get_fresh_state() found, altered = self.alter_rule_by_name(bucket, args.rule_name) if not found: @@ -3545,8 +3558,7 @@ def _setup_parser(cls, parser): metavar='COLUMN ONE,COLUMN TWO' ) - def run(self, args): - super().run(args) + def _run(self, args): destination_api = args.destination_profile and _get_b2api_for_profile( args.destination_profile ) @@ -3687,8 +3699,7 @@ def _setup_parser(cls, parser): parser.add_argument('--short', action='store_true') super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): if args.short: self._print(VERSION) else: @@ -3751,8 +3762,7 @@ def _setup_parser(cls, parser): ) super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): if self.LICENSE_OUTPUT_FILE.exists() and not args.dump: self._print(self.LICENSE_OUTPUT_FILE.read_text(encoding='utf8')) return 0 @@ -3906,8 +3916,7 @@ def _setup_parser(cls, parser): parser.add_argument('--shell', choices=SUPPORTED_SHELLS, default=None) super()._setup_parser(parser) - def run(self, args): - super().run(args) + def _run(self, args): shell = args.shell or detect_shell() if shell not in SUPPORTED_SHELLS: self._print_stderr( diff --git a/changelog.d/+fix_leaking_semaphores.fix.md b/changelog.d/+fix_leaking_semaphores.fix.md new file mode 100644 index 000000000..57b1f39f5 --- /dev/null +++ b/changelog.d/+fix_leaking_semaphores.fix.md @@ -0,0 +1 @@ +fix an error that caused multiprocessing semaphores to leak on OSX \ No newline at end of file diff --git a/test/integration/helpers.py b/test/integration/helpers.py index 264f15045..a8777072e 100755 --- a/test/integration/helpers.py +++ b/test/integration/helpers.py @@ -30,7 +30,6 @@ from os import environ, linesep, path from pathlib import Path from tempfile import gettempdir, mkdtemp, mktemp -from unittest.mock import MagicMock import backoff from b2sdk.v2 import ( @@ -286,10 +285,8 @@ def print_text_indented(text): """ Prints text that may include weird characters, indented four spaces. """ - mock_console_tool = MagicMock() - cmd_instance = Command(console_tool=mock_console_tool) for line in text.split(linesep): - cmd_instance._print_standard_descriptor(sys.stdout, ' ', repr(line)[1:-1]) + Command._print_helper(sys.stdout, sys.stdout.encoding, ' ', repr(line)[1:-1]) def print_output(status, stdout, stderr): @@ -426,6 +423,7 @@ def should_succeed( args: list[str] | None, expected_pattern: str | None = None, additional_env: dict | None = None, + expected_stderr_pattern: str | re.Pattern = None, ) -> str: """ Runs the command-line with the given arguments. Raises an exception @@ -435,7 +433,10 @@ def should_succeed( status, stdout, stderr = self.execute(args, additional_env) assert status == 0, f'FAILED with status {status}, stderr={stderr}' - if stderr != '': + if expected_stderr_pattern: + assert expected_stderr_pattern.search(stderr), \ + f'stderr did not match pattern="{expected_stderr_pattern}", stderr="{stderr}"' + elif stderr != '': for line in (s.strip() for s in stderr.split(os.linesep)): assert any(p.match(line) for p in self.EXPECTED_STDERR_PATTERNS), \ f'Unexpected stderr line: {repr(line)}' diff --git a/test/integration/test_tqdm_closer.py b/test/integration/test_tqdm_closer.py new file mode 100644 index 000000000..a97a43102 --- /dev/null +++ b/test/integration/test_tqdm_closer.py @@ -0,0 +1,40 @@ +###################################################################### +# +# File: test/integration/test_tqdm_closer.py +# +# Copyright 2023 Backblaze Inc. All Rights Reserved. +# +# License https://www.backblaze.com/using_b2_code.html +# +###################################################################### +import re +import sys + +import pytest + + +@pytest.mark.skipif( + (sys.platform != 'darwin') or ((sys.version_info.major, sys.version_info.minor) < (3, 11)), + reason='Tqdm closing error only occurs on OSX and python 3.11 or newer', +) +def test_tqdm_closer(b2_tool, bucket, file_name): + # test that stderr doesn't contain any warning, in particular warnings about multiprocessing resource tracker + # leaking semaphores + b2_tool.should_succeed([ + 'cat', + f'b2://{bucket.name}/{file_name}', + ]) + + # test that disabling _TqdmCloser does produce a resource tracker warning. Should the following check ever fail, + # that would mean that either Tqdm or python fixed the issue and _TqdmCloser can be disabled for fixed versions + b2_tool.should_succeed( + [ + 'cat', + f'b2://{bucket.name}/{file_name}', + ], + additional_env={'B2_TEST_DISABLE_TQDM_CLOSER': '1'}, + expected_stderr_pattern=re.compile( + r'UserWarning: resource_tracker: There appear to be \d+ leaked semaphore' + r' objects to clean up at shutdown' + ), + ) diff --git a/test/unit/conftest.py b/test/unit/conftest.py index c21344b75..edcba882a 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -13,6 +13,8 @@ import pytest from b2sdk.raw_api import REALM_URLS +from b2.console_tool import _TqdmCloser + @pytest.fixture(autouse=True, scope='session') def mock_realm_urls(): @@ -25,3 +27,9 @@ def bg_executor(): """Executor for running background tasks in tests""" with RunOrDieExecutor() as executor: yield executor + + +@pytest.fixture(autouse=True) +def disable_tqdm_closer_cleanup(): + with mock.patch.object(_TqdmCloser, '__exit__'): + yield