99import math
1010from collections .abc import Sequence
1111from datetime import datetime , timedelta
12- from typing import SupportsIndex , overload
12+ from typing import Generic , SupportsIndex , overload
1313
1414import numpy as np
1515from frequenz .channels import Broadcast , Receiver , Sender
1616from numpy .typing import ArrayLike
1717
1818from ..actor ._background_service import BackgroundService
19- from ._base_types import UNIX_EPOCH , Sample
20- from ._quantities import Quantity
19+ from ._base_types import UNIX_EPOCH , Sample , SupportsFloatT
2120from ._resampling import Resampler , ResamplerConfig
2221from ._ringbuffer import OrderedRingBuffer
2322
2423_logger = logging .getLogger (__name__ )
2524
2625
27- class MovingWindow (BackgroundService ):
26+ class MovingWindow (BackgroundService , Generic [ SupportsFloatT ] ):
2827 """
2928 A data window that moves with the latest datapoints of a data stream.
3029
@@ -130,9 +129,9 @@ async def run() -> None:
130129 def __init__ ( # pylint: disable=too-many-arguments
131130 self ,
132131 size : timedelta ,
133- resampled_data_recv : Receiver [Sample [Quantity ]],
132+ resampled_data_recv : Receiver [Sample [SupportsFloatT ]],
134133 input_sampling_period : timedelta ,
135- resampler_config : ResamplerConfig | None = None ,
134+ resampler_config : ResamplerConfig [ SupportsFloatT ] | None = None ,
136135 align_to : datetime = UNIX_EPOCH ,
137136 * ,
138137 name : str | None = None ,
@@ -166,8 +165,8 @@ def __init__( # pylint: disable=too-many-arguments
166165
167166 self ._sampling_period = input_sampling_period
168167
169- self ._resampler : Resampler | None = None
170- self ._resampler_sender : Sender [Sample [Quantity ]] | None = None
168+ self ._resampler : Resampler [ SupportsFloatT ] | None = None
169+ self ._resampler_sender : Sender [Sample [SupportsFloatT ]] | None = None
171170
172171 if resampler_config :
173172 assert (
@@ -182,7 +181,9 @@ def __init__( # pylint: disable=too-many-arguments
182181 size .total_seconds () / self ._sampling_period .total_seconds ()
183182 )
184183
185- self ._resampled_data_recv = resampled_data_recv
184+ self ._resampled_data_recv : Receiver [Sample [SupportsFloatT ]] = (
185+ resampled_data_recv
186+ )
186187 self ._buffer = OrderedRingBuffer (
187188 np .empty (shape = num_samples , dtype = float ),
188189 sampling_period = self ._sampling_period ,
@@ -341,11 +342,11 @@ def _configure_resampler(self) -> None:
341342 """Configure the components needed to run the resampler."""
342343 assert self ._resampler is not None
343344
344- async def sink_buffer (sample : Sample [Quantity ]) -> None :
345+ async def sink_buffer (sample : Sample [SupportsFloatT ]) -> None :
345346 if sample .value is not None :
346347 self ._buffer .update (sample )
347348
348- resampler_channel = Broadcast [Sample [Quantity ]]("average" )
349+ resampler_channel = Broadcast [Sample [SupportsFloatT ]]("average" )
349350 self ._resampler_sender = resampler_channel .new_sender ()
350351 self ._resampler .add_timeseries (
351352 "avg" , resampler_channel .new_receiver (), sink_buffer
0 commit comments