|
30 | 30 | from lightning.app.api.request_types import _APIRequest, _CommandRequest, _DeltaRequest |
31 | 31 | from lightning.app.core.constants import ( |
32 | 32 | BATCH_DELTA_COUNT, |
| 33 | + CHECK_ERROR_QUEUE_INTERVAL, |
33 | 34 | DEBUG_ENABLED, |
34 | 35 | FLOW_DURATION_SAMPLES, |
35 | 36 | FLOW_DURATION_THRESHOLD, |
@@ -165,6 +166,7 @@ def __init__( |
165 | 166 |
|
166 | 167 | self._last_run_time: float = 0.0 |
167 | 168 | self._run_times: list = [] |
| 169 | + self._last_check_error_queue: float = 0.0 |
168 | 170 |
|
169 | 171 | # Path attributes can't get properly attached during the initialization, because the full name |
170 | 172 | # is only available after all Flows and Works have been instantiated. |
@@ -318,10 +320,12 @@ def batch_get_state_changed_from_queue(q: BaseQueue, timeout: Optional[float] = |
318 | 320 | return [] |
319 | 321 |
|
320 | 322 | def check_error_queue(self) -> None: |
321 | | - exception: Exception = self.get_state_changed_from_queue(self.error_queue) # type: ignore[assignment,arg-type] |
322 | | - if isinstance(exception, Exception): |
323 | | - self.exception = exception |
324 | | - self.stage = AppStage.FAILED |
| 323 | + if (time() - self._last_check_error_queue) > CHECK_ERROR_QUEUE_INTERVAL: |
| 324 | + exception: Exception = self.get_state_changed_from_queue(self.error_queue) # type: ignore[assignment,arg-type] |
| 325 | + if isinstance(exception, Exception): |
| 326 | + self.exception = exception |
| 327 | + self.stage = AppStage.FAILED |
| 328 | + self._last_check_error_queue = time() |
325 | 329 |
|
326 | 330 | @property |
327 | 331 | def flows(self) -> List[Union[LightningWork, "LightningFlow"]]: |
|
0 commit comments