Skip to content

feat: wait for actions using ActionsClient.wait_for #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions hcloud/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from __future__ import annotations

import time
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Callable, TypeVar

T = TypeVar("T")


def batched(iterable: Iterable[T], size: int) -> Iterator[tuple[T, ...]]:
"""
Returns a batch of the provided size from the provided iterable.
"""
iterator = iter(iterable)
while True:
batch = tuple(islice(iterator, size))
if not batch:
break
yield batch


def waiter(timeout: float | None = None) -> Callable[[float], bool]:
"""
Waiter returns a wait function that sleeps the specified amount of seconds, and
handles timeouts.

The wait function returns True if the timeout was reached, False otherwise.

:param timeout: Timeout in seconds, defaults to None.
:return: Wait function.
"""

if timeout:
deadline = time.time() + timeout

def wait(seconds: float) -> bool:
now = time.time()

# Timeout if the deadline exceeded.
if deadline < now:
return True

# The deadline is not exceeded after the sleep time.
if now + seconds < deadline:
time.sleep(seconds)
return False

# The deadline is exceeded after the sleep time, clamp sleep time to
# deadline, and allow one last attempt until next wait call.
time.sleep(deadline - now)
return False

else:

def wait(seconds: float) -> bool:
time.sleep(seconds)
return False

return wait
2 changes: 2 additions & 0 deletions hcloud/actions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Action,
ActionException,
ActionFailedException,
ActionGroupException,
ActionTimeoutException,
)

Expand All @@ -18,6 +19,7 @@
"ActionException",
"ActionFailedException",
"ActionTimeoutException",
"ActionGroupException",
"ActionsClient",
"ActionsPageResult",
"BoundAction",
Expand Down
134 changes: 127 additions & 7 deletions hcloud/actions/client.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
from __future__ import annotations

import time
import warnings
from typing import TYPE_CHECKING, Any, NamedTuple
from typing import TYPE_CHECKING, Any, Callable, NamedTuple

from .._utils import batched, waiter
from ..core import BoundModelBase, ClientEntityBase, Meta
from .domain import Action, ActionFailedException, ActionTimeoutException
from .domain import (
Action,
ActionFailedException,
ActionGroupException,
ActionTimeoutException,
)

if TYPE_CHECKING:
from .._client import Client
Expand All @@ -16,18 +21,25 @@ class BoundAction(BoundModelBase, Action):

model = Action

def wait_until_finished(self, max_retries: int | None = None) -> None:
def wait_until_finished(
self,
max_retries: int | None = None,
*,
timeout: float | None = None,
) -> None:
"""Wait until the specific action has status=finished.

:param max_retries: int Specify how many retries will be performed before an ActionTimeoutException will be raised.
:param timeout: Timeout in seconds before an ActionTimeoutException will be raised.
:raises: ActionFailedException when action is finished with status==error
:raises: ActionTimeoutException when Action is still in status==running after max_retries is reached.
:raises: ActionTimeoutException when Action is still in status==running after max_retries or timeout is reached.
"""
if max_retries is None:
# pylint: disable=protected-access
max_retries = self._client._client._poll_max_retries

retries = 0
wait = waiter(timeout)
while True:
self.reload()
if self.status != Action.STATUS_RUNNING:
Expand All @@ -36,8 +48,8 @@ def wait_until_finished(self, max_retries: int | None = None) -> None:
retries += 1
if retries < max_retries:
# pylint: disable=protected-access
time.sleep(self._client._client._poll_interval_func(retries))
continue
if not wait(self._client._client._poll_interval_func(retries)):
continue

raise ActionTimeoutException(action=self)

Expand Down Expand Up @@ -129,6 +141,114 @@ class ActionsClient(ResourceActionsClient):
def __init__(self, client: Client):
super().__init__(client, None)

def _get_list_by_ids(self, ids: list[int]) -> list[BoundAction]:
"""
Get a list of Actions by their IDs.

:param ids: List of Action IDs to get.
:raises ValueError: Raise when Action IDs were not found.
:return: List of Actions.
"""
actions: list[BoundAction] = []

for ids_batch in batched(ids, 25):
params: dict[str, Any] = {
"id": ids_batch,
}

response = self._client.request(
method="GET",
url="/actions",
params=params,
)

actions.extend(
BoundAction(self._client.actions, action_data)
for action_data in response["actions"]
)

if len(ids) != len(actions):
found_ids = [a.id for a in actions]
not_found_ids = list(set(ids) - set(found_ids))

raise ValueError(
f"actions not found: {', '.join(str(o) for o in not_found_ids)}"
)

return actions

def wait_for_function(
self,
handle_update: Callable[[BoundAction], None],
actions: list[Action | BoundAction],
*,
timeout: float | None = None,
) -> list[BoundAction]:
"""
Waits until all Actions succeed by polling the API at the interval defined by
the client's poll interval and function. An Action is considered as complete
when its status is either "success" or "error".

The handle_update callback is called every time an Action is updated.

:param handle_update: Function called every time an Action is updated.
:param actions: List of Actions to wait for.
:param timeout: Timeout in seconds.
:raises: ActionFailedException when an Action failed.
:return: List of succeeded Actions.
"""
running: list[BoundAction] = actions.copy() # type: ignore[assignment]
completed: list[BoundAction] = []

