1212from abc import ABC
1313from collections import deque
1414from collections .abc import Callable
15- from typing import Generic , TypeVar , Union , overload
15+ from typing import Generic , SupportsFloat , TypeVar , Union , overload
1616
1717from frequenz .channels import Broadcast , Receiver
1818
4040
4141_logger = logging .Logger (__name__ )
4242
43+ SupportsFloatInputT = TypeVar ("SupportsFloatInputT" , bound = SupportsFloat )
44+ """Type variable for inputs that support conversion to float."""
45+
46+ SupportsFloatOutputT = TypeVar ("SupportsFloatOutputT" , bound = SupportsFloat )
47+ """Type variable for outputs that support conversion to float."""
48+
4349_operator_precedence = {
4450 "max" : 0 ,
4551 "min" : 1 ,
95101# but mypy doesn't support that, so we need to use `# type: ignore` in several places in
96102# this, and subsequent classes, to avoid mypy errors.
97103class _ComposableFormulaEngine (
98- ABC , Generic [_GenericEngine , _GenericHigherOrderBuilder , SupportsFloatT ]
104+ ABC ,
105+ Generic [
106+ _GenericEngine ,
107+ _GenericHigherOrderBuilder ,
108+ SupportsFloatInputT ,
109+ SupportsFloatOutputT ,
110+ ],
99111):
100112 """A base class for formula engines."""
101113
102- _create_method : Callable [[float ], SupportsFloatT ]
114+ _create_method : Callable [[float ], SupportsFloatOutputT ]
103115 _higher_order_builder : type [_GenericHigherOrderBuilder ]
104116 _task : asyncio .Task [None ] | None = None
105117
@@ -110,8 +122,7 @@ async def _stop(self) -> None:
110122 await cancel_and_await (self ._task )
111123
112124 def __add__ (
113- self ,
114- other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT ,
125+ self , other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
115126 ) -> _GenericHigherOrderBuilder :
116127 """Return a formula builder that adds (data in) `other` to `self`.
117128
@@ -126,7 +137,7 @@ def __add__(
126137 return self ._higher_order_builder (self , self ._create_method ) + other # type: ignore
127138
128139 def __sub__ (
129- self , other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT
140+ self , other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
130141 ) -> _GenericHigherOrderBuilder :
131142 """Return a formula builder that subtracts (data in) `other` from `self`.
132143
@@ -171,7 +182,7 @@ def __truediv__(
171182 return self ._higher_order_builder (self , self ._create_method ) / other # type: ignore
172183
173184 def max (
174- self , other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT
185+ self , other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
175186 ) -> _GenericHigherOrderBuilder :
176187 """Return a formula engine that outputs the maximum of `self` and `other`.
177188
@@ -186,7 +197,7 @@ def max(
186197 return self ._higher_order_builder (self , self ._create_method ).max (other ) # type: ignore
187198
188199 def min (
189- self , other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatT
200+ self , other : _GenericEngine | _GenericHigherOrderBuilder | SupportsFloatInputT
190201 ) -> _GenericHigherOrderBuilder :
191202 """Return a formula engine that outputs the minimum of `self` and `other`.
192203
@@ -221,11 +232,11 @@ def production(self) -> _GenericHigherOrderBuilder:
221232
222233
223234class FormulaEngine (
224- Generic [SupportsFloatT ],
235+ Generic [SupportsFloatInputT , SupportsFloatOutputT ],
225236 _ComposableFormulaEngine [
226237 "FormulaEngine" , # type: ignore[type-arg]
227238 "HigherOrderFormulaBuilder" , # type: ignore[type-arg]
228- SupportsFloatT ,
239+ SupportsFloatOutputT ,
229240 ],
230241):
231242 """[`FormulaEngine`][frequenz.sdk.timeseries.formula_engine.FormulaEngine]s are a
@@ -294,8 +305,8 @@ class FormulaEngine(
294305
295306 def __init__ (
296307 self ,
297- builder : FormulaBuilder [SupportsFloatT ],
298- create_method : Callable [[float ], SupportsFloatT ],
308+ builder : FormulaBuilder [SupportsFloatInputT , SupportsFloatOutputT ],
309+ create_method : Callable [[float ], SupportsFloatOutputT ],
299310 ) -> None :
300311 """Create a `FormulaEngine` instance.
301312
@@ -308,19 +319,21 @@ def __init__(
308319 """
309320 self ._higher_order_builder = HigherOrderFormulaBuilder
310321 self ._name : str = builder .name
311- self ._builder : FormulaBuilder [SupportsFloatT ] = builder
322+ self ._builder : FormulaBuilder [SupportsFloatInputT , SupportsFloatOutputT ] = (
323+ builder
324+ )
312325 self ._create_method = create_method
313- self ._channel : Broadcast [Sample [SupportsFloatT ]] = Broadcast (self ._name )
326+ self ._channel : Broadcast [Sample [SupportsFloatInputT ]] = Broadcast (self ._name )
314327
315328 @classmethod
316329 def from_receiver (
317330 cls ,
318331 name : str ,
319- receiver : Receiver [Sample [SupportsFloatT ]],
320- create_method : Callable [[float ], SupportsFloatT ],
332+ receiver : Receiver [Sample [SupportsFloatInputT ]],
333+ create_method : Callable [[float ], SupportsFloatOutputT ],
321334 * ,
322335 nones_are_zeros : bool = False ,
323- ) -> FormulaEngine [SupportsFloatT ]:
336+ ) -> FormulaEngine [SupportsFloatInputT , SupportsFloatOutputT ]:
324337 """
325338 Create a formula engine from a receiver.
326339
@@ -370,7 +383,7 @@ async def run() -> None:
370383 async def _run (self ) -> None :
371384 await self ._builder .subscribe ()
372385 steps , metric_fetchers = self ._builder .finalize ()
373- evaluator = FormulaEvaluator [SupportsFloatT ](
386+ evaluator = FormulaEvaluator [SupportsFloatInputT , SupportsFloatOutputT ](
374387 self ._name , steps , metric_fetchers , self ._create_method
375388 )
376389 sender = self ._channel .new_sender ()
@@ -556,7 +569,7 @@ def new_receiver(
556569 return self ._channel .new_receiver (name , max_size )
557570
558571
559- class FormulaBuilder (Generic [SupportsFloatT ]):
572+ class FormulaBuilder (Generic [SupportsFloatInputT , SupportsFloatOutputT ]):
560573 """Builds a post-fix formula engine that operates on `Sample` receivers.
561574
562575 Operators and metrics need to be pushed in in-fix order, and they get rearranged
@@ -585,7 +598,7 @@ class FormulaBuilder(Generic[SupportsFloatT]):
585598 """
586599
587600 def __init__ (
588- self , name : str , create_method : Callable [[float ], SupportsFloatT ]
601+ self , name : str , create_method : Callable [[float ], SupportsFloatOutputT ]
589602 ) -> None :
590603 """Create a `FormulaBuilder` instance.
591604
@@ -596,10 +609,10 @@ def __init__(
596609 `Power.from_watts`, for example.
597610 """
598611 self ._name = name
599- self ._create_method : Callable [[float ], SupportsFloatT ] = create_method
612+ self ._create_method : Callable [[float ], SupportsFloatOutputT ] = create_method
600613 self ._build_stack : list [FormulaStep ] = []
601614 self ._steps : list [FormulaStep ] = []
602- self ._metric_fetchers : dict [str , MetricFetcher [SupportsFloatT ]] = {}
615+ self ._metric_fetchers : dict [str , MetricFetcher [SupportsFloatInputT ]] = {}
603616
604617 def push_oper (self , oper : str ) -> None : # pylint: disable=too-many-branches
605618 """Push an operator into the engine.
@@ -643,7 +656,7 @@ def push_oper(self, oper: str) -> None: # pylint: disable=too-many-branches
643656 def push_metric (
644657 self ,
645658 name : str ,
646- data_stream : Receiver [Sample [SupportsFloatT ]],
659+ data_stream : Receiver [Sample [SupportsFloatInputT ]],
647660 * ,
648661 nones_are_zeros : bool ,
649662 ) -> None :
@@ -735,7 +748,7 @@ async def subscribe(self) -> None:
735748
736749 def finalize (
737750 self ,
738- ) -> tuple [list [FormulaStep ], dict [str , MetricFetcher [SupportsFloatT ]]]:
751+ ) -> tuple [list [FormulaStep ], dict [str , MetricFetcher [SupportsFloatInputT ]]]:
739752 """Finalize and return the steps and fetchers for the formula.
740753
741754 Returns:
@@ -755,7 +768,7 @@ def __str__(self) -> str:
755768 steps = self ._steps if len (self ._steps ) > 0 else self ._build_stack
756769 return format_formula (steps )
757770
758- def build (self ) -> FormulaEngine [SupportsFloatT ]:
771+ def build (self ) -> FormulaEngine [SupportsFloatInputT , SupportsFloatOutputT ]:
759772 """Create a formula engine with the steps and fetchers that have been pushed.
760773
761774 Returns:
@@ -765,13 +778,16 @@ def build(self) -> FormulaEngine[SupportsFloatT]:
765778 return FormulaEngine (self , create_method = self ._create_method )
766779
767780
768- class _BaseHOFormulaBuilder (ABC , Generic [SupportsFloatT ]):
781+ class _BaseHOFormulaBuilder (ABC , Generic [SupportsFloatInputT , SupportsFloatOutputT ]):
769782 """Provides a way to build formulas from the outputs of other formulas."""
770783
771784 def __init__ (
772785 self ,
773- engine : FormulaEngine [SupportsFloatT ] | FormulaEngine3Phase [SupportsFloatT ],
774- create_method : Callable [[float ], SupportsFloatT ],
786+ engine : (
787+ FormulaEngine [SupportsFloatInputT , SupportsFloatOutputT ]
788+ | FormulaEngine3Phase [SupportsFloatT ]
789+ ),
790+ create_method : Callable [[float ], SupportsFloatOutputT ],
775791 ) -> None :
776792 """Create a `GenericHigherOrderFormulaBuilder` instance.
777793
@@ -785,20 +801,20 @@ def __init__(
785801 self ._steps : deque [
786802 tuple [
787803 TokenType ,
788- FormulaEngine [SupportsFloatT ]
804+ FormulaEngine [SupportsFloatInputT , SupportsFloatOutputT ]
789805 | FormulaEngine3Phase [SupportsFloatT ]
790806 | Quantity
791807 | float
792808 | str ,
793809 ]
794810 ] = deque ()
795811 self ._steps .append ((TokenType .COMPONENT_METRIC , engine ))
796- self ._create_method : Callable [[float ], SupportsFloatT ] = create_method
812+ self ._create_method : Callable [[float ], SupportsFloatOutputT ] = create_method
797813
798814 @overload
799815 def _push (
800816 self , oper : str , other : _CompositionType1Phase
801- ) -> HigherOrderFormulaBuilder [SupportsFloatT ]: ...
817+ ) -> HigherOrderFormulaBuilder [SupportsFloatInputT , SupportsFloatOutputT ]: ...
802818
803819 @overload
804820 def _push (
@@ -1061,13 +1077,13 @@ def production(
10611077
10621078
10631079class HigherOrderFormulaBuilder (
1064- Generic [ SupportsFloatT ], _BaseHOFormulaBuilder [ SupportsFloatT ]
1080+ _BaseHOFormulaBuilder [ SupportsFloatInputT , SupportsFloatT ]
10651081):
10661082 """A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver`."""
10671083
10681084 def build (
10691085 self , name : str , * , nones_are_zeros : bool = False
1070- ) -> FormulaEngine [SupportsFloatT ]:
1086+ ) -> FormulaEngine [SupportsFloatInputT , SupportsFloatT ]:
10711087 """Build a `FormulaEngine` instance from the builder.
10721088
10731089 Args:
@@ -1099,7 +1115,7 @@ def build(
10991115
11001116
11011117class HigherOrderFormulaBuilder3Phase (
1102- Generic [ SupportsFloatT ], _BaseHOFormulaBuilder [ SupportsFloatT ]
1118+ _BaseHOFormulaBuilder [ SupportsFloatInputT , SupportsFloatT ]
11031119):
11041120 """A specialization of the _BaseHOFormulaBuilder for `FormulaReceiver3Phase`."""
11051121
0 commit comments