Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Autocomplete improvements #229

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions b2/_cli/arg_parser_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
######################################################################
#
# File: b2/_cli/arg_parser_types.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################

import argparse
import functools
import re

import arrow
from b2sdk.v2 import RetentionPeriod

_arrow_version = tuple(int(p) for p in arrow.__version__.split("."))


def parse_comma_separated_list(s):
"""
Parse comma-separated list.
"""
return [word.strip() for word in s.split(",")]


def parse_millis_from_float_timestamp(s):
"""
Parse timestamp, e.g. 1367900664 or 1367900664.152
"""
parsed = arrow.get(float(s))
if _arrow_version < (1, 0, 0):
return int(parsed.format("XSSS"))
else:
return int(parsed.format("x")[:13])


def parse_range(s):
"""
Parse optional integer range
"""
bytes_range = None
if s is not None:
bytes_range = s.split(',')
if len(bytes_range) != 2:
raise argparse.ArgumentTypeError('the range must have 2 values: start,end')
bytes_range = (
int(bytes_range[0]),
int(bytes_range[1]),
)

return bytes_range


def parse_default_retention_period(s):
unit_part = '(' + ')|('.join(RetentionPeriod.KNOWN_UNITS) + ')'
m = re.match(r'^(?P<duration>\d+) (?P<unit>%s)$' % (unit_part), s)
if not m:
raise argparse.ArgumentTypeError(
'default retention period must be in the form of "X days|years "'
)
return RetentionPeriod(**{m.group('unit'): int(m.group('duration'))})


def wrap_with_argument_type_error(func, translator=str, exc_type=ValueError):
"""
Wrap function that may raise an exception into a function that raises ArgumentTypeError error.
"""

@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except exc_type as e:
raise argparse.ArgumentTypeError(translator(e))

return wrapper
48 changes: 23 additions & 25 deletions b2/_cli/argcompleters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,32 @@
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from functools import wraps
from itertools import islice

from b2sdk.v2 import LIST_FILE_NAMES_MAX_LIMIT
from b2sdk.v2.api import B2Api

from b2._cli.b2api import _get_b2api_for_profile
from b2._utils.python_compat import removeprefix
from b2._utils.uri import parse_b2_uri


def _with_api(func):
"""Decorator to inject B2Api instance into argcompleter function."""

@wraps(func)
def wrapper(prefix, parsed_args, **kwargs):
api = _get_b2api_for_profile(parsed_args.profile)
return func(prefix=prefix, parsed_args=parsed_args, api=api, **kwargs)
# We import all the necessary modules lazily in completers in order
# to avoid upfront cost of the imports when argcompleter is used for
# autocompletions.

return wrapper
from itertools import islice


@_with_api
def bucket_name_completer(api: B2Api, **kwargs):
return [bucket.name for bucket in api.list_buckets(use_cache=True)]
def bucket_name_completer(prefix, parsed_args, **kwargs):
from b2._cli.b2api import _get_b2api_for_profile
api = _get_b2api_for_profile(getattr(parsed_args, 'profile', None))
res = [bucket.name for bucket in api.list_buckets(use_cache=True)]
return res


@_with_api
def file_name_completer(api: B2Api, parsed_args, **kwargs):
def file_name_completer(prefix, parsed_args, **kwargs):
"""
Completes file names in a bucket.

To limit delay & cost only lists files returned from by single call to b2_list_file_names
"""
from b2sdk.v2 import LIST_FILE_NAMES_MAX_LIMIT

from b2._cli.b2api import _get_b2api_for_profile

api = _get_b2api_for_profile(parsed_args.profile)
bucket = api.get_bucket_by_name(parsed_args.bucketName)
file_versions = bucket.ls(
getattr(parsed_args, 'folderName', None) or '',
Expand All @@ -54,11 +46,17 @@ def file_name_completer(api: B2Api, parsed_args, **kwargs):
]


@_with_api
def b2uri_file_completer(api: B2Api, prefix: str, **kwargs):
def b2uri_file_completer(prefix: str, parsed_args, **kwargs):
"""
Complete B2 URI pointing to a file-like object in a bucket.
"""
from b2sdk.v2 import LIST_FILE_NAMES_MAX_LIMIT

from b2._cli.b2api import _get_b2api_for_profile
from b2._utils.python_compat import removeprefix
from b2._utils.uri import parse_b2_uri

api = _get_b2api_for_profile(getattr(parsed_args, 'profile', None))
if prefix.startswith('b2://'):
prefix_without_scheme = removeprefix(prefix, 'b2://')
if '/' not in prefix_without_scheme:
Expand Down
146 changes: 146 additions & 0 deletions b2/_cli/autocomplete_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
######################################################################
#
# File: b2/_cli/autocomplete_cache.py
#
# Copyright 2020 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import abc
import argparse
import os
import pathlib
import pickle
from typing import Callable

import argcomplete
import platformdirs

from b2.version import VERSION


def identity(x):
return x


class StateTracker(abc.ABC):
@abc.abstractmethod
def current_state_identifier(self) -> str:
raise NotImplementedError()


class PickleStore(abc.ABC):
@abc.abstractmethod
def get_pickle(self, identifier: str) -> bytes | None:
raise NotImplementedError()

@abc.abstractmethod
def set_pickle(self, identifier: str, data: bytes) -> None:
raise NotImplementedError()


class VersionTracker(StateTracker):
def current_state_identifier(self) -> str:
return VERSION


class HomeCachePickleStore(PickleStore):
_dir: pathlib.Path

