Skip to content

Commit f7f4bea

Browse files
authored
[train] Add try-except for pg.wait() (#60743)
## Description `placement_group()` is async and returns a handle immediately. If the PG is removed between the creation request and readiness, `pg.wait()` throws a “not found” error while waiting, which in turn causes a controller error. We should treat this case as a WGStartupTimeoutError. ## Related issues [#870](anyscale#870) --------- Signed-off-by: Lehui Liu <lehui@anyscale.com>
1 parent 7bf368c commit f7f4bea

File tree

2 files changed

+29
-2
lines changed

2 files changed

+29
-2
lines changed

python/ray/train/v2/_internal/execution/worker_group/placement_group_handle.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from abc import ABC, abstractmethod
23
from typing import TYPE_CHECKING, Union
34

@@ -7,6 +8,8 @@
78
if TYPE_CHECKING:
89
from ray.util.tpu import SlicePlacementGroup
910

11+
logger = logging.getLogger(__name__)
12+
1013

1114
class PlacementGroupHandle(ABC):
1215
"""Unified interface for placement groups in Ray Train.
@@ -63,7 +66,14 @@ def ready(self) -> ObjectRef:
6366
return self._pg.ready()
6467

6568
def wait(self, timeout_seconds: Union[float, int] = 30) -> bool:
    """Wait for the wrapped placement group, reporting failures as "not ready".

    Args:
        timeout_seconds: Timeout forwarded unchanged to the underlying
            placement group's ``wait`` call.

    Returns:
        The value returned by the underlying ``wait`` call, or ``False``
        if that call raised any ``Exception``.
    """
    try:
        is_ready = self._pg.wait(timeout_seconds)
    except Exception:
        # The underlying wait can raise (for example when the placement
        # group no longer exists). Log the traceback and report
        # "not ready" instead of propagating the error to the caller.
        logger.warning(
            "Placement group wait failed; treating as not ready.",
            exc_info=True,
        )
        is_ready = False
    return is_ready
6777

6878
def shutdown(self) -> None:
6979
remove_placement_group(self._pg)
@@ -83,7 +93,14 @@ def ready(self) -> ObjectRef:
8393
return self._spg.placement_group.ready()
8494

8595
def wait(self, timeout_seconds: Union[float, int] = 30) -> bool:
    """Wait for the slice's placement group, reporting failures as "not ready".

    Args:
        timeout_seconds: Timeout forwarded unchanged to the ``wait`` call
            of the slice's underlying placement group.

    Returns:
        The value returned by the underlying ``wait`` call, or ``False``
        if that call raised any ``Exception``.
    """
    try:
        is_ready = self._spg.placement_group.wait(timeout_seconds)
    except Exception:
        # The underlying wait can raise (for example when the placement
        # group no longer exists). Log the traceback and report
        # "not ready" instead of propagating the error to the caller.
        logger.warning(
            "Slice placement group wait failed; treating as not ready.",
            exc_info=True,
        )
        is_ready = False
    return is_ready
87104

88105
def shutdown(self) -> None:
89106
self._spg.shutdown()

python/ray/train/v2/tests/test_placement_group_handle.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ def test_default_handle_wait():
5656
handle.shutdown()
5757

5858

59+
def test_default_handle_wait_not_found_returns_false():
    """wait() reports False, rather than raising, when the group is gone."""
    # Arrange: a placement group whose wait() raises a "does not exist" error.
    not_found_error = Exception(
        "Placement group PlacementGroupID(abc) does not exist."
    )
    missing_pg = MagicMock()
    missing_pg.wait.side_effect = not_found_error
    handle = DefaultPlacementGroupHandle(missing_pg)

    # Act
    is_ready = handle.wait(timeout_seconds=0)

    # Assert: the failure is reported as "not ready".
    assert is_ready is False
67+
68+
5969
def test_default_handle_shutdown():
6070
"""shutdown() should remove the placement group."""
6171
pg = placement_group([{"CPU": 1}])

0 commit comments

Comments
 (0)