Skip to content

Commit

Permalink
WIP: formula engine
Browse files Browse the repository at this point in the history
  • Loading branch information
llucax committed Feb 12, 2024
1 parent 4c3ec4e commit 59f3506
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 83 deletions.
84 changes: 50 additions & 34 deletions src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from abc import ABC
from collections import deque
from collections.abc import Callable
from typing import Generic, TypeVar, Union, overload
from typing import Generic, SupportsFloat, TypeVar, Union, overload

from frequenz.channels import Broadcast, Receiver

Expand Down Expand Up @@ -40,6 +40,12 @@

_logger = logging.Logger(__name__)

SupportsFloatInputT = TypeVar("SupportsFloatInputT", bound=SupportsFloat)
"""Type variable for inputs that support conversion to float."""

SupportsFloatOutputT = TypeVar("SupportsFloatOutputT", bound=SupportsFloat)
"""Type variable for outputs that support conversion to float."""

_operator_precedence = {
"max": 0,
"min": 1,
Expand Down Expand Up @@ -95,11 +101,17 @@
# but mypy doesn't support that, so we need to use `# type: ignore` in several places in
# this, and subsequent classes, to avoid mypy errors.
class _ComposableFormulaEngine(
ABC, Generic[_GenericEngine, _GenericHigherOrderBuilder, SupportsFloatT]
ABC,
Generic[
_GenericEngine,
_GenericHigherOrderBuilder,
SupportsFloatInputT,
SupportsFloatOutputT,
],
):
"""A base class for formula engines."""

_create_method: Callable[[float], SupportsFloatT]
_create_method: Callable[[float], SupportsFloatOutputT]
_higher_order_builder: type[_GenericHigherOrderBuilder]
_task: asyncio.Task[None] | None = None

Expand All @@ -110,8 +122,7 @@ async def _stop(self) -> None:
await cancel_and_await(self._task)

def __add__(
self,
other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT,
self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
) -> _GenericHigherOrderBuilder:
"""Return a formula builder that adds (data in) `other` to `self`.
Expand All @@ -126,7 +137,7 @@ def __add__(
return self._higher_order_builder(self, self._create_method) + other # type: ignore

def __sub__(
self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT
self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
) -> _GenericHigherOrderBuilder:
"""Return a formula builder that subtracts (data in) `other` from `self`.
Expand Down Expand Up @@ -171,7 +182,7 @@ def __truediv__(
return self._higher_order_builder(self, self._create_method) / other # type: ignore

def max(
self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT
self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
) -> _GenericHigherOrderBuilder:
"""Return a formula engine that outputs the maximum of `self` and `other`.
Expand All @@ -186,7 +197,7 @@ def max(
return self._higher_order_builder(self, self._create_method).max(other) # type: ignore

def min(
self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT
self, other: _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
) -> _GenericHigherOrderBuilder:
"""Return a formula engine that outputs the minimum of `self` and `other`.
Expand Down Expand Up @@ -221,11 +232,11 @@ def production(self) -> _GenericHigherOrderBuilder:


class FormulaEngine(
Generic[SupportsFloatT],
Generic[SupportsFloatInputT, SupportsFloatOutputT],
_ComposableFormulaEngine[
"FormulaEngine", # type: ignore[type-arg]
"HigherOrderFormulaBuilder", # type: ignore[type-arg]
SupportsFloatT,
SupportsFloatOutputT,
],
):
"""[`FormulaEngine`][frequenz.sdk.timeseries.formula_engine.FormulaEngine]s are a
Expand Down Expand Up @@ -294,8 +305,8 @@ class FormulaEngine(

def __init__(
self,
builder: FormulaBuilder[SupportsFloatT],
create_method: Callable[[float], SupportsFloatT],
builder: FormulaBuilder[SupportsFloatInputT, SupportsFloatOutputT],
create_method: Callable[[float], SupportsFloatOutputT],
) -> None:
"""Create a `FormulaEngine` instance.
Expand All @@ -308,19 +319,21 @@ def __init__(
"""
self._higher_order_builder = HigherOrderFormulaBuilder
self._name: str = builder.name
self._builder: FormulaBuilder[SupportsFloatT] = builder
self._builder: FormulaBuilder[SupportsFloatInputT, SupportsFloatOutputT] = (
builder
)
self._create_method = create_method
self._channel: Broadcast[Sample[SupportsFloatT]] = Broadcast(self._name)
self._channel: Broadcast[Sample[SupportsFloatInputT]] = Broadcast(self._name)

@classmethod
def from_receiver(
cls,
name: str,
receiver: Receiver[Sample[SupportsFloatT]],
create_method: Callable[[float], SupportsFloatT],
receiver: Receiver[Sample[SupportsFloatInputT]],
create_method: Callable[[float], SupportsFloatOutputT],
*,
nones_are_zeros: bool = False,
) -> FormulaEngine[SupportsFloatT]:
) -> FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT]:
"""
Create a formula engine from a receiver.
Expand Down Expand Up @@ -370,7 +383,7 @@ async def run() -> None:
async def _run(self) -> None:
await self._builder.subscribe()
steps, metric_fetchers = self._builder.finalize()
evaluator = FormulaEvaluator[SupportsFloatT](
evaluator = FormulaEvaluator[SupportsFloatInputT, SupportsFloatOutputT](
self._name, steps, metric_fetchers, self._create_method
)
sender = self._channel.new_sender()
Expand Down Expand Up @@ -556,7 +569,7 @@ def new_receiver(
return self._channel.new_receiver(name, max_size)


class FormulaBuilder(Generic[SupportsFloatT]):
class FormulaBuilder(Generic[SupportsFloatInputT, SupportsFloatOutputT]):
"""Builds a post-fix formula engine that operates on `Sample` receivers.
Operators and metrics need to be pushed in in-fix order, and they get rearranged
Expand Down Expand Up @@ -585,7 +598,7 @@ class FormulaBuilder(Generic[SupportsFloatT]):
"""

def __init__(
self, name: str, create_method: Callable[[float], SupportsFloatT]
self, name: str, create_method: Callable[[float], SupportsFloatOutputT]
) -> None:
"""Create a `FormulaBuilder` instance.
Expand All @@ -596,10 +609,10 @@ def __init__(
`Power.from_watts`, for example.
"""
self._name = name
self._create_method: Callable[[float], SupportsFloatT] = create_method
self._create_method: Callable[[float], SupportsFloatOutputT] = create_method
self._build_stack: list[FormulaStep] = []
self._steps: list[FormulaStep] = []
self._metric_fetchers: dict[str, MetricFetcher[SupportsFloatT]] = {}
self._metric_fetchers: dict[str, MetricFetcher[SupportsFloatInputT]] = {}

def push_oper(self, oper: str) -> None: # pylint: disable=too-many-branches
"""Push an operator into the engine.
Expand Down Expand Up @@ -643,7 +656,7 @@ def push_oper(self, oper: str) -> None: # pylint: disable=too-many-branches
def push_metric(
self,
name: str,
data_stream: Receiver[Sample[SupportsFloatT]],
data_stream: Receiver[Sample[SupportsFloatInputT]],
*,
nones_are_zeros: bool,
) -> None:
Expand Down Expand Up @@ -735,7 +748,7 @@ async def subscribe(self) -> None:

def finalize(
self,
) -> tuple[list[FormulaStep], dict[str, MetricFetcher[SupportsFloatT]]]:
) -> tuple[list[FormulaStep], dict[str, MetricFetcher[SupportsFloatInputT]]]:
"""Finalize and return the steps and fetchers for the formula.
Returns:
Expand All @@ -755,7 +768,7 @@ def __str__(self) -> str:
steps = self._steps if len(self._steps) > 0 else self._build_stack
return format_formula(steps)

def build(self) -> FormulaEngine[SupportsFloatT]:
def build(self) -> FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT]:
"""Create a formula engine with the steps and fetchers that have been pushed.
Returns:
Expand All @@ -765,13 +778,16 @@ def build(self) -> FormulaEngine[SupportsFloatT]:
return FormulaEngine(self, create_method=self._create_method)


class _BaseHOFormulaBuilder(ABC, Generic[SupportsFloatT]):
class _BaseHOFormulaBuilder(ABC, Generic[SupportsFloatInputT, SupportsFloatOutputT]):
"""Provides a way to build formulas from the outputs of other formulas."""

def __init__(
self,
engine: FormulaEngine[SupportsFloatT] | FormulaEngine3Phase[SupportsFloatT],
create_method: Callable[[float], SupportsFloatT],
engine: (
FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT]
| FormulaEngine3Phase[SupportsFloatT]
),
create_method: Callable[[float], SupportsFloatOutputT],
) -> None:
"""Create a `GenericHigherOrderFormulaBuilder` instance.
Expand All @@ -785,20 +801,20 @@ def __init__(
self._steps: deque[
tuple[
TokenType,
FormulaEngine[SupportsFloatT]
FormulaEngine[SupportsFloatInputT, SupportsFloatOutputT]
| FormulaEngine3Phase[SupportsFloatT]
| Quantity
| float
| str,
]
] = deque()
self._steps.append((TokenType.COMPONENT_METRIC, engine))
self._create_method: Callable[[float], SupportsFloatT] = create_method
self._create_method: Callable[[float], SupportsFloatOutputT] = create_method

@overload
def _push(
self, oper: str, other: _CompositionType1Phase
) -> HigherOrderFormulaBuilder[SupportsFloatT]: ...
) -> HigherOrderFormulaBuilder[SupportsFloatInputT, SupportsFloatOutputT]: ...

@overload
def _push(
Expand Down Expand Up @@ -1061,13 +1077,13 @@ def production(


class HigherOrderFormulaBuilder(
Generic[SupportsFloatT], _BaseHOFormulaBuilder[SupportsFloatT]
_BaseHOFormulaBuilder[SupportsFloatInputT, SupportsFloatT]
):
"""A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver`."""

def build(
self, name: str, *, nones_are_zeros: bool = False
) -> FormulaEngine[SupportsFloatT]:
) -> FormulaEngine[SupportsFloatInputT, SupportsFloatT]:
"""Build a `FormulaEngine` instance from the builder.
Args:
Expand Down Expand Up @@ -1099,7 +1115,7 @@ def build(


class HigherOrderFormulaBuilder3Phase(
Generic[SupportsFloatT], _BaseHOFormulaBuilder[SupportsFloatT]
_BaseHOFormulaBuilder[SupportsFloatInputT, SupportsFloatT]
):
"""A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver3Phase`."""

Expand Down
30 changes: 20 additions & 10 deletions src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,33 @@
from collections.abc import Callable
from datetime import datetime
from math import isinf, isnan
from typing import Generic
from typing import Generic, SupportsFloat, TypeVar

from .._base_types import Sample, SupportsFloatT
from ._formula_steps import FormulaStep, MetricFetcher

SupportsFloatInputT = TypeVar("SupportsFloatInputT", bound=SupportsFloat)
"""Type variable for inputs that support conversion to float."""

class FormulaEvaluator(Generic[SupportsFloatT]):
"""A post-fix formula evaluator that operates on `Sample` receivers."""
SupportsFloatOutputT = TypeVar("SupportsFloatOutputT", bound=SupportsFloat)
"""Type variable for outputs that support conversion to float."""


class FormulaEvaluator(Generic[SupportsFloatInputT, SupportsFloatOutputT]):
"""A post-fix formula evaluator that operates on `Sample` receivers.
This formula evaluator takes [`float`][] samples as input and produces
`SupportFloatT` samples.
"""

def __init__(
self,
name: str,
steps: list[FormulaStep],
metric_fetchers: dict[str, MetricFetcher[SupportsFloatT]],
create_method: Callable[[float], SupportsFloatT],
metric_fetchers: dict[str, MetricFetcher[SupportsFloatInputT]],
create_method: Callable[[float], SupportsFloatOutputT],
) -> None:
"""Create a `FormulaEngine` instance.
"""Initialize this instance.
Args:
name: A name for the formula.
Expand All @@ -35,14 +45,14 @@ def __init__(
"""
self._name = name
self._steps = steps
self._metric_fetchers: dict[str, MetricFetcher[SupportsFloatT]] = (
self._metric_fetchers: dict[str, MetricFetcher[SupportsFloatInputT]] = (
metric_fetchers
)
self._first_run = True
self._create_method: Callable[[float], SupportsFloatT] = create_method
self._create_method: Callable[[float], SupportsFloatOutputT] = create_method

async def _synchronize_metric_timestamps(
self, metrics: set[asyncio.Task[Sample[SupportsFloatT] | None]]
self, metrics: set[asyncio.Task[Sample[SupportsFloatInputT] | None]]
) -> datetime:
"""Synchronize the metric streams.
Expand Down Expand Up @@ -89,7 +99,7 @@ async def _synchronize_metric_timestamps(
self._first_run = False
return latest_ts

async def apply(self) -> Sample[SupportsFloatT]:
async def apply(self) -> Sample[SupportsFloatOutputT]:
"""Fetch the latest metrics, apply the formula once and return the result.
Returns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from __future__ import annotations

from collections.abc import Callable
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, get_type_hints

from frequenz.channels import Receiver, Sender

Expand Down Expand Up @@ -54,6 +54,15 @@ def __init__( # pylint: disable=too-many-arguments
self._namespace: str = namespace
self._metric_id: ComponentMetricId = metric_id
self._resampler_requests: list[ComponentMetricRequest] = []
# We need to store the runtime value type of the formula, so that we can
# create the correct channel in the channel registry, as we need to pass the
# runtime type to the channel registry.
# Since invoking the function seems to be the only reliable way to do this
# (trying to get it from the type hints doesn't work because usually `Self`
# is used as the return type), we do it only once in the constructor to avoid
# unnecessary runtime cost.
self._value_type = type(create_method(0.0))

super().__init__(formula_name, create_method)

def _get_resampled_receiver(
Expand All @@ -74,7 +83,7 @@ def _get_resampled_receiver(
request = ComponentMetricRequest(self._namespace, component_id, metric_id, None)
self._resampler_requests.append(request)
resampled_channel = self._channel_registry.get_or_create(
Sample[SupportsFloatT], request.get_channel_name()
Sample[float], request.get_channel_name()
)
resampled_receiver = resampled_channel.new_receiver().map(
lambda sample: Sample(
Expand Down
1 change: 1 addition & 0 deletions tests/microgrid/test_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async def test_grid_power_1(mocker: MockerFixture) -> None:

grid_power_recv = grid.power.new_receiver()

# TODO: REMOVE THIS and validate the test against hardcoded values
grid_meter_recv = get_resampled_stream(
grid._formula_pool._namespace, # pylint: disable=protected-access
mockgrid.meter_ids[0],
Expand Down
7 changes: 0 additions & 7 deletions tests/timeseries/_formula_engine/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,9 @@ def get_resampled_stream(
metric_id,
create_method,
)
# Resampled data is always `Quantity` type, so we need to convert it to the desired
# output type.
return builder._get_resampled_receiver(
comp_id,
metric_id,
).map(
lambda sample: Sample(
sample.timestamp,
None if sample.value is None else create_method(sample.value.base_value),
)
)
# pylint: enable=protected-access

Expand Down
Loading

0 comments on commit 59f3506

Please sign in to comment.