1
1
from __future__ import annotations
2
2
3
- import time
4
3
import warnings
5
- from concurrent .futures import ThreadPoolExecutor
6
- from threading import Event
7
4
from typing import TYPE_CHECKING , Any , Callable , NamedTuple
8
5
9
- from .._utils import batched , with_timeout
6
+ from .._utils import batched , waiter
10
7
from ..core import BoundModelBase , ClientEntityBase , Meta
11
8
from .domain import Action , ActionFailedException , ActionTimeoutException
12
9
@@ -19,18 +16,24 @@ class BoundAction(BoundModelBase, Action):
19
16
20
17
model = Action
21
18
22
- def wait_until_finished (self , max_retries : int | None = None ) -> None :
19
+ def wait_until_finished (
20
+ self ,
21
+ max_retries : int | None = None ,
22
+ * ,
23
+ timeout : float | None = None ,
24
+ ) -> None :
23
25
"""Wait until the specific action has status=finished.
24
26
25
27
:param max_retries: int Specify how many retries will be performed before an ActionTimeoutException will be raised.
26
28
:raises: ActionFailedException when action is finished with status==error
27
- :raises: ActionTimeoutException when Action is still in status==running after max_retries is reached.
29
+ :raises: ActionTimeoutException when Action is still in status==running after max_retries or timeout is reached.
28
30
"""
29
31
if max_retries is None :
30
32
# pylint: disable=protected-access
31
33
max_retries = self ._client ._client ._poll_max_retries
32
34
33
35
retries = 0
36
+ wait = waiter (timeout )
34
37
while True :
35
38
self .reload ()
36
39
if self .status != Action .STATUS_RUNNING :
@@ -39,8 +42,8 @@ def wait_until_finished(self, max_retries: int | None = None) -> None:
39
42
retries += 1
40
43
if retries < max_retries :
41
44
# pylint: disable=protected-access
42
- time . sleep (self ._client ._client ._poll_interval_func (retries ))
43
- continue
45
+ if not wait (self ._client ._client ._poll_interval_func (retries )):
46
+ continue
44
47
45
48
raise ActionTimeoutException (action = self )
46
49
@@ -170,9 +173,10 @@ def _get_list_by_ids(self, ids: list[int]) -> list[BoundAction]:
170
173
171
174
def wait_for_function (
172
175
self ,
173
- stop : Event ,
174
176
handle_update : Callable [[BoundAction ], None ],
175
177
actions : list [Action | BoundAction ],
178
+ * ,
179
+ timeout : float | None = None ,
176
180
) -> list [BoundAction ]:
177
181
"""
178
182
Waits until all Actions succeed by polling the API at the interval defined by
@@ -183,6 +187,7 @@ def wait_for_function(
183
187
184
188
:param handle_update: Function called every time an Action is updated.
185
189
:param actions: List of Actions to wait for.
190
+ :param timeout: Timeout in seconds.
186
191
:raises: ActionFailedException when an Action failed.
187
192
:return: List of succeeded Actions.
188
193
"""
@@ -191,10 +196,12 @@ def wait_for_function(
191
196
completed : list [BoundAction ] = []
192
197
193
198
retries = 0
199
+ wait = waiter (timeout )
194
200
while len (running_ids ):
195
201
# pylint: disable=protected-access
196
- if stop .wait (self ._client ._poll_interval_func (retries )):
197
- raise ActionTimeoutException ()
202
+ if wait (self ._client ._poll_interval_func (retries )):
203
+ # TODO: How to raise a timeout exception for many actiosn without exception group.
204
+ raise ActionTimeoutException ("" )
198
205
199
206
retries += 1
200
207
@@ -212,6 +219,7 @@ def wait_for_function(
212
219
def wait_for (
213
220
self ,
214
221
actions : list [Action | BoundAction ],
222
+ * ,
215
223
timeout : float | None = None ,
216
224
) -> list [BoundAction ]:
217
225
"""
@@ -222,6 +230,7 @@ def wait_for(
222
230
If a single Action fails, the function will stop waiting and raise ActionFailedException.
223
231
224
232
:param actions: List of Actions to wait for.
233
+ :param timeout: Timeout in seconds.
225
234
:raises: ActionFailedException when an Action failed.
226
235
:raises: TimeoutError when the Actions did not succeed before timeout.
227
236
:return: List of succeeded Actions.
@@ -231,10 +240,7 @@ def handle_update(update: BoundAction) -> None:
231
240
if update .status == Action .STATUS_ERROR :
232
241
raise ActionFailedException (action = update )
233
242
234
- def run () -> list [BoundAction ]:
235
- return self .wait_for_function (handle_update , actions )
236
-
237
- return with_timeout (timeout )(run )()
243
+ return self .wait_for_function (handle_update , actions , timeout = timeout )
238
244
239
245
def get_list (
240
246
self ,
0 commit comments