retries = 0
wait = waiter(timeout)
while len(running) > 0:
# pylint: disable=protected-access
if wait(self._client._poll_interval_func(retries)):
raise ActionGroupException(
failed=[ActionTimeoutException(action) for action in running],
completed=completed,
)

retries += 1

running = self._get_list_by_ids([a.id for a in running])

for update in running:
if update.status != Action.STATUS_RUNNING:
running.remove(update)
completed.append(update)

handle_update(update)

return completed

def wait_for(
self,
actions: list[Action | BoundAction],
*,
timeout: float | None = None,
) -> list[BoundAction]:
"""
Waits until all Actions succeed by polling the API at the interval defined by
the client's poll interval and function. An Action is considered as complete
when its status is either "success" or "error".

If a single Action fails, the function will stop waiting and raise ActionFailedException.

:param actions: List of Actions to wait for.
:param timeout: Timeout in seconds.
:raises: ActionFailedException when an Action failed.
:raises: TimeoutError when the Actions did not succeed before timeout.
:return: List of succeeded Actions.
"""

def handle_update(update: BoundAction) -> None:
if update.status == Action.STATUS_ERROR:
raise ActionFailedException(action=update)

return self.wait_for_function(handle_update, actions, timeout=timeout)

def get_list(
self,
status: list[str] | None = None,
Expand Down
13 changes: 13 additions & 0 deletions hcloud/actions/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,16 @@ class ActionFailedException(ActionException):

class ActionTimeoutException(ActionException):
"""The pending action timed out"""


class ActionGroupException(HCloudException):
"""An exception for a group of actions"""

def __init__(
self,
failed: list[ActionException],
completed: list[BoundAction] | None = None,
):
super().__init__("Multiple pending actions failed")
self.failed = failed
self.completed = completed
88 changes: 88 additions & 0 deletions tests/unit/actions/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from hcloud.actions import (
Action,
ActionFailedException,
ActionGroupException,
ActionsClient,
ActionTimeoutException,
BoundAction,
Expand Down Expand Up @@ -197,3 +198,90 @@ def test_get_all(self, actions_client, generic_action_list, params):
assert action2._client == actions_client._client.actions
assert action2.id == 2
assert action2.command == "stop_server"

def test_wait_for(self, actions_client: ActionsClient):
actions = [Action(id=1), Action(id=2)]

# Speed up test by not really waiting
actions_client._client._poll_interval_func = mock.MagicMock()
actions_client._client._poll_interval_func.return_value = 0.1

actions_client._client.request.side_effect = [
{
"actions": [
{"id": 1, "status": "running"},
{"id": 2, "status": "success"},
]
},
{
"actions": [
{"id": 1, "status": "success"},
]
},
]

actions = actions_client.wait_for(actions)

actions_client._client.request.assert_has_calls(
[
mock.call(method="GET", url="/actions", params={"id": (1, 2)}),
mock.call(method="GET", url="/actions", params={"id": (1,)}),
]
)

assert len(actions) == 2

def test_wait_for_error(self, actions_client: ActionsClient):
actions = [Action(id=1), Action(id=2)]

# Speed up test by not really waiting
actions_client._client._poll_interval_func = mock.MagicMock()
actions_client._client._poll_interval_func.return_value = 0.1

actions_client._client.request.side_effect = [
{
"actions": [
{"id": 1, "status": "running"},
{
"id": 2,
"status": "error",
"error": {"code": "failed", "message": "Action failed"},
},
]
},
]

with pytest.raises(ActionFailedException):
actions_client.wait_for(actions)

actions_client._client.request.assert_has_calls(
[
mock.call(method="GET", url="/actions", params={"id": (1, 2)}),
]
)

def test_wait_for_timeout(self, actions_client: ActionsClient):
actions = [
Action(id=1, status="running", command="create_server"),
Action(id=2, status="running", command="start_server"),
]

# Speed up test by not really waiting
actions_client._client._poll_interval_func = mock.MagicMock()
actions_client._client._poll_interval_func.return_value = 0.1

actions_client._client.request.return_value = {
"actions": [
{"id": 1, "status": "running", "command": "create_server"},
{"id": 2, "status": "running", "command": "start_server"},
]
}

with pytest.raises(ActionGroupException):
actions_client.wait_for(actions, timeout=0.2)

actions_client._client.request.assert_has_calls(
[
mock.call(method="GET", url="/actions", params={"id": (1, 2)}),
]
)
21 changes: 21 additions & 0 deletions tests/unit/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from __future__ import annotations

import time

from hcloud._utils import batched, waiter


def test_batched():
assert list(o for o in batched([1, 2, 3, 4, 5], 2)) == [(1, 2), (3, 4), (5,)]


def test_waiter():
wait = waiter(timeout=0.2)
assert wait(0.1) is False
time.sleep(0.2)
assert wait(1) is True

# Clamp sleep to deadline
wait = waiter(timeout=0.2)
assert wait(0.3) is False
assert wait(1) is True