Skip to content

Commit 64098eb

Browse files
committed
WIP Quantity ABC
1 parent 922c972 commit 64098eb

File tree

8 files changed

+159
-141
lines changed

8 files changed

+159
-141
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_engine.py

Lines changed: 96 additions & 92 deletions
Large diffs are not rendered by default.

src/frequenz/sdk/timeseries/formula_engine/_formula_engine_pool.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55

66
from __future__ import annotations
77

8-
from typing import TYPE_CHECKING
8+
from typing import TYPE_CHECKING, Callable, cast
99

1010
from frequenz.channels import Sender
1111

1212
from ...microgrid.component import ComponentMetricId
13-
from .._quantities import Current, Power, Quantity
13+
from .._quantities import Current, Power, Quantity, SupportsFloatT
1414
from ._formula_generators._formula_generator import (
1515
FormulaGenerator,
1616
FormulaGeneratorConfig,
@@ -59,14 +59,16 @@ def from_string(
5959
formula: str,
6060
component_metric_id: ComponentMetricId,
6161
*,
62+
value_constructor: Callable[[float], SupportsFloatT],
6263
nones_are_zeros: bool = False,
63-
) -> FormulaEngine[Quantity]:
64+
) -> FormulaEngine[SupportsFloatT]:
6465
"""Get a receiver for a manual formula.
6566
6667
Args:
6768
formula: formula to execute.
6869
component_metric_id: The metric ID to use when fetching receivers from the
6970
resampling actor.
71+
value_constructor: A function to create new values with.
7072
nones_are_zeros: Whether to treat None values from the stream as 0s. If
7173
False, the returned value will be a None.
7274
@@ -75,18 +77,22 @@ def from_string(
7577
"""
7678
channel_key = formula + component_metric_id.value
7779
if channel_key in self._string_engines:
78-
return self._string_engines[channel_key]
80+
return cast(
81+
FormulaEngine[SupportsFloatT], self._string_engines[channel_key]
82+
)
7983

