Skip to content

Commit

Permalink
add autocomplete support for zsh and fish shells
Browse files Browse the repository at this point in the history
  • Loading branch information
mjurbanski-reef committed Mar 28, 2024
1 parent a8f9869 commit 8f39f13
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 25 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,16 @@ jobs:
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install test binary dependencies
if: startsWith(matrix.os, 'ubuntu')
run: |
sudo apt-get -y update
sudo apt-get -y install zsh fish
sudo chmod -R 755 /usr/share/zsh/vendor-completions /usr/share/zsh # Fix permissions for zsh completions
- name: Install test binary dependencies (macOS)
if: startsWith(matrix.os, 'macos')
run: |
brew install fish
- name: Install dependencies
run: python -m pip install --upgrade nox pdm
- name: Run unit tests
Expand Down
218 changes: 194 additions & 24 deletions b2/_internal/_cli/autocomplete_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,28 @@
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
from __future__ import annotations

import abc
import io
import logging
import os
import re
import shlex
import shutil
import signal
import subprocess
import textwrap
from datetime import datetime
from importlib.util import find_spec
from pathlib import Path
from typing import List
from shlex import quote

import argcomplete
from class_registry import ClassRegistry, RegistryKeyError

from b2._internal._utils.python_compat import shlex_join

logger = logging.getLogger(__name__)

SHELL_REGISTRY = ClassRegistry()
Expand All @@ -33,7 +44,7 @@ def autocomplete_install(prog: str, shell: str = 'bash') -> None:
logger.info("Autocomplete for %s has been enabled.", prog)


class ShellAutocompleteInstaller:
class ShellAutocompleteInstaller(abc.ABC):
shell_exec: str

def __init__(self, prog: str):
Expand All @@ -43,6 +54,10 @@ def install(self) -> None:
"""Install autocomplete for the given program."""
script_path = self.create_script()
if not self.is_enabled():
logger.info(
"%s completion doesn't seem to be autoloaded from %s.", self.shell_exec,
script_path.parent
)
try:
self.force_enable(script_path)
except NotImplementedError as e:
Expand All @@ -60,10 +75,11 @@ def create_script(self) -> Path:

script_path = self.get_script_path()
logger.info("Creating autocompletion script under %s", script_path)
script_path.parent.mkdir(exist_ok=True)
script_path.parent.mkdir(exist_ok=True, parents=True, mode=0o755)
script_path.write_text(shellcode)
return script_path

@abc.abstractmethod
def force_enable(self, completion_script: Path) -> None:
"""
Enable autocomplete for the given program.
Expand All @@ -76,6 +92,7 @@ def get_shellcode(self) -> str:
"""Get autocomplete shellcode for the given program."""
return argcomplete.shellcode([self.prog], shell=self.shell_exec)

@abc.abstractmethod
def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program."""
raise NotImplementedError
Expand All @@ -84,43 +101,196 @@ def program_in_path(self) -> bool:
"""Check if the given program is in PATH."""
return _silent_success_run([self.shell_exec, '-c', self.prog])

@abc.abstractmethod
def is_enabled(self) -> bool:
"""Check if autocompletion is enabled."""
return _silent_success_run([self.shell_exec, '-i', '-c', f'complete -p {self.prog}'])
raise NotImplementedError


@SHELL_REGISTRY.register('bash')
class BashAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec = 'bash'
class BashLikeAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec: str
rc_file_path: str

def get_rc_path(self) -> Path:
return Path(self.rc_file_path).expanduser()

def force_enable(self, completion_script: Path) -> None:
"""Enable autocomplete for the given program."""
logger.info(
"Bash completion doesn't seem to be autoloaded from %s. Most likely `bash-completion` is not installed.",
completion_script.parent
)
bashrc_path = Path("~/.bashrc").expanduser()
if bashrc_path.exists():
bck_path = bashrc_path.with_suffix(f".{datetime.now():%Y-%m-%dT%H-%M-%S}.bak")
logger.warning("Backing up %s to %s", bashrc_path, bck_path)
"""Enable autocomplete for the given program, common logic."""
rc_path = self.get_rc_path()
if rc_path.exists() and rc_path.read_text().strip():
bck_path = rc_path.with_suffix(f".{datetime.now():%Y-%m-%dT%H-%M-%S}.bak")
logger.warning("Backing up %s to %s", rc_path, bck_path)
try:
shutil.copyfile(bashrc_path, bck_path)
shutil.copyfile(rc_path, bck_path)
except OSError as e:
raise AutocompleteInstallError(
f"Failed to backup {bashrc_path} under {bck_path}"
f"Failed to backup {rc_path} under {bck_path}"
) from e
logger.warning("Explicitly adding %s to %s", completion_script, bashrc_path)
logger.warning("Explicitly adding %s to %s", completion_script, rc_path)
add_or_update_shell_section(
bashrc_path, f"{self.prog} autocomplete", self.prog, f"source {completion_script}"
rc_path, f"{self.prog} autocomplete", self.prog, self.get_rc_section(completion_script)
)