def __init__(self, dir: pathlib.Path | None = None) -> None:
self._dir = dir

def _cache_dir(self) -> pathlib.Path:
if self._dir:
return self._dir
self._dir = pathlib.Path(
platformdirs.user_cache_dir(appname='b2', appauthor='backblaze')
) / 'autocomplete'
return self._dir

def _fname(self, identifier: str) -> str:
return f"b2-autocomplete-cache-{identifier}.pickle"

def get_pickle(self, identifier: str) -> bytes | None:
path = self._cache_dir() / self._fname(identifier)
if path.exists():
with open(path, 'rb') as f:
return f.read()

def set_pickle(self, identifier: str, data: bytes) -> None:
"""Sets the pickle for identifier if it doesn't exist.
When a new pickle is added, old ones are removed."""

dir = self._cache_dir()
os.makedirs(dir, exist_ok=True)
path = dir / self._fname(identifier)
for file in dir.glob('b2-autocomplete-cache-*.pickle'):
file.unlink()
with open(path, 'wb') as f:
f.write(data)


class AutocompleteCache:
_tracker: StateTracker
_store: PickleStore
_unpickle: Callable[[bytes], argparse.ArgumentParser]

def __init__(
self,
tracker: StateTracker,
store: PickleStore,
unpickle: Callable[[bytes], argparse.ArgumentParser] | None = None
):
self._tracker = tracker
self._store = store
self._unpickle = unpickle or pickle.loads

def _is_autocomplete_run(self) -> bool:
return '_ARGCOMPLETE' in os.environ

def autocomplete_from_cache(self, uncached_args: dict | None = None) -> None:
if not self._is_autocomplete_run():
return

try:
identifier = self._tracker.current_state_identifier()
pickle_data = self._store.get_pickle(identifier)
if pickle_data:
parser = self._unpickle(pickle_data)
argcomplete.autocomplete(parser, **(uncached_args or {}))
except Exception:
# Autocomplete from cache failed but maybe we can autocomplete from scratch
return

def _clean_parser(self, parser: argparse.ArgumentParser) -> None:
parser.register('type', None, identity)
for action in parser._actions:
if action.type not in [str, int]:
action.type = None
for action in parser._action_groups:
for key in parser._defaults:
action.set_defaults(**{key: None})
parser.description = None
if parser._subparsers:
for group_action in parser._subparsers._group_actions:
for parser in group_action.choices.values():
self._clean_parser(parser)

def cache_and_autocomplete(
self, parser: argparse.ArgumentParser, uncached_args: dict | None = None
) -> None:
if not self._is_autocomplete_run():
return

try:
identifier = self._tracker.current_state_identifier()
self._clean_parser(parser)
self._store.set_pickle(identifier, pickle.dumps(parser))
finally:
argcomplete.autocomplete(parser, **(uncached_args or {}))


AUTOCOMPLETE = AutocompleteCache(tracker=VersionTracker(), store=HomeCachePickleStore())
2 changes: 1 addition & 1 deletion b2/_cli/b2args.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
"""
import argparse

from b2._cli.arg_parser_types import wrap_with_argument_type_error
from b2._cli.argcompleters import b2uri_file_completer
from b2._utils.uri import B2URI, B2URIBase, parse_b2_uri
from b2.arg_parser import wrap_with_argument_type_error


def b2_file_uri(value: str) -> B2URIBase:
Expand Down
65 changes: 0 additions & 65 deletions b2/arg_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,12 @@
import argparse
import functools
import locale
import re
import sys
import textwrap
import unittest.mock

import arrow
from b2sdk.v2 import RetentionPeriod
from rst2ansi import rst2ansi

_arrow_version = tuple(int(p) for p in arrow.__version__.split("."))


class B2RawTextHelpFormatter(argparse.RawTextHelpFormatter):
"""
Expand Down Expand Up @@ -152,63 +147,3 @@ def print_help(self, *args, show_all: bool = False, **kwargs):
self, 'formatter_class', functools.partial(B2RawTextHelpFormatter, show_all=show_all)
):
super().print_help(*args, **kwargs)


def parse_comma_separated_list(s):
"""
Parse comma-separated list.
"""
return [word.strip() for word in s.split(",")]


def parse_millis_from_float_timestamp(s):
"""
Parse timestamp, e.g. 1367900664 or 1367900664.152
"""
parsed = arrow.get(float(s))
if _arrow_version < (1, 0, 0):
return int(parsed.format("XSSS"))
else:
return int(parsed.format("x")[:13])


def parse_range(s):
"""
Parse optional integer range
"""
bytes_range = None
if s is not None:
bytes_range = s.split(',')
if len(bytes_range) != 2:
raise argparse.ArgumentTypeError('the range must have 2 values: start,end')
bytes_range = (
int(bytes_range[0]),
int(bytes_range[1]),
)

return bytes_range


def parse_default_retention_period(s):
unit_part = '(' + ')|('.join(RetentionPeriod.KNOWN_UNITS) + ')'
m = re.match(r'^(?P<duration>\d+) (?P<unit>%s)$' % (unit_part), s)
if not m:
raise argparse.ArgumentTypeError(
'default retention period must be in the form of "X days|years "'
)
return RetentionPeriod(**{m.group('unit'): int(m.group('duration'))})


def wrap_with_argument_type_error(func, translator=str, exc_type=ValueError):
"""
Wrap function that may raise an exception into a function that raises ArgumentTypeError error.
"""

@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except exc_type as e:
raise argparse.ArgumentTypeError(translator(e))

return wrapper
Loading
Loading