diff --git a/hcloud/_utils.py b/hcloud/_utils.py new file mode 100644 index 00000000..1ece2320 --- /dev/null +++ b/hcloud/_utils.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import time +from collections.abc import Iterable, Iterator +from itertools import islice +from typing import Callable, TypeVar + +T = TypeVar("T") + + +def batched(iterable: Iterable[T], size: int) -> Iterator[tuple[T, ...]]: + """ + Returns a batch of the provided size from the provided iterable. + """ + iterator = iter(iterable) + while True: + batch = tuple(islice(iterator, size)) + if not batch: + break + yield batch + + +def waiter(timeout: float | None = None) -> Callable[[float], bool]: + """ + Waiter returns a wait function that sleeps the specified amount of seconds, and + handles timeouts. + + The wait function returns True if the timeout was reached, False otherwise. + + :param timeout: Timeout in seconds, defaults to None. + :return: Wait function. + """ + + if timeout: + deadline = time.time() + timeout + + def wait(seconds: float) -> bool: + now = time.time() + + # Timeout if the deadline exceeded. + if deadline < now: + return True + + # The deadline is not exceeded after the sleep time. + if now + seconds < deadline: + time.sleep(seconds) + return False + + # The deadline is exceeded after the sleep time, clamp sleep time to + # deadline, and allow one last attempt until next wait call. + time.sleep(deadline - now) + return False + + else: + + def wait(seconds: float) -> bool: + time.sleep(seconds) + return False + + return wait diff --git a/hcloud/actions/__init__.py b/hcloud/actions/__init__.py index 8a96e5de..7bfdefab 100644 --- a/hcloud/actions/__init__.py +++ b/hcloud/actions/__init__.py @@ -10,6 +10,7 @@ Action, ActionException, ActionFailedException, + ActionGroupException, ActionTimeoutException, ) @@ -18,6 +19,7 @@ "ActionException", "ActionFailedException", "ActionTimeoutException", + "ActionGroupException", "ActionsClient", "ActionsPageResult", "BoundAction", diff --git a/hcloud/actions/client.py b/hcloud/actions/client.py index 7ec192c8..505b2c8e 100644 --- a/hcloud/actions/client.py +++ b/hcloud/actions/client.py @@ -1,11 +1,16 @@ from __future__ import annotations -import time import warnings -from typing import TYPE_CHECKING, Any, NamedTuple +from typing import TYPE_CHECKING, Any, Callable, NamedTuple +from .._utils import batched, waiter from ..core import BoundModelBase, ClientEntityBase, Meta -from .domain import Action, ActionFailedException, ActionTimeoutException +from .domain import ( + Action, + ActionFailedException, + ActionGroupException, + ActionTimeoutException, +) if TYPE_CHECKING: from .._client import Client @@ -16,18 +21,25 @@ class BoundAction(BoundModelBase, Action): model = Action - def wait_until_finished(self, max_retries: int | None = None) -> None: + def wait_until_finished( + self, + max_retries: int | None = None, + *, + timeout: float | None = None, + ) -> None: """Wait until the specific action has status=finished. :param max_retries: int Specify how many retries will be performed before an ActionTimeoutException will be raised. + :param timeout: Timeout in seconds before an ActionTimeoutException will be raised. :raises: ActionFailedException when action is finished with status==error - :raises: ActionTimeoutException when Action is still in status==running after max_retries is reached. + :raises: ActionTimeoutException when Action is still in status==running after max_retries or timeout is reached. """ if max_retries is None: # pylint: disable=protected-access max_retries = self._client._client._poll_max_retries retries = 0 + wait = waiter(timeout) while True: self.reload() if self.status != Action.STATUS_RUNNING: @@ -36,8 +48,8 @@ def wait_until_finished(self, max_retries: int | None = None) -> None: retries += 1 if retries < max_retries: # pylint: disable=protected-access - time.sleep(self._client._client._poll_interval_func(retries)) - continue + if not wait(self._client._client._poll_interval_func(retries)): + continue raise ActionTimeoutException(action=self) @@ -129,6 +141,114 @@ class ActionsClient(ResourceActionsClient): def __init__(self, client: Client): super().__init__(client, None) + def _get_list_by_ids(self, ids: list[int]) -> list[BoundAction]: + """ + Get a list of Actions by their IDs. + + :param ids: List of Action IDs to get. + :raises ValueError: Raise when Action IDs were not found. + :return: List of Actions. + """ + actions: list[BoundAction] = [] + + for ids_batch in batched(ids, 25): + params: dict[str, Any] = { + "id": ids_batch, + } + + response = self._client.request( + method="GET", + url="/actions", + params=params, + ) + + actions.extend( + BoundAction(self._client.actions, action_data) + for action_data in response["actions"] + ) + + if len(ids) != len(actions): + found_ids = [a.id for a in actions] + not_found_ids = list(set(ids) - set(found_ids)) + + raise ValueError( + f"actions not found: {', '.join(str(o) for o in not_found_ids)}" + ) + + return actions + + def wait_for_function( + self, + handle_update: Callable[[BoundAction], None], + actions: list[Action | BoundAction], + *, + timeout: float | None = None, + ) -> list[BoundAction]: + """ + Waits until all Actions succeed by polling the API at the interval defined by + the client's poll interval and function. An Action is considered as complete + when its status is either "success" or "error". + + The handle_update callback is called every time an Action is updated. + + :param handle_update: Function called every time an Action is updated. + :param actions: List of Actions to wait for. + :param timeout: Timeout in seconds. + :raises: ActionFailedException when an Action failed. + :return: List of succeeded Actions. + """ + running: list[BoundAction] = actions.copy() # type: ignore[assignment] + completed: list[BoundAction] = [] + + retries = 0 + wait = waiter(timeout) + while len(running) > 0: + # pylint: disable=protected-access + if wait(self._client._poll_interval_func(retries)): + raise ActionGroupException( + failed=[ActionTimeoutException(action) for action in running], + completed=completed, + ) + + retries += 1 + + running = self._get_list_by_ids([a.id for a in running]) + + for update in running: + if update.status != Action.STATUS_RUNNING: + running.remove(update) + completed.append(update) + + handle_update(update) + + return completed + + def wait_for( + self, + actions: list[Action | BoundAction], + *, + timeout: float | None = None, + ) -> list[BoundAction]: + """ + Waits until all Actions succeed by polling the API at the interval defined by + the client's poll interval and function. An Action is considered as complete + when its status is either "success" or "error". + + If a single Action fails, the function will stop waiting and raise ActionFailedException. + + :param actions: List of Actions to wait for. + :param timeout: Timeout in seconds. + :raises: ActionFailedException when an Action failed. + :raises: TimeoutError when the Actions did not succeed before timeout. + :return: List of succeeded Actions. + """ + + def handle_update(update: BoundAction) -> None: + if update.status == Action.STATUS_ERROR: + raise ActionFailedException(action=update) + + return self.wait_for_function(handle_update, actions, timeout=timeout) + def get_list( self, status: list[str] | None = None, diff --git a/hcloud/actions/domain.py b/hcloud/actions/domain.py index aa7f2fd9..76f8d6c8 100644 --- a/hcloud/actions/domain.py +++ b/hcloud/actions/domain.py @@ -98,3 +98,16 @@ class ActionFailedException(ActionException): class ActionTimeoutException(ActionException): """The pending action timed out""" + + +class ActionGroupException(HCloudException): + """An exception for a group of actions""" + + def __init__( + self, + failed: list[ActionException], + completed: list[BoundAction] | None = None, + ): + super().__init__("Multiple pending actions failed") + self.failed = failed + self.completed = completed diff --git a/tests/unit/actions/test_client.py b/tests/unit/actions/test_client.py index f9943087..71c8fb51 100644 --- a/tests/unit/actions/test_client.py +++ b/tests/unit/actions/test_client.py @@ -7,6 +7,7 @@ from hcloud.actions import ( Action, ActionFailedException, + ActionGroupException, ActionsClient, ActionTimeoutException, BoundAction, @@ -197,3 +198,90 @@ def test_get_all(self, actions_client, generic_action_list, params): assert action2._client == actions_client._client.actions assert action2.id == 2 assert action2.command == "stop_server" + + def test_wait_for(self, actions_client: ActionsClient): + actions = [Action(id=1), Action(id=2)] + + # Speed up test by not really waiting + actions_client._client._poll_interval_func = mock.MagicMock() + actions_client._client._poll_interval_func.return_value = 0.1 + + actions_client._client.request.side_effect = [ + { + "actions": [ + {"id": 1, "status": "running"}, + {"id": 2, "status": "success"}, + ] + }, + { + "actions": [ + {"id": 1, "status": "success"}, + ] + }, + ] + + actions = actions_client.wait_for(actions) + + actions_client._client.request.assert_has_calls( + [ + mock.call(method="GET", url="/actions", params={"id": (1, 2)}), + mock.call(method="GET", url="/actions", params={"id": (1,)}), + ] + ) + + assert len(actions) == 2 + + def test_wait_for_error(self, actions_client: ActionsClient): + actions = [Action(id=1), Action(id=2)] + + # Speed up test by not really waiting + actions_client._client._poll_interval_func = mock.MagicMock() + actions_client._client._poll_interval_func.return_value = 0.1 + + actions_client._client.request.side_effect = [ + { + "actions": [ + {"id": 1, "status": "running"}, + { + "id": 2, + "status": "error", + "error": {"code": "failed", "message": "Action failed"}, + }, + ] + }, + ] + + with pytest.raises(ActionFailedException): + actions_client.wait_for(actions) + + actions_client._client.request.assert_has_calls( + [ + mock.call(method="GET", url="/actions", params={"id": (1, 2)}), + ] + ) + + def test_wait_for_timeout(self, actions_client: ActionsClient): + actions = [ + Action(id=1, status="running", command="create_server"), + Action(id=2, status="running", command="start_server"), + ] + + # Speed up test by not really waiting + actions_client._client._poll_interval_func = mock.MagicMock() + actions_client._client._poll_interval_func.return_value = 0.1 + + actions_client._client.request.return_value = { + "actions": [ + {"id": 1, "status": "running", "command": "create_server"}, + {"id": 2, "status": "running", "command": "start_server"}, + ] + } + + with pytest.raises(ActionGroupException): + actions_client.wait_for(actions, timeout=0.2) + + actions_client._client.request.assert_has_calls( + [ + mock.call(method="GET", url="/actions", params={"id": (1, 2)}), + ] + ) diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py new file mode 100644 index 00000000..56987957 --- /dev/null +++ b/tests/unit/test_utils.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import time + +from hcloud._utils import batched, waiter + + +def test_batched(): + assert list(o for o in batched([1, 2, 3, 4, 5], 2)) == [(1, 2), (3, 4), (5,)] + + +def test_waiter(): + wait = waiter(timeout=0.2) + assert wait(0.1) is False + time.sleep(0.2) + assert wait(1) is True + + # Clamp sleep to deadline + wait = waiter(timeout=0.2) + assert wait(0.3) is False + assert wait(1) is True