diff --git a/pyflowlauncher/__init__.py b/pyflowlauncher/__init__.py deleted file mode 100644 index 22bbdfa..0000000 --- a/pyflowlauncher/__init__.py +++ /dev/null @@ -1,18 +0,0 @@ -import logging - -from .plugin import Plugin -from .result import JsonRPCAction, Result, send_results, ResultResponse -from .method import Method - - -logger = logging.getLogger(__name__) - - -__all__ = [ - "Plugin", - "ResultResponse", - "send_results", - "Result", - "JsonRPCAction", - "Method", -] diff --git a/pyflowlauncher/base.py b/pyflowlauncher/base.py new file mode 100644 index 0000000..ed15bf0 --- /dev/null +++ b/pyflowlauncher/base.py @@ -0,0 +1,16 @@ +from abc import ABC +import logging + + +_logger = logging.getLogger(__name__) + + +class Base(ABC): + + _LOGGER = _logger.getChild("Base") + + def __init__(self) -> None: + self._logger = logging.getLogger(__name__).getChild(self.__class__.__name__) + + def __del__(self): + self._logger.debug(f"Destroying {self.__class__.__name__} instance") diff --git a/pyflowlauncher/event.py b/pyflowlauncher/event.py deleted file mode 100644 index 48453d7..0000000 --- a/pyflowlauncher/event.py +++ /dev/null @@ -1,56 +0,0 @@ -import asyncio -from typing import Any, Callable, Iterable, Type, Union - - -class EventNotFound(Exception): - - def __init__(self, event: str): - self.event = event - super().__init__(f"Event '{event}' not found.") - - -class EventHandler: - - def __init__(self): - self._events = {} - self._handlers = {} - - def _get_callable_name(self, method: Union[Callable[..., Any], Exception]): - return getattr(method, '__name__', method.__class__.__name__).lower() - - def add_event(self, event: Callable[..., Any], *, name=None) -> str: - key = name or self._get_callable_name(event) - self._events[key] = event - return key - - def add_events(self, events: Iterable[Callable[..., Any]]): - for event in events: - self.add_event(event) - - def add_exception_handler(self, exception: Type[Exception], handler: Callable[..., Any]): - self._handlers[exception] = handler - - def get_event(self, event: str) -> Callable[..., Any]: - try: - return self._events[event] - except KeyError: - raise EventNotFound(event) - - async def _await_maybe(self, result: Any) -> Any: - if asyncio.iscoroutine(result): - return await result - return result - - async def trigger_exception_handler(self, exception: Exception) -> Any: - try: - handler = self._handlers[exception.__class__] - return await self._await_maybe(handler(exception)) - except KeyError: - raise exception - - async def trigger_event(self, event: str, *args, **kwargs) -> Any: - try: - result = self.get_event(event)(*args, **kwargs) - return await self._await_maybe(result) - except Exception as e: - return await self.trigger_exception_handler(e) diff --git a/pyflowlauncher/jsonrpc.py b/pyflowlauncher/jsonrpc.py deleted file mode 100644 index 89aa567..0000000 --- a/pyflowlauncher/jsonrpc.py +++ /dev/null @@ -1,28 +0,0 @@ -from __future__ import annotations - -import json -import sys -from typing import Any, Mapping - -if sys.version_info < (3, 11): - from typing_extensions import NotRequired, TypedDict -else: - from typing import NotRequired, TypedDict - - -class JsonRPCRequest(TypedDict): - method: str - parameters: list - settings: NotRequired[dict[Any, Any]] - - -class JsonRPCClient: - - def send(self, data: Mapping) -> None: - json.dump(data, sys.stdout) - - def recieve(self) -> JsonRPCRequest: - try: - return json.loads(sys.argv[1]) - except (IndexError, json.JSONDecodeError): - return {'method': 'query', 'parameters': ['']} diff --git a/pyflowlauncher/py.typed b/pyflowlauncher/jsonrpc/client.py similarity index 100% rename from pyflowlauncher/py.typed rename to pyflowlauncher/jsonrpc/client.py diff --git a/pyflowlauncher/jsonrpc/handler.py b/pyflowlauncher/jsonrpc/handler.py new file mode 100644 index 0000000..dc78d99 --- /dev/null +++ b/pyflowlauncher/jsonrpc/handler.py @@ -0,0 +1,33 @@ +import asyncio +import logging + +from ..base import Base +from .models import JsonRPCRequest + + +_logger = logging.getLogger(__name__) + + +class Dispatcher(Base): + + def __init__(self): + self._methods = {} + + def add_method(self, func): + self._methods[func.__name__] = func + _logger.debug(f"Adding method {func.__name__} to dispatcher") + return func + + def remove_method(self, func): + del self._methods[func.__name__] + + async def await_maybe(self, func): + if asyncio.iscoroutine(func): + return await func + return func + + async def dispatch(self, request: JsonRPCRequest): + method = self._methods.get(request['method']) + if method is None: + return {'error': 'method not found'} + return await self.await_maybe(method(*request['parameters'])) diff --git a/pyflowlauncher/jsonrpc/ids.py b/pyflowlauncher/jsonrpc/ids.py new file mode 100644 index 0000000..ee690c6 --- /dev/null +++ b/pyflowlauncher/jsonrpc/ids.py @@ -0,0 +1,19 @@ +from itertools import count + + +class ID: + + def __init__(self, start=1, step=1): + self._counter = count(start, step) + self._current = start + + def __next__(self): + self._current = next(self._counter) + return self._current + + def __iter__(self): + return self + + @property + def current(self): + return self._current diff --git a/pyflowlauncher/jsonrpc/models.py b/pyflowlauncher/jsonrpc/models.py new file mode 100644 index 0000000..9f274e6 --- /dev/null +++ b/pyflowlauncher/jsonrpc/models.py @@ -0,0 +1,14 @@ +from typing import TypedDict + + +class JsonRPCRequest(TypedDict): + id: int + jsonrpc: str + method: str + parameters: list + + +class JsonRPCResponse(TypedDict): + id: int + jsonrpc: str + result: list diff --git a/pyflowlauncher/jsonrpc/server.py b/pyflowlauncher/jsonrpc/server.py new file mode 100644 index 0000000..f5a716f --- /dev/null +++ b/pyflowlauncher/jsonrpc/server.py @@ -0,0 +1,8 @@ +from ..base import Base +from ids import ID + + +class Server(Base): + + def __init__(self, id: ID): + self.id = id \ No newline at end of file diff --git a/pyflowlauncher/method.py b/pyflowlauncher/method.py deleted file mode 100644 index 50d3e0a..0000000 --- a/pyflowlauncher/method.py +++ /dev/null @@ -1,24 +0,0 @@ -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import Any, Dict, Optional - -from .result import JsonRPCAction, Result, ResultResponse, send_results -from .shared import logger - - -class Method(ABC): - - def __init__(self) -> None: - self._logger = logger(self) - self._results: list[Result] = [] - - def add_result(self, result: Result) -> None: - self._results.append(result) - - def return_results(self, settings: Optional[Dict[str, Any]] = None) -> ResultResponse: - return send_results(self._results, settings) - - @abstractmethod - def __call__(self, *args, **kwargs) -> ResultResponse | JsonRPCAction: - pass diff --git a/pyflowlauncher/plugin.py b/pyflowlauncher/plugin.py index 9f26ee6..d8d6788 100644 --- a/pyflowlauncher/plugin.py +++ b/pyflowlauncher/plugin.py @@ -1,102 +1,24 @@ -from __future__ import annotations - -import sys -from functools import wraps -from typing import Any, Callable, Iterable, Optional, Type, Union -from pathlib import Path -import json -import asyncio - -from pyflowlauncher.shared import logger - -from .event import EventHandler -from .jsonrpc import JsonRPCClient -from .result import JsonRPCAction, ResultResponse -from .manifest import PluginManifestSchema, MANIFEST_FILE - -Method = Callable[..., Union[ResultResponse, JsonRPCAction, None]] +from jsonrpcserver import method, async_dispatch as dispatch class Plugin: - def __init__(self, methods: list[Method] | None = None) -> None: - self._logger = logger(self) - self._client = JsonRPCClient() - self._event_handler = EventHandler() - self._settings: dict[str, Any] = {} - if methods: - self.add_methods(methods) - - def add_method(self, method: Method) -> str: - """Add a method to the event handler.""" - return self._event_handler.add_event(method) - - def add_methods(self, methods: Iterable[Method]) -> None: - self._event_handler.add_events(methods) - - def on_method(self, method: Method) -> Method: - @wraps(method) - def wrapper(*args, **kwargs): - return method(*args, **kwargs) - self._event_handler.add_event(wrapper) - return wrapper - - def method(self, method: Method) -> Method: - """Register a method to be called when the plugin is run.""" - return self.on_method(method) - - def add_exception_handler(self, exception: Type[Exception], handler: Callable[..., Any]) -> None: - """Add exception handler to be called when an exception is raised in a method.""" - self._event_handler.add_exception_handler(exception, handler) - - def on_except(self, exception: Type[Exception]) -> Callable[..., Any]: - @wraps(exception) - def wrapper(handler: Callable[..., Any]) -> Callable[..., Any]: - self.add_exception_handler(exception, handler) - return handler - return wrapper - - def action(self, method: Method, parameters: Optional[Iterable] = None) -> JsonRPCAction: - """Register a method and return a JsonRPCAction that calls it.""" - method_name = self.add_method(method) - return {"method": method_name, "parameters": parameters or []} + def __init__(self): + self.methods = {} - @property - def settings(self) -> dict: - if self._settings is None: - self._settings = {} - self._settings = self._client.recieve().get('settings', {}) - return self._settings + def on_method(self, func): + self.methods[func.__name__] = func + return func - def run(self) -> None: - request = self._client.recieve() - method = request["method"] - parameters = request.get('parameters', []) - if sys.version_info >= (3, 10, 0): - feedback = asyncio.run(self._event_handler.trigger_event(method, *parameters)) - else: - loop = asyncio.get_event_loop() - feedback = loop.run_until_complete(self._event_handler.trigger_event(method, *parameters)) - if not feedback: - return - self._client.send(feedback) + async def run(self): + while True: + request = await self.receive_request() + response = await dispatch(request, methods=self.methods) + await self.send_response(response) - @property - def run_dir(self) -> Path: - """Return the run directory of the plugin.""" - return Path(sys.argv[0]).parent + async def receive_request(self): + pass - def root_dir(self) -> Path: - """Return the root directory of the plugin.""" - current_dir = self.run_dir - for part in current_dir.parts: - if current_dir.joinpath(MANIFEST_FILE).exists(): - return current_dir - current_dir = current_dir.parent - raise FileNotFoundError(f"Could not find {MANIFEST_FILE} in {self.run_dir} or any parent directory.") + async def send_response(self, response): + pass - def manifest(self) -> PluginManifestSchema: - """Return the plugin manifest.""" - with open(self.root_dir() / MANIFEST_FILE, 'r', encoding='utf-8') as f: - manifest = json.load(f) - return manifest diff --git a/pyflowlauncher/result.py b/pyflowlauncher/result.py deleted file mode 100644 index 84a3dc6..0000000 --- a/pyflowlauncher/result.py +++ /dev/null @@ -1,74 +0,0 @@ -from __future__ import annotations - -import sys -from dataclasses import dataclass -from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Union - -if sys.version_info < (3, 11): - from typing_extensions import NotRequired, TypedDict -else: - from typing import NotRequired, TypedDict - - -if TYPE_CHECKING: - from .plugin import Method - - -class JsonRPCAction(TypedDict): - """Flow Launcher JsonRPCAction""" - method: str - parameters: Iterable - dontHideAfterAction: NotRequired[bool] - - -class Glyph(TypedDict): - """Flow Launcher Glyph""" - Glyph: str - FontFamily: str - - -class PreviewInfo(TypedDict): - """Flow Launcher Preview section""" - PreviewImagePath: Optional[str] - Description: Optional[str] - IsMedia: bool - PreviewDeligate: Optional[str] - - -@dataclass -class Result: - Title: str - SubTitle: Optional[str] = None - IcoPath: Optional[Union[str, Path]] = None - Score: int = 0 - JsonRPCAction: Optional[JsonRPCAction] = None - ContextData: Optional[Iterable] = None - Glyph: Optional[Glyph] = None - CopyText: Optional[str] = None - AutoCompleteText: Optional[str] = None - RoundedIcon: bool = False - Preview: Optional[PreviewInfo] = None - - def as_dict(self) -> Dict[str, Any]: - return self.__dict__ - - def add_action(self, method: Method, - parameters: Optional[Iterable[Any]] = None, - *, - dont_hide_after_action: bool = False) -> None: - self.JsonRPCAction = { - "method": method.__name__, - "parameters": parameters or [], - "dontHideAfterAction": dont_hide_after_action - } - - -class ResultResponse(TypedDict): - result: List[Dict[str, Any]] - SettingsChange: NotRequired[Optional[Dict[str, Any]]] - - -def send_results(results: Iterable[Result], settings: Optional[Dict[str, Any]] = None) -> ResultResponse: - """Formats and returns results as a JsonRPCResponse""" - return {'result': [result.as_dict() for result in results], 'SettingsChange': settings} diff --git a/pyflowlauncher/settings.py b/pyflowlauncher/settings.py deleted file mode 100644 index ba5cbf9..0000000 --- a/pyflowlauncher/settings.py +++ /dev/null @@ -1,7 +0,0 @@ -from typing import Any, Dict -from .jsonrpc import JsonRPCClient - - -def settings() -> Dict[str, Any]: - """Retrieve the settings from Flow Launcher.""" - return JsonRPCClient().recieve().get('settings', {}) diff --git a/pyflowlauncher/shared.py b/pyflowlauncher/shared.py deleted file mode 100644 index 767d6b1..0000000 --- a/pyflowlauncher/shared.py +++ /dev/null @@ -1,14 +0,0 @@ -import logging -import sys -from pathlib import Path -from typing import Any - -_logger = logging.getLogger(__name__) - - -def logger(obj: Any) -> logging.Logger: - module_file = sys.modules[obj.__module__].__file__ - if module_file is not None: - module_name = Path(module_file).stem - return logging.getLogger(f"{module_name}.{obj.__class__.__name__}") - return logging.getLogger(f"{obj.__module__}.{obj.__name__}")