def get_rc_section(self, completion_script: Path) -> str:
return f"source {quote(str(completion_script))}"

def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program."""
return Path("~/.bash_completion.d/").expanduser() / self.prog
"""Get autocomplete script path for the given program, common logic."""
script_dir = Path(f"~/.{self.shell_exec}_completion.d/").expanduser()
return script_dir / self.prog

def is_enabled(self) -> bool:
"""Check if autocompletion is enabled."""
return _silent_success_run([self.shell_exec, '-i', '-c', f'complete -p {quote(self.prog)}'])


def _silent_success_run(cmd: List[str]) -> bool:
return subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0
@SHELL_REGISTRY.register('bash')
class BashAutocompleteInstaller(BashLikeAutocompleteInstaller):
shell_exec = 'bash'
rc_file_path = "~/.bashrc"


@SHELL_REGISTRY.register('zsh')
class ZshAutocompleteInstaller(BashLikeAutocompleteInstaller):
shell_exec = 'zsh'
rc_file_path = "~/.zshrc"

def get_rc_section(self, completion_script: Path) -> str:
return textwrap.dedent(
f"""\
if [[ -z "$_comps" ]] && [[ -t 0 ]]; then autoload -Uz compinit && compinit -i -D; fi
source {quote(str(completion_script))}
"""
)

def get_script_path(self) -> Path:
"""Custom get_script_path for Zsh, if the structure differs from the base implementation."""
return Path("~/.zsh/completion/").expanduser() / f"_{self.prog}"

def is_enabled(self) -> bool:
rc_path = self.get_rc_path()
if not rc_path.exists():
# if zshrc is missing `zshrc -i` may hang on creation wizard when emulating tty
rc_path.touch(mode=0o750)
_silent_success_run_with_pty(
[self.shell_exec, '-c', 'autoload -Uz compaudit; echo AUDIT; compaudit']
)

cmd = [self.shell_exec, '-i', '-c', f'[[ -v _comps[{quote(self.prog)}] ]]']
return _silent_success_run_with_tty(cmd)


@SHELL_REGISTRY.register('fish')
class FishAutocompleteInstaller(ShellAutocompleteInstaller):
shell_exec = 'fish'
rc_file_path = "~/.config/fish/config.fish"

def force_enable(self, completion_script: Path) -> None:
raise NotImplementedError("Fish shell doesn't support manual completion enabling.")

def get_script_path(self) -> Path:
"""Get autocomplete script path for the given program, common logic."""
complete_paths = [
Path(p) for p in shlex.split(
subprocess.run(
[self.shell_exec, '-c', 'echo $fish_complete_path'],
timeout=30,
text=True,
check=True,
capture_output=True
).stdout
)
]
user_path = Path("~/.config/fish/completions").expanduser()
if complete_paths:
target_path = user_path if user_path in complete_paths else complete_paths[0]
else:
logger.warning("$fish_complete_path is empty, falling back to %r", user_path)
target_path = user_path
return target_path / f"{self.prog}.fish"

def is_enabled(self) -> bool:
"""
Check if autocompletion is enabled.
Fish seems to lazy-load completions, hence first we trigger completion.
That alone cannot be used, since fish tends to always propose completions (e.g. suggesting similarly
named filenames).
"""
environ = os.environ.copy()
environ.setdefault("TERM", "xterm") # TERM has to be set for fish to load completions
return _silent_success_run_with_tty(
[
self.shell_exec, '-i', '-c',
f'string length -q -- (complete -C{quote(f"{self.prog} ")} >/dev/null && complete -c {quote(self.prog)})'
],
env=environ,
)


def _silent_success_run_with_tty(
cmd: list[str], timeout: int = 30, env: dict | None = None
) -> bool:
emulate_tty = not os.isatty(0) # is True under GHA or pytest-xdist
if emulate_tty and not find_spec('pexpect'):
emulate_tty = False
logger.warning(
"pexpect is needed to check autocomplete installation correctness without tty. "
"You can install it via `pip install pexpect`."
)
run_func = _silent_success_run_with_pty if emulate_tty else _silent_success_run
return run_func(cmd, timeout=timeout, env=env)


def _silent_success_run(cmd: list[str], timeout: int = 30, env: dict | None = None) -> bool:
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stdin=subprocess.DEVNULL,
start_new_session=True, # prevents `zsh -i` messing with parent tty under pytest-xdist
env=env,
)

try:
stdout, stderr = p.communicate(timeout=timeout)
except subprocess.TimeoutExpired:
p.kill()
stdout, stderr = p.communicate(timeout=1)
logger.warning("Command %r timed out, stdout: %r, stderr: %r", cmd, stdout, stderr)
else:
(logger.debug if p.returncode == 0 else logger.warning)(
"Command %r exited with code %r, stdout: %r, stderr: %r", cmd, p.returncode, stdout,
stderr
)
return p.returncode == 0


def _silent_success_run_with_pty(
cmd: list[str], timeout: int = 30, env: dict | None = None
) -> bool:
"""
Run a command with emulated terminal and return whether it succeeded.
"""
import pexpect

command_str = shlex_join(cmd)

child = pexpect.spawn(command_str, timeout=timeout, env=env)
output = io.BytesIO()
try:
child.logfile_read = output
child.expect(pexpect.EOF)
except pexpect.TIMEOUT:
logger.warning("Command %r timed out, output: %r", cmd, output.getvalue())
child.kill(signal.SIGKILL)
return False
finally:
child.close()

(logger.debug if child.exitstatus == 0 else logger.warning
)("Command %r exited with code %r, output: %r", cmd, child.exitstatus, output.getvalue())
return child.exitstatus == 0


def add_or_update_shell_section(
Expand Down
5 changes: 5 additions & 0 deletions b2/_internal/_utils/python_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Utilities for compatibility with older Python versions.
"""
import functools
import shlex
import sys

