Skip to content

Commit

Permalink
add autocomplete support for zsh and fish shells
Browse files Browse the repository at this point in the history
  • Loading branch information
mjurbanski-reef committed Mar 27, 2024
1 parent e38f495 commit e2317ff
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 28 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install test binary dependencies
if: startsWith(matrix.os, 'ubuntu')
run: |
sudo apt-get -y update
sudo apt-get -y install zsh fish
- name: Install test binary dependencies (macOS)
if: startsWith(matrix.os, 'macos')
run: |
brew install fish expect
- name: Install dependencies
run: python -m pip install --upgrade nox pdm
- name: Run unit tests
Expand Down
176 changes: 151 additions & 25 deletions b2/_internal/_cli/autocomplete_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import abc
import logging
import os
import platform
import re
import shutil
import subprocess
import textwrap
from datetime import datetime
from pathlib import Path
from typing import List
from shlex import quote

import argcomplete
from class_registry import ClassRegistry, RegistryKeyError

from b2._internal._utils.python_compat import shlex_join

logger = logging.getLogger(__name__)

SHELL_REGISTRY = ClassRegistry()
Expand All @@ -33,7 +41,7 @@ def autocomplete_install(prog: str, shell: str = 'bash') -> None:
logger.info("Autocomplete for %s has been enabled.", prog)


class ShellAutocompleteInstaller:
class ShellAutocompleteInstaller(abc.ABC):
shell_exec: str

def __init__(self, prog: str):
Expand All @@ -43,6 +51,10 @@ def install(self) -> None:
"""Install autocomplete for the given program."""
script_path = self.create_script()
if not self.is_enabled():
logger.info(
"%s completion doesn't seem to be autoloaded from %s.", self.shell_exec,
script_path.parent
)
try:
self.force_enable(script_path)
except NotImplementedError as e:
Expand All @@ -60,10 +72,11 @@ def create_script(self) -> Path:

script_path = self.get_script_path()
logger.info("Creating autocompletion script under %s", script_path)
script_path.parent.mkdir(exist_ok=True)
script_path.parent.mkdir(exist_ok=True, parents=True, mode=0o755)
script_path.write_text(shellcode)
return script_path

@abc.abstractmethod
def force_enable(self, completion_script: Path) -> None:
"""
Enable autocomplete for the given program.
Expand All @@ -76,6 +89,7 @@ def get_shellcode(self) -> str:
"""Get autocomplete shellcode for the given program."""
return argcomplete.shellcode([self.prog], shell=self.shell_exec)

@abc.abstractmethod
def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program."""
raise NotImplementedError
Expand All @@ -84,43 +98,151 @@ def program_in_path(self) -> bool:
"""Check if the given program is in PATH."""
return _silent_success_run([self.shell_exec, '-c', self.prog])

@abc.abstractmethod
def is_enabled(self) -> bool:
"""Check if autocompletion is enabled."""
return _silent_success_run([self.shell_exec, '-i', '-c', f'complete -p {self.prog}'])
raise NotImplementedError


@SHELL_REGISTRY.register('bash')
class BashAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec = 'bash'
class BashLikeAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec: str
rc_file_path: str

def get_rc_path(self) -> Path:
return Path(self.rc_file_path).expanduser()

def force_enable(self, completion_script: Path) -> None:
"""Enable autocomplete for the given program."""
logger.info(
"Bash completion doesn't seem to be autoloaded from %s. Most likely `bash-completion` is not installed.",
completion_script.parent
)
bashrc_path = Path("~/.bashrc").expanduser()
if bashrc_path.exists():
bck_path = bashrc_path.with_suffix(f".{datetime.now():%Y-%m-%dT%H-%M-%S}.bak")
logger.warning("Backing up %s to %s", bashrc_path, bck_path)
"""Enable autocomplete for the given program, common logic."""
rc_path = self.get_rc_path()
if rc_path.exists() and rc_path.read_text().strip():
bck_path = rc_path.with_suffix(f".{datetime.now():%Y-%m-%dT%H-%M-%S}.bak")
logger.warning("Backing up %s to %s", rc_path, bck_path)
try:
shutil.copyfile(bashrc_path, bck_path)
shutil.copyfile(rc_path, bck_path)
except OSError as e:
raise AutocompleteInstallError(
f"Failed to backup {bashrc_path} under {bck_path}"
f"Failed to backup {rc_path} under {bck_path}"
) from e
logger.warning("Explicitly adding %s to %s", completion_script, bashrc_path)
logger.warning("Explicitly adding %s to %s", completion_script, rc_path)
add_or_update_shell_section(
bashrc_path, f"{self.prog} autocomplete", self.prog, f"source {completion_script}"
rc_path, f"{self.prog} autocomplete", self.prog, self.get_rc_section(completion_script)
)

