diff --git a/b2/console_tool.py b/b2/console_tool.py index 7ed2ecf26..778ef045e 100644 --- a/b2/console_tool.py +++ b/b2/console_tool.py @@ -38,7 +38,7 @@ from concurrent.futures import Executor, Future, ThreadPoolExecutor from contextlib import suppress from enum import Enum -from typing import Any, BinaryIO, Dict, List, Optional, Tuple +from typing import Any, BinaryIO, List import argcomplete import b2sdk @@ -760,12 +760,12 @@ def _print_json(self, data) -> None: json.dumps(data, indent=4, sort_keys=True, cls=B2CliJsonEncoder), enforce_output=True ) - def _print(self, *args, enforce_output: bool = False, end: Optional[str] = None) -> None: + def _print(self, *args, enforce_output: bool = False, end: str | None = None) -> None: return self._print_standard_descriptor( self.stdout, "stdout", *args, enforce_output=enforce_output, end=end ) - def _print_stderr(self, *args, end: Optional[str] = None) -> None: + def _print_stderr(self, *args, end: str | None = None) -> None: return self._print_standard_descriptor( self.stderr, "stderr", *args, enforce_output=True, end=end ) @@ -776,7 +776,7 @@ def _print_standard_descriptor( descriptor_name: str, *args, enforce_output: bool = False, - end: Optional[str] = None, + end: str | None = None, ) -> None: """ Prints to fd, unless quiet is set. @@ -797,7 +797,7 @@ def _print_helper( descriptor_encoding: str, descriptor_name: str, *args, - end: Optional[str] = None + end: str | None = None ): try: descriptor.write(' '.join(args)) @@ -1181,7 +1181,7 @@ def run(self, args): self._print_json(file_version) return 0 - def _is_ssec(self, encryption: Optional[EncryptionSetting]): + def _is_ssec(self, encryption: EncryptionSetting | None): if encryption is not None and encryption.mode == EncryptionMode.SSE_C: return True return False @@ -1189,12 +1189,12 @@ def _is_ssec(self, encryption: Optional[EncryptionSetting]): def _determine_source_metadata( self, source_file_id: str, - destination_encryption: Optional[EncryptionSetting], - source_encryption: Optional[EncryptionSetting], - target_file_info: Optional[dict], - target_content_type: Optional[str], + destination_encryption: EncryptionSetting | None, + source_encryption: EncryptionSetting | None, + target_file_info: dict | None, + target_content_type: str | None, fetch_if_necessary: bool, - ) -> Tuple[Optional[dict], Optional[str]]: + ) -> tuple[dict | None, str | None]: """Determine if source file metadata is necessary to perform the copy - due to sse_c_key_id""" if not self._is_ssec(source_encryption) and not self._is_ssec( destination_encryption @@ -2065,7 +2065,7 @@ def _print_file_version( self, args, file_version: FileVersion, - folder_name: Optional[str], + folder_name: str | None, ) -> None: self._print(folder_name or file_version.file_name) @@ -2172,7 +2172,7 @@ def _print_file_version( self, args, file_version: FileVersion, - folder_name: Optional[str], + folder_name: str | None, ) -> None: if not args.long: super()._print_file_version(args, file_version, folder_name) @@ -2294,7 +2294,7 @@ class SubmitThread(threading.Thread): def __init__( self, - runner: 'Rm', + runner: Rm, args: argparse.Namespace, messages_queue: queue.Queue, reporter: ProgressReport, @@ -3035,7 +3035,7 @@ def get_execute_kwargs(self, args) -> dict: } @abstractmethod - def execute_operation(self, **kwargs) -> 'b2sdk.file_version.FileVersion': + def execute_operation(self, **kwargs) -> b2sdk.file_version.FileVersion: raise NotImplementedError def upload_file_kwargs_to_unbound_upload(self, **kwargs): @@ -3047,7 +3047,7 @@ def upload_file_kwargs_to_unbound_upload(self, **kwargs): kwargs["read_size"] = kwargs["min_part_size"] or DEFAULT_MIN_PART_SIZE return kwargs - def get_input_stream(self, filename: str) -> 'str | int | io.BinaryIO': + def get_input_stream(self, filename: str) -> str | int | io.BinaryIO: """Get input stream IF filename points to a FIFO or stdin.""" if filename == "-": if os.path.exists('-'): @@ -3062,7 +3062,7 @@ def get_input_stream(self, filename: str) -> 'str | int | io.BinaryIO': raise self.NotAnInputStream() def file_identifier_to_read_stream( - self, file_id: 'str | int | BinaryIO', buffering + self, file_id: str | int | BinaryIO, buffering ) -> BinaryIO: if isinstance(file_id, (str, int)): return open( @@ -3387,7 +3387,7 @@ def run(self, args): return 0 @classmethod - def alter_rule_by_name(cls, bucket: Bucket, name: str) -> Tuple[bool, bool]: + def alter_rule_by_name(cls, bucket: Bucket, name: str) -> tuple[bool, bool]: """ returns False if rule could not be found """ if not bucket.replication or not bucket.replication.rules: return False, False @@ -3424,7 +3424,7 @@ def alter_rule_by_name(cls, bucket: Bucket, name: str) -> Tuple[bool, bool]: @classmethod @abstractmethod - def alter_one_rule(cls, rule: ReplicationRule) -> Optional[ReplicationRule]: + def alter_one_rule(cls, rule: ReplicationRule) -> ReplicationRule | None: """ return None to delete a rule """ pass @@ -3441,7 +3441,7 @@ class ReplicationDelete(ReplicationRuleChanger): """ @classmethod - def alter_one_rule(cls, rule: ReplicationRule) -> Optional[ReplicationRule]: + def alter_one_rule(cls, rule: ReplicationRule) -> ReplicationRule | None: """ return None to delete rule """ return None @@ -3458,7 +3458,7 @@ class ReplicationPause(ReplicationRuleChanger): """ @classmethod - def alter_one_rule(cls, rule: ReplicationRule) -> Optional[ReplicationRule]: + def alter_one_rule(cls, rule: ReplicationRule) -> ReplicationRule | None: """ return None to delete rule """ rule.is_enabled = False return rule @@ -3476,7 +3476,7 @@ class ReplicationUnpause(ReplicationRuleChanger): """ @classmethod - def alter_one_rule(cls, rule: ReplicationRule) -> Optional[ReplicationRule]: + def alter_one_rule(cls, rule: ReplicationRule) -> ReplicationRule | None: """ return None to delete rule """ rule.is_enabled = True return rule @@ -3576,9 +3576,9 @@ def run(self, args): @classmethod def get_results_for_rule( - cls, bucket: Bucket, rule: ReplicationRule, destination_api: Optional[B2Api], + cls, bucket: Bucket, rule: ReplicationRule, destination_api: B2Api | None, scan_destination: bool, quiet: bool - ) -> List[dict]: + ) -> list[dict]: monitor = ReplicationMonitor( bucket=bucket, rule=rule, @@ -3595,7 +3595,7 @@ def get_results_for_rule( ] @classmethod - def filter_results_columns(cls, results: List[dict], columns: List[str]) -> List[dict]: + def filter_results_columns(cls, results: list[dict], columns: list[str]) -> list[dict]: return [{key: result[key] for key in columns} for result in results] @classmethod @@ -3611,10 +3611,10 @@ def to_human_readable(cls, value: Any) -> str: return str(value) - def output_json(self, results: Dict[str, List[dict]]) -> None: + def output_json(self, results: dict[str, list[dict]]) -> None: self._print_json(results) - def output_console(self, results: Dict[str, List[dict]]) -> None: + def output_console(self, results: dict[str, list[dict]]) -> None: for rule_name, rule_results in results.items(): self._print(f'Replication "{rule_name}":') rule_results = [ @@ -3626,7 +3626,7 @@ def output_console(self, results: Dict[str, List[dict]]) -> None: ] self._print(tabulate(rule_results, headers='keys', tablefmt='grid')) - def output_csv(self, results: Dict[str, List[dict]]) -> None: + def output_csv(self, results: dict[str, list[dict]]) -> None: rows = [] @@ -3803,7 +3803,7 @@ def _put_license_text_for_packages(self, stream: io.StringIO): stream.write(str(summary_table)) @classmethod - def _get_licenses_dicts(cls) -> List[Dict]: + def _get_licenses_dicts(cls) -> list[dict]: assert piplicenses, 'In order to run this command, you need to install the `license` extra: pip install b2[license]' pipdeptree_run = subprocess.run( ["pipdeptree", "--json", "-p", "b2"], @@ -3910,7 +3910,7 @@ class ConsoleTool: Uses a ``b2sdk.SqlitedAccountInfo`` object to keep account data between runs. """ - def __init__(self, b2_api: Optional[B2Api], stdout, stderr): + def __init__(self, b2_api: B2Api | None, stdout, stderr): self.api = b2_api self.stdout = stdout self.stderr = stderr