From c27237f86d5faf3738a998c5cee87eaf0fcc452e Mon Sep 17 00:00:00 2001 From: Marc Mueller <30130371+cdce8p@users.noreply.github.com> Date: Thu, 16 May 2024 17:15:39 +0200 Subject: [PATCH] WIP async singleton with HassKey --- .../components/assist_pipeline/pipeline.py | 21 ++++++++------- .../components/assist_pipeline/select.py | 12 ++++----- .../assist_pipeline/websocket_api.py | 11 ++++---- homeassistant/components/esphome/dashboard.py | 9 ++++--- homeassistant/helpers/singleton.py | 27 ++++++++++++++----- 5 files changed, 49 insertions(+), 31 deletions(-) diff --git a/homeassistant/components/assist_pipeline/pipeline.py b/homeassistant/components/assist_pipeline/pipeline.py index f8f6be3a40fdad..738d46f5a4de3a 100644 --- a/homeassistant/components/assist_pipeline/pipeline.py +++ b/homeassistant/components/assist_pipeline/pipeline.py @@ -49,6 +49,7 @@ language as language_util, ulid as ulid_util, ) +from homeassistant.util.hass_dict import HassKey from homeassistant.util.limited_size_dict import LimitedSizeDict from .audio_enhancer import AudioEnhancer, EnhancedAudioChunk, MicroVadSpeexEnhancer @@ -90,6 +91,8 @@ ("tts_engine", "tts_language"), ) +KEY_ASSIST_PIPELINE: HassKey[PipelineData] = HassKey(DOMAIN) + def validate_language(data: dict[str, Any]) -> Any: """Validate language settings.""" @@ -247,7 +250,7 @@ async def async_create_default_pipeline( The default pipeline will use the homeassistant conversation agent and the specified stt / tts engines. """ - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] pipeline_store = pipeline_data.pipeline_store pipeline_settings = _async_resolve_default_pipeline_settings( hass, @@ -282,7 +285,7 @@ def _async_get_pipeline_from_conversation_entity( @callback def async_get_pipeline(hass: HomeAssistant, pipeline_id: str | None = None) -> Pipeline: """Get a pipeline by id or the preferred pipeline.""" - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] if pipeline_id is None: # A pipeline was not specified, use the preferred one @@ -305,7 +308,7 @@ def async_get_pipeline(hass: HomeAssistant, pipeline_id: str | None = None) -> P @callback def async_get_pipelines(hass: HomeAssistant) -> list[Pipeline]: """Get all pipelines.""" - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] return list(pipeline_data.pipeline_store.data.values()) @@ -328,7 +331,7 @@ async def async_update_pipeline( prefer_local_intents: bool | UndefinedType = UNDEFINED, ) -> None: """Update a pipeline.""" - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] updates: dict[str, Any] = pipeline.to_json() updates.pop("id") @@ -586,7 +589,7 @@ def __post_init__(self) -> None: ): raise InvalidPipelineStagesError(self.start_stage, self.end_stage) - pipeline_data: PipelineData = self.hass.data[DOMAIN] + pipeline_data = self.hass.data[KEY_ASSIST_PIPELINE] if self.pipeline.id not in pipeline_data.pipeline_debug: pipeline_data.pipeline_debug[self.pipeline.id] = LimitedSizeDict( size_limit=STORED_PIPELINE_RUNS @@ -614,7 +617,7 @@ def __eq__(self, other: object) -> bool: def process_event(self, event: PipelineEvent) -> None: """Log an event and call listener.""" self.event_callback(event) - pipeline_data: PipelineData = self.hass.data[DOMAIN] + pipeline_data = self.hass.data[KEY_ASSIST_PIPELINE] if self.id not in pipeline_data.pipeline_debug[self.pipeline.id]: # This run has been evicted from the logged pipeline runs already return @@ -649,7 +652,7 @@ async def end(self) -> None: ) ) - pipeline_data: PipelineData = self.hass.data[DOMAIN] + pipeline_data = self.hass.data[KEY_ASSIST_PIPELINE] pipeline_data.pipeline_runs.remove_run(self) async def prepare_wake_word_detection(self) -> None: @@ -1207,7 +1210,7 @@ def _capture_chunk(self, audio_bytes: bytes | None) -> None: return # Forward to device audio capture - pipeline_data: PipelineData = self.hass.data[DOMAIN] + pipeline_data = self.hass.data[KEY_ASSIST_PIPELINE] audio_queue = pipeline_data.device_audio_queues.get(self._device_id) if audio_queue is None: return @@ -1858,7 +1861,7 @@ async def _async_migrate_func( return old_data -@singleton(DOMAIN) +@singleton(KEY_ASSIST_PIPELINE, async_=True) async def async_setup_pipeline_store(hass: HomeAssistant) -> PipelineData: """Set up the pipeline storage collection.""" pipeline_store = PipelineStorageCollection( diff --git a/homeassistant/components/assist_pipeline/select.py b/homeassistant/components/assist_pipeline/select.py index c7e4846aad73c3..a590f30fc7a11f 100644 --- a/homeassistant/components/assist_pipeline/select.py +++ b/homeassistant/components/assist_pipeline/select.py @@ -9,8 +9,8 @@ from homeassistant.core import HomeAssistant, callback from homeassistant.helpers import collection, entity_registry as er, restore_state -from .const import DOMAIN, OPTION_PREFERRED -from .pipeline import AssistDevice, PipelineData, PipelineStorageCollection +from .const import OPTION_PREFERRED +from .pipeline import KEY_ASSIST_PIPELINE, AssistDevice from .vad import VadSensitivity @@ -30,7 +30,7 @@ def get_chosen_pipeline( if state is None or state.state == OPTION_PREFERRED: return None - pipeline_store: PipelineStorageCollection = hass.data[DOMAIN].pipeline_store + pipeline_store = hass.data[KEY_ASSIST_PIPELINE].pipeline_store return next( (item.id for item in pipeline_store.async_items() if item.name == state.state), None, @@ -80,7 +80,7 @@ async def async_added_to_hass(self) -> None: """When entity is added to Home Assistant.""" await super().async_added_to_hass() - pipeline_data: PipelineData = self.hass.data[DOMAIN] + pipeline_data = self.hass.data[KEY_ASSIST_PIPELINE] pipeline_store = pipeline_data.pipeline_store self.async_on_remove( pipeline_store.async_add_change_set_listener(self._pipelines_updated) @@ -116,9 +116,7 @@ async def _pipelines_updated( @callback def _update_options(self) -> None: """Handle pipeline update.""" - pipeline_store: PipelineStorageCollection = self.hass.data[ - DOMAIN - ].pipeline_store + pipeline_store = self.hass.data[KEY_ASSIST_PIPELINE].pipeline_store options = [OPTION_PREFERRED] options.extend(sorted(item.name for item in pipeline_store.async_items())) self._attr_options = options diff --git a/homeassistant/components/assist_pipeline/websocket_api.py b/homeassistant/components/assist_pipeline/websocket_api.py index c96af655589934..b5a25034d26e3f 100644 --- a/homeassistant/components/assist_pipeline/websocket_api.py +++ b/homeassistant/components/assist_pipeline/websocket_api.py @@ -22,7 +22,6 @@ from .const import ( DEFAULT_PIPELINE_TIMEOUT, DEFAULT_WAKE_WORD_TIMEOUT, - DOMAIN, EVENT_RECORDING, SAMPLE_CHANNELS, SAMPLE_RATE, @@ -30,9 +29,9 @@ ) from .error import PipelineNotFound from .pipeline import ( + KEY_ASSIST_PIPELINE, AudioSettings, DeviceAudioQueue, - PipelineData, PipelineError, PipelineEvent, PipelineEventType, @@ -284,7 +283,7 @@ def websocket_list_runs( msg: dict[str, Any], ) -> None: """List pipeline runs for which debug data is available.""" - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] pipeline_id = msg["pipeline_id"] if pipeline_id not in pipeline_data.pipeline_debug: @@ -320,7 +319,7 @@ def websocket_list_devices( msg: dict[str, Any], ) -> None: """List assist devices.""" - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] ent_reg = er.async_get(hass) connection.send_result( msg["id"], @@ -351,7 +350,7 @@ def websocket_get_run( msg: dict[str, Any], ) -> None: """Get debug data for a pipeline run.""" - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] pipeline_id = msg["pipeline_id"] pipeline_run_id = msg["pipeline_run_id"] @@ -456,7 +455,7 @@ async def websocket_device_capture( msg: dict[str, Any], ) -> None: """Capture raw audio from a satellite device and forward to client.""" - pipeline_data: PipelineData = hass.data[DOMAIN] + pipeline_data = hass.data[KEY_ASSIST_PIPELINE] device_id = msg["device_id"] # Number of seconds to record audio in wall clock time diff --git a/homeassistant/components/esphome/dashboard.py b/homeassistant/components/esphome/dashboard.py index b0a37aefd0d2d9..334c16e57301ad 100644 --- a/homeassistant/components/esphome/dashboard.py +++ b/homeassistant/components/esphome/dashboard.py @@ -12,6 +12,7 @@ from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.singleton import singleton from homeassistant.helpers.storage import Store +from homeassistant.util.hass_dict import HassKey from .const import DOMAIN from .coordinator import ESPHomeDashboardCoordinator @@ -19,7 +20,9 @@ _LOGGER = logging.getLogger(__name__) -KEY_DASHBOARD_MANAGER = "esphome_dashboard_manager" +KEY_DASHBOARD_MANAGER: HassKey[ESPHomeDashboardManager] = HassKey( + "esphome_dashboard_manager" +) STORAGE_KEY = "esphome.dashboard" STORAGE_VERSION = 1 @@ -33,7 +36,7 @@ async def async_setup(hass: HomeAssistant) -> None: await async_get_or_create_dashboard_manager(hass) -@singleton(KEY_DASHBOARD_MANAGER) +@singleton(KEY_DASHBOARD_MANAGER, async_=True) async def async_get_or_create_dashboard_manager( hass: HomeAssistant, ) -> ESPHomeDashboardManager: @@ -140,7 +143,7 @@ def async_get_dashboard(hass: HomeAssistant) -> ESPHomeDashboardCoordinator | No where manager can be an asyncio.Event instead of the actual manager because the singleton decorator is not yet done. """ - manager: ESPHomeDashboardManager | None = hass.data.get(KEY_DASHBOARD_MANAGER) + manager = hass.data.get(KEY_DASHBOARD_MANAGER) return manager.async_get() if manager else None diff --git a/homeassistant/helpers/singleton.py b/homeassistant/helpers/singleton.py index 20e4ee82162de9..86a31772a1a074 100644 --- a/homeassistant/helpers/singleton.py +++ b/homeassistant/helpers/singleton.py @@ -3,15 +3,22 @@ from __future__ import annotations import asyncio -from collections.abc import Callable +from collections.abc import Callable, Coroutine import functools -from typing import Any, cast, overload +from typing import Any, Literal, cast, overload from homeassistant.core import HomeAssistant from homeassistant.loader import bind_hass from homeassistant.util.hass_dict import HassKey type _FuncType[_T] = Callable[[HomeAssistant], _T] +type _Coro[_T] = Coroutine[Any, Any, _T] + + +@overload +def singleton[_T]( + data_key: HassKey[_T], *, async_: Literal[True] +) -> Callable[[_FuncType[_Coro[_T]]], _FuncType[_Coro[_T]]]: ... @overload @@ -24,13 +31,21 @@ def singleton[_T]( def singleton[_T](data_key: str) -> Callable[[_FuncType[_T]], _FuncType[_T]]: ... -def singleton[_T](data_key: Any) -> Callable[[_FuncType[_T]], _FuncType[_T]]: +def singleton[_T]( + data_key: Any, *, async_: bool = False +) -> Callable[[_FuncType[_T]], _FuncType[_T]]: """Decorate a function that should be called once per instance. Result will be cached and simultaneous calls will be handled. """ - def wrapper(func: _FuncType[_T]) -> _FuncType[_T]: + @overload + def wrapper(func: _FuncType[_Coro[_T]]) -> _FuncType[_Coro[_T]]: ... + + @overload + def wrapper(func: _FuncType[_T]) -> _FuncType[_T]: ... + + def wrapper(func: _FuncType[_Coro[_T] | _T]) -> _FuncType[_Coro[_T] | _T]: # type: ignore[misc] """Wrap a function with caching logic.""" if not asyncio.iscoroutinefunction(func): @@ -46,7 +61,7 @@ def wrapped(hass: HomeAssistant) -> _T: @bind_hass @functools.wraps(func) - async def async_wrapped(hass: HomeAssistant) -> Any: + async def async_wrapped(hass: HomeAssistant) -> _T: if data_key not in hass.data: evt = hass.data[data_key] = asyncio.Event() result = await func(hass) @@ -62,6 +77,6 @@ async def async_wrapped(hass: HomeAssistant) -> Any: return cast(_T, obj_or_evt) - return async_wrapped # type: ignore[return-value] + return async_wrapped return wrapper