Skip to content

Commit c70ea97

Browse files
Move latest_sample from FallbackFormulaMetricFetcher to MetricFetcher
Because that's where it is used. Signed-off-by: Elzbieta Kotulska <[email protected]>
1 parent 1731ad4 commit c70ea97

File tree

2 files changed

+8
-30
lines changed

2 files changed

+8
-30
lines changed

src/frequenz/sdk/timeseries/formula_engine/_formula_generators/_fallback_formula_metric_fetcher.py

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ def __init__(self, formula_generator: FormulaGenerator[QuantityT]):
3434
self._formula_generator: FormulaGenerator[QuantityT] = formula_generator
3535
self._formula_engine: FormulaEngine[QuantityT] | None = None
3636
self._receiver: Receiver[Sample[QuantityT]] | None = None
37-
self._latest_sample: Sample[QuantityT] | None = None
3837

3938
@property
4039
def name(self) -> str:
@@ -46,15 +45,6 @@ def is_running(self) -> bool:
4645
"""Check whether the formula engine is running."""
4746
return self._receiver is not None
4847

49-
@property
50-
def latest_sample(self) -> Sample[QuantityT] | None:
51-
"""Get the latest fetched sample.
52-
53-
Returns:
54-
The latest fetched sample, or `None` if no sample has been fetched yet.
55-
"""
56-
return self._latest_sample
57-
5848
def start(self) -> None:
5949
"""Initialize the formula engine and start fetching samples."""
6050
engine = self._formula_generator.generate()
@@ -87,5 +77,4 @@ def consume(self) -> Sample[QuantityT]:
8777
self._receiver is not None
8878
), f"Fallback metric fetcher: {self.name} was not started"
8979

90-
self._latest_sample = self._receiver.consume()
91-
return self._latest_sample
80+
return self._receiver.consume()

src/frequenz/sdk/timeseries/formula_engine/_formula_steps.py

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -365,16 +365,6 @@ def name(self) -> str:
365365
def is_running(self) -> bool:
366366
"""Check whether the metric fetcher is running."""
367367

368-
@property
369-
@abstractmethod
370-
def latest_sample(self) -> Sample[QuantityT] | None:
371-
"""Get the latest fetched value.
372-
373-
Returns:
374-
The latest fetched value. None if no value has been fetched
375-
of fetcher is not running.
376-
"""
377-
378368
@abstractmethod
379369
def start(self) -> None:
380370
"""Initialize the metric fetcher and start fetching samples."""
@@ -406,6 +396,7 @@ def __init__(
406396
self._next_value: Sample[QuantityT] | None = None
407397
self._nones_are_zeros = nones_are_zeros
408398
self._fallback: FallbackMetricFetcher[QuantityT] | None = fallback
399+
self._latest_fallback_sample: Sample[QuantityT] | None = None
409400

410401
@property
411402
def stream(self) -> Receiver[Sample[QuantityT]]:
@@ -444,9 +435,9 @@ async def _synchronize_and_fetch_fallback(
444435
fetcher or if the fallback fetcher fails to fetch the next value.
445436
"""
446437
# fallback_fetcher was not used, yet. We need to fetch first value.
447-
if fallback_fetcher.latest_sample is None:
438+
if self._latest_fallback_sample is None:
448439
try:
449-
fallback = await fallback_fetcher.receive()
440+
self._latest_fallback_sample = await fallback_fetcher.receive()
450441
except ReceiverError[Any] as err:
451442
_logger.error(
452443
"Fallback metric fetcher %s failed to fetch next value: %s."
@@ -455,16 +446,14 @@ async def _synchronize_and_fetch_fallback(
455446
err,
456447
)
457448
return None
458-
else:
459-
fallback = fallback_fetcher.latest_sample
460449

461-
if primary_fetcher_sample.timestamp < fallback.timestamp:
450+
if primary_fetcher_sample.timestamp < self._latest_fallback_sample.timestamp:
462451
return None
463452

464453
# Synchronize the fallback fetcher with primary one
465-
while primary_fetcher_sample.timestamp > fallback.timestamp:
454+
while primary_fetcher_sample.timestamp > self._latest_fallback_sample.timestamp:
466455
try:
467-
fallback = await fallback_fetcher.receive()
456+
self._latest_fallback_sample = await fallback_fetcher.receive()
468457
except ReceiverError[Any] as err:
469458
_logger.error(
470459
"Fallback metric fetcher %s failed to fetch next value: %s."
@@ -474,7 +463,7 @@ async def _synchronize_and_fetch_fallback(
474463
)
475464
return None
476465

477-
return fallback
466+
return self._latest_fallback_sample
478467

479468
async def fetch_next_with_fallback(
480469
self, fallback_fetcher: FallbackMetricFetcher[QuantityT]

0 commit comments

Comments
 (0)