def get_rc_section(self, completion_script: Path) -> str:
return f"source {quote(str(completion_script))}"

def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program."""
return Path("~/.bash_completion.d/").expanduser() / self.prog
"""Get autocomplete script path for the given program, common logic."""
script_dir = Path(f"~/.{self.shell_exec}_completion.d/").expanduser()
return script_dir / self.prog

def is_enabled(self) -> bool:
"""Check if autocompletion is enabled."""
return _silent_success_run([self.shell_exec, '-i', '-c', f'complete -p {quote(self.prog)}'])


@SHELL_REGISTRY.register('bash')
class BashAutocompleteInstaller(BashLikeAutocompleteInstaller):
shell_exec = 'bash'
rc_file_path = "~/.bashrc"


@SHELL_REGISTRY.register('zsh')
class ZshAutocompleteInstaller(BashLikeAutocompleteInstaller):
shell_exec = 'zsh'
rc_file_path = "~/.zshrc"

def get_rc_section(self, completion_script: Path) -> str:
return textwrap.dedent(
f"""\
if [[ -z "$_comps" ]] && [[ -t 0 ]]; then autoload -Uz compinit && compinit; fi
source {quote(str(completion_script))}
"""
)

def get_script_path(self) -> Path:
"""Custom get_script_path for Zsh, if the structure differs from the base implementation."""
return Path("~/.zsh/completion/").expanduser() / f"_{self.prog}"

def is_enabled(self) -> bool:
rc_path = self.get_rc_path()
if not rc_path.exists():
rc_path.touch(mode=0o750) # otherwise `zshrc -i` may hang on creation wizard when emulating tty
cmd = [self.shell_exec, '-i', '-c', f'[[ -v _comps[{quote(self.prog)}] ]]']
return _silent_success_run(cmd) if os.isatty(0) else _silent_success_run(_tty_cmd_wrap(cmd))


@SHELL_REGISTRY.register('fish')
class FishAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec = 'fish'
rc_file_path = "~/.config/fish/config.fish"

def force_enable(self, completion_script: Path) -> None:
raise NotImplementedError("Fish shell doesn't support manual completion enabling.")

def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program, common logic."""
return Path("~/.config/fish/completions/").expanduser() / f"{self.prog}.fish"

def is_enabled(self) -> bool:
"""
Check if autocompletion is enabled.
Fish seems to lazy-load completions, hence first we trigger completion.
That alone cannot be used, since fish tends to always propose completions (e.g. suggesting similarly
named filenames).
"""
return _silent_success_run(
[
self.shell_exec, '-i', '-c',
f'string length -q -- (complete -C{quote(f"{self.prog} ")} >/dev/null && complete -c {quote(self.prog)})'
]
)

def _silent_success_run(cmd: List[str]) -> bool:
return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0

def _silent_success_run(cmd: list[str], timeout: int | None = 60) -> bool:
# start_new_session prevents `zsh -i` interaction with parent terminal under pytest-xdist
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL,
start_new_session=True,
)

try:
stdout, stderr = p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
p.kill()
stdout, stderr = p.communicate(timeout=1)
logger.warning("Command %r timed out, stdout: %r, stderr: %r", cmd, stdout, stderr)
else:
if p.returncode != 0:
logger.debug(
"Command %r exited with code %d, stdout: %r, stderr: %r", cmd, p.returncode, stdout,
stderr
)
return p.returncode == 0


def _tty_cmd_wrap(cmd: list[str]) -> list[str]:
"""
Add tty emulation to the command.
Inspired by tty emulation for different platforms found here:
https://github.com/Yuri6037/Action-FakeTTY/tree/master/script
"""
system_name = platform.system().lower()
if system_name == 'darwin':
if not shutil.which('unbuffer'):
raise CLIError(
"unbuffer is required for macOS when running without a tty. "
"You can install it via `brew install expect`."
)
return ['unbuffer', *cmd]
elif system_name == 'windows':
return ["Invoke-Expression", shlex_join(cmd)]
return ['script', '-q', '-e', '-c', shlex_join(cmd), '/dev/null']


