Skip to content

Commit

Permalink
Fallback components in generated formulas (#1037)
Browse files Browse the repository at this point in the history
Fallback components are used in generated formulas. If primary
components is unavailable, formula will generate metric from fallback
components. Fallback formulas are implemented for:
  - PVPowerFormula
  - ProducerPowerFormula
  - BatteryPowerFormula
  - ConsumerPowerFormula
  - GridPowerFormula

All necessary formulas are implemented in this PR
  • Loading branch information
ela-kotulska-frequenz authored Aug 30, 2024
2 parents 4641bd9 + c70ea97 commit 82dcb63
Show file tree
Hide file tree
Showing 23 changed files with 1,539 additions and 161 deletions.
7 changes: 6 additions & 1 deletion RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
- Fallback components are used in generated formulas. If primary components is unavailable, formula will generate metric from fallback components. Fallback formulas are implemented for:
- PVPowerFormula
- ProducerPowerFormula
- BatteryPowerFormula
- ConsumerPowerFormula
- GridPowerFormula

## Bug Fixes

Expand Down
1 change: 1 addition & 0 deletions src/frequenz/sdk/timeseries/battery_pool/_battery_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ def power(self) -> FormulaEngine[Power]:
BatteryPowerFormula,
FormulaGeneratorConfig(
component_ids=self._pool_ref_store._batteries,
allow_fallback=True,
),
)
assert isinstance(engine, FormulaEngine)
Expand Down
13 changes: 12 additions & 1 deletion src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
ConstantValue,
Consumption,
Divider,
FallbackMetricFetcher,
FormulaStep,
Maximizer,
MetricFetcher,
Expand Down Expand Up @@ -747,6 +748,7 @@ def push_metric(
data_stream: Receiver[Sample[QuantityT]],
*,
nones_are_zeros: bool,
fallback: FallbackMetricFetcher[QuantityT] | None = None,
) -> None:
"""Push a metric receiver into the engine.
Expand All @@ -755,9 +757,18 @@ def push_metric(
data_stream: A receiver to fetch this metric from.
nones_are_zeros: Whether to treat None values from the stream as 0s. If
False, the returned value will be a None.
fallback: Metric fetcher to use if primary one start sending
invalid data (e.g. due to a component stop). If None, the data from
primary metric fetcher will be used.
"""
fetcher = self._metric_fetchers.setdefault(
name, MetricFetcher(name, data_stream, nones_are_zeros=nones_are_zeros)
name,
MetricFetcher(
name,
data_stream,
nones_are_zeros=nones_are_zeros,
fallback=fallback,
),
)
self._steps.append(fetcher)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@

"""Formula generator from component graph for Grid Power."""

import itertools
import logging

from frequenz.client.microgrid import ComponentMetricId
from frequenz.client.microgrid import Component, ComponentCategory, ComponentMetricId

from ....microgrid import connection_manager
from ..._quantities import Power
from ...formula_engine import FormulaEngine
from ._fallback_formula_metric_fetcher import FallbackFormulaMetricFetcher
from ._formula_generator import (
NON_EXISTING_COMPONENT_ID,
ComponentNotFound,
FormulaGenerationError,
FormulaGenerator,
FormulaGeneratorConfig,
)

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -48,8 +51,8 @@ def generate(
builder = self._get_builder(
"battery-power", ComponentMetricId.ACTIVE_POWER, Power.from_watts
)
component_ids = self._config.component_ids
if not component_ids:

if not self._config.component_ids:
_logger.warning(
"No Battery component IDs specified. "
"Subscribing to the resampling actor with a non-existing "
Expand All @@ -63,43 +66,109 @@ def generate(
)
return builder.build()

component_ids = set(self._config.component_ids)
component_graph = connection_manager.get().component_graph
inv_bat_mapping: dict[Component, set[Component]] = {}

battery_inverters = frozenset(
frozenset(
for bat_id in component_ids:
inverters = set(
filter(
component_graph.is_battery_inverter,
component_graph.predecessors(bat_id),
)
)
for bat_id in component_ids
)

if not all(battery_inverters):
raise ComponentNotFound(
"All batteries must have at least one inverter as a predecessor."
)
if len(inverters) == 0:
raise ComponentNotFound(
"All batteries must have at least one inverter as a predecessor."
f"Battery ID {bat_id} has no inverter as a predecessor.",
)

all_connected_batteries = set()
for inverters in battery_inverters:
for inverter in inverters:
all_connected_batteries.update(
component_graph.successors(inverter.component_id)
all_connected_batteries = component_graph.successors(
inverter.component_id
)
battery_ids = set(
map(lambda battery: battery.component_id, all_connected_batteries)
)
if not battery_ids.issubset(component_ids):
raise FormulaGenerationError(
f"Not all batteries behind inverter {inverter.component_id} "
f"are requested. Missing: {battery_ids - component_ids}"
)

inv_bat_mapping[inverter] = all_connected_batteries

if self._config.allow_fallback:
fallbacks = self._get_fallback_formulas(inv_bat_mapping)

for idx, (primary_component, fallback_formula) in enumerate(
fallbacks.items()
):
if idx > 0:
builder.push_oper("+")

builder.push_component_metric(
primary_component.component_id,
nones_are_zeros=(
primary_component.category != ComponentCategory.METER
),
fallback=fallback_formula,
)
else:
for idx, comp in enumerate(inv_bat_mapping.keys()):
if idx > 0:
builder.push_oper("+")
builder.push_component_metric(comp.component_id, nones_are_zeros=True)

return builder.build()

def _get_fallback_formulas(
self, inv_bat_mapping: dict[Component, set[Component]]
) -> dict[Component, FallbackFormulaMetricFetcher[Power] | None]:
"""Find primary and fallback components and create fallback formulas.
if len(all_connected_batteries) != len(component_ids):
raise FormulaGenerationError(
"All batteries behind a set of inverters must be requested."
The primary component is the one that will be used to calculate the battery power.
If it is not available, the fallback formula will be used instead.
Fallback formulas calculate the battery power using the fallback components.
Fallback formulas are wrapped in `FallbackFormulaMetricFetcher`.
Args:
inv_bat_mapping: A mapping from inverter to connected batteries.
Returns:
A dictionary mapping primary components to their FallbackFormulaMetricFetcher.
"""
fallbacks = self._get_metric_fallback_components(set(inv_bat_mapping.keys()))

fallback_formulas: dict[
Component, FallbackFormulaMetricFetcher[Power] | None
] = {}
for primary_component, fallback_components in fallbacks.items():
if len(fallback_components) == 0:
fallback_formulas[primary_component] = None
continue

battery_ids = set(
map(
lambda battery: battery.component_id,
itertools.chain.from_iterable(
inv_bat_mapping[inv] for inv in fallback_components
),
)
)

builder.push_oper("(")
builder.push_oper("(")
# Iterate over the flattened list of inverters
for idx, comp in enumerate(
inverter for inverters in battery_inverters for inverter in inverters
):
if idx > 0:
builder.push_oper("+")
builder.push_component_metric(comp.component_id, nones_are_zeros=True)
generator = BatteryPowerFormula(
f"{self._namespace}_fallback_{battery_ids}",
self._channel_registry,
self._resampler_subscription_sender,
FormulaGeneratorConfig(
component_ids=battery_ids,
allow_fallback=False,
),
)

return builder.build()
fallback_formulas[primary_component] = FallbackFormulaMetricFetcher(
generator
)

return fallback_formulas
Loading

0 comments on commit 82dcb63

Please sign in to comment.