80-
builder = ResampledFormulaBuilder(
84+
builder: ResampledFormulaBuilder[SupportsFloatT] = ResampledFormulaBuilder(
8185
self._namespace,
8286
formula,
8387
self._channel_registry,
8488
self._resampler_subscription_sender,
8589
component_metric_id,
86-
Quantity,
90+
value_constructor,
8791
)
8892
formula_engine = builder.from_string(formula, nones_are_zeros=nones_are_zeros)
89-
self._string_engines[channel_key] = formula_engine
93+
self._string_engines[channel_key] = cast(
94+
FormulaEngine[Quantity], formula_engine
95+
)
9096

9197
return formula_engine
9298

src/frequenz/sdk/timeseries/formula_engine/_formula_evaluator.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,19 @@
99
from math import isinf, isnan
1010
from typing import Generic
1111

12-
from .. import Sample
13-
from .._quantities import QuantityT
12+
from .._base_types import Sample, SupportsFloatT
1413
from ._formula_steps import FormulaStep, MetricFetcher
1514

1615

17-
class FormulaEvaluator(Generic[QuantityT]):
16+
class FormulaEvaluator(Generic[SupportsFloatT]):
1817
"""A post-fix formula evaluator that operates on `Sample` receivers."""
1918

2019
def __init__(
2120
self,
2221
name: str,
2322
steps: list[FormulaStep],
24-
metric_fetchers: dict[str, MetricFetcher[QuantityT]],
25-
create_method: Callable[[float], QuantityT],
23+
metric_fetchers: dict[str, MetricFetcher[SupportsFloatT]],
24+
create_method: Callable[[float], SupportsFloatT],
2625
) -> None:
2726
"""Create a `FormulaEngine` instance.
2827
@@ -36,12 +35,14 @@ def __init__(
3635
"""
3736
self._name = name
3837
self._steps = steps
39-
self._metric_fetchers: dict[str, MetricFetcher[QuantityT]] = metric_fetchers
38+
self._metric_fetchers: dict[str, MetricFetcher[SupportsFloatT]] = (
39+
metric_fetchers
40+
)
4041
self._first_run = True
41-
self._create_method: Callable[[float], QuantityT] = create_method
42+
self._create_method: Callable[[float], SupportsFloatT] = create_method
4243

4344
async def _synchronize_metric_timestamps(
44-
self, metrics: set[asyncio.Task[Sample[QuantityT] | None]]
45+
self, metrics: set[asyncio.Task[Sample[SupportsFloatT] | None]]
4546
) -> datetime:
4647
"""Synchronize the metric streams.
4748
@@ -88,7 +89,7 @@ async def _synchronize_metric_timestamps(
8889
self._first_run = False
8990
return latest_ts
9091

91-
async def apply(self) -> Sample[QuantityT]:
92+
async def apply(self) -> Sample[SupportsFloatT]:
9293
"""Fetch the latest metrics, apply the formula once and return the result.
9394
9495
Returns:

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_formula_generator.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
from ....microgrid import component, connection_manager
1818
from ....microgrid.component import ComponentMetricId
19-
from ..._quantities import QuantityT
19+
from ..._base_types import SupportsFloatT
2020
from .._formula_engine import FormulaEngine, FormulaEngine3Phase
2121
from .._resampled_formula_builder import ResampledFormulaBuilder
2222

@@ -52,7 +52,7 @@ class FormulaGeneratorConfig:
5252
"""The component IDs to use for generating the formula."""
5353

5454

55-
class FormulaGenerator(ABC, Generic[QuantityT]):
55+
class FormulaGenerator(ABC, Generic[SupportsFloatT]):
5656
"""A class for generating formulas from the component graph."""
5757

5858
def __init__(
@@ -83,8 +83,8 @@ def _get_builder(
8383
self,
8484
name: str,
8585
component_metric_id: ComponentMetricId,
86-
create_method: Callable[[float], QuantityT],
87-
) -> ResampledFormulaBuilder[QuantityT]:
86+
create_method: Callable[[float], SupportsFloatT],
87+
) -> ResampledFormulaBuilder[SupportsFloatT]:
8888
builder = ResampledFormulaBuilder(
8989
self._namespace,
9090
name,
@@ -140,5 +140,5 @@ def _get_grid_component_successors(self) -> set[component.Component]:
140140
@abstractmethod
141141
def generate(
142142
self,
143-
) -> FormulaEngine[QuantityT] | FormulaEngine3Phase[QuantityT]:
143+
) -> FormulaEngine[SupportsFloatT] | FormulaEngine3Phase[SupportsFloatT]:
144144
"""Generate a formula engine, based on the component graph."""

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77

88
import math
99
from abc import ABC, abstractmethod
10-
from typing import Generic
10+
from typing import Generic, SupportsFloat, TypeVar
1111

1212
from frequenz.channels import Receiver
1313

1414
from .. import Sample
15-
from .._quantities import QuantityT
15+
16+
SupportsFloatT = TypeVar("SupportsFloatT", bound=SupportsFloat)
17+
"""Type variable for types that support conversion to float."""
1618

1719

1820
class FormulaStep(ABC):
@@ -343,13 +345,13 @@ def apply(self, eval_stack: list[float]) -> None:
343345
eval_stack.append(val)
344346

345347

346-
class MetricFetcher(Generic[QuantityT], FormulaStep):
348+
class MetricFetcher(Generic[SupportsFloatT], FormulaStep):
347349
"""A formula step for fetching a value from a metric Receiver."""
348350

349351
def __init__(
350352
self,
351353
name: str,
352-
stream: Receiver[Sample[QuantityT]],
354+
stream: Receiver[Sample[SupportsFloatT]],
353355
*,
354356
nones_are_zeros: bool,
355357
) -> None:
@@ -361,12 +363,12 @@ def __init__(
361363
nones_are_zeros: Whether to treat None values from the stream as 0s.
362364
"""
363365
self._name = name
364-
self._stream: Receiver[Sample[QuantityT]] = stream
365-
self._next_value: Sample[QuantityT] | None = None
366+
self._stream: Receiver[Sample[SupportsFloatT]] = stream
367+
self._next_value: Sample[SupportsFloatT] | None = None
366368
self._nones_are_zeros = nones_are_zeros
367369

368370
@property
369-
def stream(self) -> Receiver[Sample[QuantityT]]:
371+
def stream(self) -> Receiver[Sample[SupportsFloatT]]:
370372
"""Return the stream from which to fetch values.
371373
372374
Returns:
@@ -382,7 +384,7 @@ def stream_name(self) -> str:
382384
"""
383385
return str(self._stream.__doc__)
384386

385-
async def fetch_next(self) -> Sample[QuantityT] | None:
387+
async def fetch_next(self) -> Sample[SupportsFloatT] | None:
386388
"""Fetch the next value from the stream.
387389
388390
To be called before each call to `apply`.
@@ -394,7 +396,7 @@ async def fetch_next(self) -> Sample[QuantityT] | None:
394396
return self._next_value
395397

396398
@property
397-
def value(self) -> Sample[QuantityT] | None:
399+
def value(self) -> Sample[SupportsFloatT] | None:
398400
"""Get the next value in the stream.
399401
400402
Returns:
@@ -423,10 +425,10 @@ def apply(self, eval_stack: list[float]) -> None:
423425
raise RuntimeError("No next value available to append.")
424426

425427
next_value = self._next_value.value
426-
if next_value is None or next_value.isnan() or next_value.isinf():
428+
if next_value is None or math.isnan(next_value) or math.isinf(next_value):
427429
if self._nones_are_zeros:
428430
eval_stack.append(0.0)
429431
else:
430432
eval_stack.append(math.nan)
431433
else:
432-
eval_stack.append(next_value.base_value)
434+
eval_stack.append(float(next_value))

src/frequenz/sdk/timeseries/formula_engine/_resampled_formula_builder.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@
1111
from frequenz.channels import Receiver, Sender
1212

1313
from ...microgrid.component import ComponentMetricId
14-
from .. import Sample
15-
from .._quantities import Quantity, QuantityT
14+
from .._base_types import Sample, SupportsFloatT
1615
from ._formula_engine import FormulaBuilder, FormulaEngine
1716
from ._tokenizer import Tokenizer, TokenType
1817

@@ -21,7 +20,7 @@
2120
from ...actor import ChannelRegistry, ComponentMetricRequest
2221

2322

24-
class ResampledFormulaBuilder(FormulaBuilder[QuantityT]):
23+
class ResampledFormulaBuilder(FormulaBuilder[SupportsFloatT]):
2524
"""Provides a way to build a FormulaEngine from resampled data streams."""
2625

2726
def __init__( # pylint: disable=too-many-arguments
@@ -31,7 +30,7 @@ def __init__( # pylint: disable=too-many-arguments
3130
channel_registry: ChannelRegistry,
3231
resampler_subscription_sender: Sender[ComponentMetricRequest],
3332
metric_id: ComponentMetricId,
34-
create_method: Callable[[float], QuantityT],
33+
create_method: Callable[[float], SupportsFloatT],
3534
) -> None:
3635
"""Create a `ResampledFormulaBuilder` instance.
3736
@@ -55,11 +54,11 @@ def __init__( # pylint: disable=too-many-arguments
5554
self._namespace: str = namespace
5655
self._metric_id: ComponentMetricId = metric_id
5756
self._resampler_requests: list[ComponentMetricRequest] = []
58-
super().__init__(formula_name, create_method) # type: ignore[arg-type]
57+
super().__init__(formula_name, create_method)
5958

6059
def _get_resampled_receiver(
6160
self, component_id: int, metric_id: ComponentMetricId
62-
) -> Receiver[Sample[QuantityT]]:
61+
) -> Receiver[Sample[SupportsFloatT]]:
6362
"""Get a receiver with the resampled data for the given component id.
6463
6564
Args:
@@ -75,13 +74,13 @@ def _get_resampled_receiver(
7574
request = ComponentMetricRequest(self._namespace, component_id, metric_id, None)
7675
self._resampler_requests.append(request)
7776
resampled_channel = self._channel_registry.get_or_create(
78-
Sample[Quantity], request.get_channel_name()
77+
Sample[SupportsFloatT], request.get_channel_name()
7978
)
8079
resampled_receiver = resampled_channel.new_receiver().map(
8180
lambda sample: Sample(
8281
sample.timestamp,
8382
(
84-
self._create_method(sample.value.base_value)
83+
self._create_method(float(sample.value))
8584
if sample.value is not None
8685
else None
8786
),
@@ -112,7 +111,7 @@ def from_string(
112111
formula: str,
113112
*,
114113
nones_are_zeros: bool,
115-
) -> FormulaEngine[QuantityT]:
114+
) -> FormulaEngine[SupportsFloatT]:
116115
"""Construct a `FormulaEngine` from the given formula string.
117116
118117
Formulas can have Component IDs that are preceeded by a pound symbol("#"), and

src/frequenz/sdk/timeseries/logical_meter/_logical_meter.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55

66

77
import uuid
8+
from typing import Callable
89

910
from frequenz.channels import Sender
1011

1112
from ...actor import ChannelRegistry, ComponentMetricRequest
1213
from ...microgrid.component import ComponentMetricId
13-
from .._quantities import Power, Quantity
14+
from .._quantities import Power, SupportsFloatT
1415
from ..formula_engine import FormulaEngine
1516
from ..formula_engine._formula_engine_pool import FormulaEnginePool
1617
from ..formula_engine._formula_generators import CHPPowerFormula, PVPowerFormula
@@ -99,8 +100,9 @@ def start_formula(
99100
formula: str,
100101
component_metric_id: ComponentMetricId,
101102
*,
103+
value_constructor: Callable[[float], SupportsFloatT],
102104
nones_are_zeros: bool = False,
103-
) -> FormulaEngine[Quantity]:
105+
) -> FormulaEngine[SupportsFloatT]:
104106
"""Start execution of the given formula.
105107
106108
Formulas can have Component IDs that are preceeded by a pound symbol("#"), and
@@ -113,14 +115,18 @@ def start_formula(
113115
formula: formula to execute.
114116
component_metric_id: The metric ID to use when fetching receivers from the
115117
resampling actor.
118+
value_constructor: A function to create new values with.
116119
nones_are_zeros: Whether to treat None values from the stream as 0s. If
117120
False, the returned value will be a None.
118121
119122
Returns:
120123
A FormulaEngine that applies the formula and streams values.
121124
"""
122125
return self._formula_pool.from_string(
123-
formula, component_metric_id, nones_are_zeros=nones_are_zeros
126+
formula,
127+
component_metric_id,
128+
value_constructor=value_constructor,
129+
nones_are_zeros=nones_are_zeros,
124130
)
125131

126132
@property

tests/timeseries/_formula_engine/utils.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from frequenz.sdk.microgrid import _data_pipeline
1313
from frequenz.sdk.microgrid.component import ComponentMetricId
1414
from frequenz.sdk.timeseries import Sample
15-
from frequenz.sdk.timeseries._quantities import QuantityT
15+
from frequenz.sdk.timeseries._quantities import SupportsFloatT
1616
from frequenz.sdk.timeseries.formula_engine._resampled_formula_builder import (
1717
ResampledFormulaBuilder,
1818
)
@@ -22,8 +22,8 @@ def get_resampled_stream(
2222
namespace: str,
2323
comp_id: int,
2424
metric_id: ComponentMetricId,
25-
create_method: Callable[[float], QuantityT],
26-
) -> Receiver[Sample[QuantityT]]:
25+
create_method: Callable[[float], SupportsFloatT],
26+
) -> Receiver[Sample[SupportsFloatT]]:
2727
"""Return the resampled data stream for the given component."""
2828
# Create a `FormulaBuilder` instance, just in order to reuse its
2929
# `_get_resampled_receiver` function implementation.
@@ -51,10 +51,10 @@ def get_resampled_stream(
5151
# pylint: enable=protected-access
5252

5353

54-
def equal_float_lists(list1: list[QuantityT], list2: list[QuantityT]) -> bool:
54+
def equal_float_lists(list1: list[SupportsFloatT], list2: list[SupportsFloatT]) -> bool:
5555
"""Compare two float lists with `math.isclose()`."""
5656
return (
5757
len(list1) > 0
5858
and len(list1) == len(list2)
59-
and all(isclose(v1.base_value, v2.base_value) for v1, v2 in zip(list1, list2))
59+
and all(isclose(v1, v2) for v1, v2 in zip(list1, list2))
6060
)

0 commit comments

Comments
 (0)