def add_or_update_shell_section(
Expand Down Expand Up @@ -152,7 +274,11 @@ def add_or_update_shell_section(
path.write_text(file_content)


class AutocompleteInstallError(Exception):
class CLIError(Exception):
"""Base exception for CLI errors."""


class AutocompleteInstallError(CLIError):
"""Exception raised when autocomplete installation fails."""


Expand Down
5 changes: 5 additions & 0 deletions b2/_internal/_utils/python_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Utilities for compatibility with older Python versions.
"""
import functools
import shlex
import sys

if sys.version_info < (3, 9):
Expand Down Expand Up @@ -45,5 +46,9 @@ def method_wrapper(arg, *args, **kwargs):

method_wrapper.register = self.register
return method_wrapper

def shlex_join(split_command):
return ' '.join(shlex.quote(arg) for arg in split_command)
else:
singledispatchmethod = functools.singledispatchmethod
shlex_join = shlex.join
4 changes: 2 additions & 2 deletions b2/_internal/console_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@
from b2._internal._cli.argcompleters import bucket_name_completer, file_name_completer
from b2._internal._cli.autocomplete_install import (
SUPPORTED_SHELLS,
AutocompleteInstallError,
CLIError,
autocomplete_install,
)
from b2._internal._cli.b2api import _get_b2api_for_profile
Expand Down Expand Up @@ -4165,7 +4165,7 @@ def _run(self, args):

try:
autocomplete_install(self.console_tool.b2_binary_name, shell=shell)
except AutocompleteInstallError as e:
except CLIError as e:
raise CommandError(str(e)) from e
self._print(f'Autocomplete successfully installed for {shell}.')
self._print(
Expand Down
1 change: 1 addition & 0 deletions changelog.d/+zsh_autocomplete.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add autocomplete support for `zsh` and `fish` shells.
24 changes: 24 additions & 0 deletions test/unit/_cli/fixtures/dummy_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env python
######################################################################
#
# File: test/unit/_cli/fixtures/dummy_command.py
#
# Copyright 2024 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import argparse


def main():
parser = argparse.ArgumentParser(description="Dummy command")
parser.add_argument("--foo", help="foo help")
parser.add_argument("--bar", help="bar help")
args = parser.parse_args()
print(args.foo)
print(args.bar)


if __name__ == "__main__":
main()
34 changes: 33 additions & 1 deletion test/unit/_cli/test_autocomplete_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import pathlib
import shutil
from test.helpers import skip_on_windows

import pytest

from b2._internal._cli.autocomplete_install import add_or_update_shell_section
from b2._internal._cli.autocomplete_install import (
SHELL_REGISTRY,
add_or_update_shell_section,
)

section = "test_section"
managed_by = "pytest"
Expand Down Expand Up @@ -73,3 +80,28 @@ def test_add_or_update_shell_section_no_file(test_file):
{content}
# <<< {section} <<<
"""


@pytest.fixture
def dummy_command(homedir, monkeypatch, env):
name = "dummy_command"
bin_path = homedir / "bin" / name
bin_path.parent.mkdir(parents=True, exist_ok=True)
bin_path.symlink_to(pathlib.Path(__file__).parent / "fixtures" / f"{name}.py")
monkeypatch.setenv("PATH", f"{homedir}/bin:{env['PATH']}")
yield name


@pytest.mark.parametrize("shell", ["bash", "zsh", "fish"])
@skip_on_windows
def test_autocomplete_installer(homedir, env, shell, caplog, dummy_command):
caplog.set_level(10)
shell_installer = SHELL_REGISTRY.get(shell, prog=dummy_command)

shell_bin = shutil.which(shell)
if shell_bin is None:
pytest.skip(f"{shell} is not installed")

assert shell_installer.is_enabled() is False
shell_installer.install()
assert shell_installer.is_enabled() is True
13 changes: 13 additions & 0 deletions test/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ def cli_version(request) -> str:
return request.config.getoption('--cli')


@pytest.fixture
def homedir(tmp_path_factory):
yield tmp_path_factory.mktemp("test_homedir")


@pytest.fixture
def env(homedir, monkeypatch):
"""Get ENV for running b2 command from shell level."""
monkeypatch.setenv("HOME", str(homedir))
monkeypatch.setenv("SHELL", "/bin/bash") # fix for running under github actions
yield os.environ


@pytest.fixture(scope='session')
def console_tool_class(cli_version):
# Ensures import of the correct library to handle all the tests.
Expand Down
Loading

0 comments on commit e2317ff

Please sign in to comment.