if sys.version_info < (3, 9):
Expand Down Expand Up @@ -45,5 +46,9 @@ def method_wrapper(arg, *args, **kwargs):

method_wrapper.register = self.register
return method_wrapper

def shlex_join(split_command):
return ' '.join(shlex.quote(arg) for arg in split_command)
else:
singledispatchmethod = functools.singledispatchmethod
shlex_join = shlex.join
1 change: 1 addition & 0 deletions changelog.d/+zsh_autocomplete.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add autocomplete support for `zsh` and `fish` shells.
24 changes: 24 additions & 0 deletions test/unit/_cli/fixtures/dummy_command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env python
######################################################################
#
# File: test/unit/_cli/fixtures/dummy_command.py
#
# Copyright 2024 Backblaze Inc. All Rights Reserved.
#
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import argparse


def main():
parser = argparse.ArgumentParser(description="Dummy command")
parser.add_argument("--foo", help="foo help")
parser.add_argument("--bar", help="bar help")
args = parser.parse_args()
print(args.foo)
print(args.bar)


if __name__ == "__main__":
main()
34 changes: 33 additions & 1 deletion test/unit/_cli/test_autocomplete_install.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
# License https://www.backblaze.com/using_b2_code.html
#
######################################################################
import pathlib
import shutil
from test.helpers import skip_on_windows

import pytest

from b2._internal._cli.autocomplete_install import add_or_update_shell_section
from b2._internal._cli.autocomplete_install import (
SHELL_REGISTRY,
add_or_update_shell_section,
)

section = "test_section"
managed_by = "pytest"
Expand Down Expand Up @@ -73,3 +80,28 @@ def test_add_or_update_shell_section_no_file(test_file):
{content}
# <<< {section} <<<
"""


@pytest.fixture
def dummy_command(homedir, monkeypatch, env):
name = "dummy_command"
bin_path = homedir / "bin" / name
bin_path.parent.mkdir(parents=True, exist_ok=True)
bin_path.symlink_to(pathlib.Path(__file__).parent / "fixtures" / f"{name}.py")
monkeypatch.setenv("PATH", f"{homedir}/bin:{env['PATH']}")
yield name


@pytest.mark.parametrize("shell", ["bash", "zsh", "fish"])
@skip_on_windows
def test_autocomplete_installer(homedir, env, shell, caplog, dummy_command):
caplog.set_level(10)
shell_installer = SHELL_REGISTRY.get(shell, prog=dummy_command)

shell_bin = shutil.which(shell)
if shell_bin is None:
pytest.skip(f"{shell} is not installed")

assert shell_installer.is_enabled() is False
shell_installer.install()
assert shell_installer.is_enabled() is True
Loading

0 comments on commit 8f39f13

Please sign in to comment.