Skip to content

Commit faae05a

Browse files
zhengruifengdongjoon-hyun
authored andcommitted
[SPARK-52843][PYTHON][TESTS] Support retry timeout tests
### What changes were proposed in this pull request? Support retry timeout tests ### Why are the changes needed? We have a batch of flaky (ml, streaming, etc) tests which are prone to timeout, existing `eventually` with `timeout` cannot handle this case ### Does this PR introduce _any_ user-facing change? No, test-only ### How was this patch tested? added ut ### Was this patch authored or co-authored using generative AI tooling? no Closes #51537 from zhengruifeng/retry_timeout_tests. Authored-by: Ruifeng Zheng <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent 3457df6 commit faae05a

File tree

2 files changed

+32
-10
lines changed

2 files changed

+32
-10
lines changed

python/pyspark/testing/utils.py

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,15 @@ def write_int(i):
132132
return struct.pack("!i", i)
133133

134134

135-
def timeout(seconds):
135+
def timeout(timeout):
136136
def decorator(func):
137137
def handler(signum, frame):
138-
raise TimeoutError(f"Function {func.__name__} timed out after {seconds} seconds")
138+
raise TimeoutError(f"Function {func.__name__} timed out after {timeout} seconds")
139139

140140
def wrapper(*args, **kwargs):
141141
signal.alarm(0)
142142
signal.signal(signal.SIGALRM, handler)
143-
signal.alarm(seconds)
143+
signal.alarm(timeout)
144144
try:
145145
result = func(*args, **kwargs)
146146
finally:
@@ -155,6 +155,7 @@ def wrapper(*args, **kwargs):
155155
def eventually(
156156
timeout=30.0,
157157
catch_assertions=False,
158+
catch_timeout=False,
158159
):
159160
"""
160161
Wait a given amount of time for a condition to pass, else fail with an error.
@@ -176,9 +177,14 @@ def eventually(
176177
If False (default), do not catch AssertionErrors.
177178
If True, catch AssertionErrors; continue, but save
178179
error to throw upon timeout.
180+
catch_timeout : bool
181+
If False (default), do not catch TimeoutError.
182+
If True, catch TimeoutError; continue, but save
183+
error to throw upon timeout.
179184
"""
180185
assert timeout > 0
181186
assert isinstance(catch_assertions, bool)
187+
assert isinstance(catch_timeout, bool)
182188

183189
def decorator(condition: Callable) -> Callable:
184190
assert isinstance(condition, Callable)
@@ -191,21 +197,26 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
191197
while time() - start_time < timeout:
192198
numTries += 1
193199

194-
if catch_assertions:
195-
try:
196-
lastValue = condition(*args, **kwargs)
197-
except AssertionError as e:
198-
lastValue = e
199-
else:
200+
try:
200201
lastValue = condition(*args, **kwargs)
202+
except AssertionError as e:
203+
if catch_assertions:
204+
lastValue = e
205+
else:
206+
raise e
207+
except TimeoutError as e:
208+
if catch_timeout:
209+
lastValue = e
210+
else:
211+
raise e
201212

202213
if lastValue is True or lastValue is None:
203214
return
204215

205216
print(f"\nAttempt #{numTries} failed!\n{lastValue}")
206217
sleep(0.01)
207218

208-
if isinstance(lastValue, AssertionError):
219+
if isinstance(lastValue, (AssertionError, TimeoutError)):
209220
raise lastValue
210221
else:
211222
raise AssertionError(

python/pyspark/tests/test_util.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,17 @@ def test_parse_memory(self):
148148
with self.assertRaisesRegex(ValueError, "invalid format"):
149149
_parse_memory("2gs")
150150

151+
@eventually(timeout=180, catch_timeout=True)
152+
@timeout(timeout=1)
153+
def test_retry_timeout_test(self):
154+
import random
155+
156+
if random.random() < 0.5:
157+
print("hanging for 1 hour")
158+
time.sleep(3600) # Simulate a long-running operation
159+
else:
160+
print("succeeding immediately")
161+
151162

152163
class HandleWorkerExceptionTests(unittest.TestCase):
153164
exception_bytes = b"ValueError: test_message"

0 commit comments

Comments
 (0)