
Commit 14af126

fix bug in retry_policy when max_retry_interval is not set
Signed-off-by: Filinto Duran <[email protected]>
1 parent fb05dff commit 14af126

File tree

3 files changed: +327 -1 lines changed


durabletask/task.py

Lines changed: 1 addition & 1 deletion
@@ -445,7 +445,7 @@ def compute_next_delay(self) -> Optional[timedelta]:
                 next_delay_f = min(
                     next_delay_f, self._retry_policy.max_retry_interval.total_seconds()
                 )
-                return timedelta(seconds=next_delay_f)
+            return timedelta(seconds=next_delay_f)

         return None
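The one-line change is an indentation fix with a real behavioral effect: previously the early return sat inside the max_retry_interval branch, so a RetryPolicy created without max_retry_interval fell through to the trailing return None and no retry timer was ever scheduled. A minimal sketch of the fixed logic (illustrative only; the real method also checks the attempt count and retry_timeout before reaching this point, and compute_next_delay_sketch is a made-up name):

from datetime import timedelta

def compute_next_delay_sketch(retry_policy, next_delay_f):
    # next_delay_f is the exponential-backoff delay (in seconds) computed earlier.
    if retry_policy.max_retry_interval is not None:
        # Cap the delay when a maximum retry interval is configured.
        next_delay_f = min(next_delay_f, retry_policy.max_retry_interval.total_seconds())
        # Before this commit, the return statement below was nested here, so a
        # policy with max_retry_interval=None produced no delay at all.
    return timedelta(seconds=next_delay_f)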

tests/aio/test_e2e.py

Lines changed: 228 additions & 0 deletions
@@ -795,3 +795,231 @@ def skip_if_no_runtime():
     endpoint = os.getenv("DURABLETASK_GRPC_ENDPOINT", "localhost:4001")
     if not is_runtime_available(endpoint):
         pytest.skip(f"DurableTask runtime not available at {endpoint}")
+
+
+def test_async_activity_retry_with_backoff():
+    """Test that activities are retried with proper backoff and max attempts."""
+    skip_if_no_runtime()
+
+    from datetime import timedelta
+
+    from durabletask import task
+
+    attempt_counter = 0
+
+    async def retry_orchestrator(ctx: AsyncWorkflowContext, _):
+        retry_policy = task.RetryPolicy(
+            first_retry_interval=timedelta(seconds=1),
+            max_number_of_attempts=3,
+            backoff_coefficient=2,
+            max_retry_interval=timedelta(seconds=10),
+            retry_timeout=timedelta(seconds=30),
+        )
+        result = await ctx.call_activity(failing_activity, retry_policy=retry_policy)
+        return result
+
+    def failing_activity(ctx, _):
+        nonlocal attempt_counter
+        attempt_counter += 1
+        raise RuntimeError(f"Attempt {attempt_counter} failed")
+
+    with TaskHubGrpcWorker() as worker:
+        worker.add_orchestrator(retry_orchestrator)
+        worker.add_activity(failing_activity)
+        worker.start()
+        worker.wait_for_ready(timeout=10)
+
+        client = TaskHubGrpcClient()
+        instance_id = client.schedule_new_orchestration(retry_orchestrator)
+        state = client.wait_for_orchestration_completion(instance_id, timeout=30)
+
+        assert state is not None
+        assert state.runtime_status.name == "FAILED"
+        assert state.failure_details is not None
+        assert "Attempt 3 failed" in state.failure_details.message
+        assert attempt_counter == 3
+
+
+def test_async_sub_orchestrator_retry():
+    """Test that sub-orchestrators are retried on failure."""
+    skip_if_no_runtime()
+
+    from datetime import timedelta
+
+    from durabletask import task
+
+    child_attempt_counter = 0
+    activity_attempt_counter = 0
+
+    async def parent_orchestrator(ctx: AsyncWorkflowContext, _):
+        retry_policy = task.RetryPolicy(
+            first_retry_interval=timedelta(seconds=1),
+            max_number_of_attempts=3,
+            backoff_coefficient=1,
+        )
+        result = await ctx.call_sub_orchestrator(child_orchestrator, retry_policy=retry_policy)
+        return result
+
+    async def child_orchestrator(ctx: AsyncWorkflowContext, _):
+        nonlocal child_attempt_counter
+        if not ctx.is_replaying:
+            child_attempt_counter += 1
+        retry_policy = task.RetryPolicy(
+            first_retry_interval=timedelta(seconds=1),
+            max_number_of_attempts=3,
+            backoff_coefficient=1,
+        )
+        result = await ctx.call_activity(failing_activity, retry_policy=retry_policy)
+        return result
+
+    def failing_activity(ctx, _):
+        nonlocal activity_attempt_counter
+        activity_attempt_counter += 1
+        raise RuntimeError("Kah-BOOOOM!!!")
+
+    with TaskHubGrpcWorker() as worker:
+        worker.add_orchestrator(parent_orchestrator)
+        worker.add_orchestrator(child_orchestrator)
+        worker.add_activity(failing_activity)
+        worker.start()
+        worker.wait_for_ready(timeout=10)
+
+        client = TaskHubGrpcClient()
+        instance_id = client.schedule_new_orchestration(parent_orchestrator)
+        state = client.wait_for_orchestration_completion(instance_id, timeout=40)
+
+        assert state is not None
+        assert state.runtime_status.name == "FAILED"
+        assert state.failure_details is not None
+        # Each child orchestrator attempt retries the activity 3 times
+        # 3 child attempts × 3 activity attempts = 9 total
+        assert activity_attempt_counter == 9
+        assert child_attempt_counter == 3
+
+
+def test_async_retry_timeout():
+    """Test that retry timeout limits the number of attempts."""
+    skip_if_no_runtime()
+
+    from datetime import timedelta
+
+    from durabletask import task
+
+    attempt_counter = 0
+
+    async def timeout_orchestrator(ctx: AsyncWorkflowContext, _):
+        # Max 5 attempts, but timeout at 14 seconds
+        # Attempts: 1s + 2s + 4s + 8s = 15s, so only 4 attempts should happen
+        retry_policy = task.RetryPolicy(
+            first_retry_interval=timedelta(seconds=1),
+            max_number_of_attempts=5,
+            backoff_coefficient=2,
+            max_retry_interval=timedelta(seconds=10),
+            retry_timeout=timedelta(seconds=14),
+        )
+        result = await ctx.call_activity(failing_activity, retry_policy=retry_policy)
+        return result
+
+    def failing_activity(ctx, _):
+        nonlocal attempt_counter
+        attempt_counter += 1
+        raise RuntimeError(f"Attempt {attempt_counter} failed")
+
+    with TaskHubGrpcWorker() as worker:
+        worker.add_orchestrator(timeout_orchestrator)
+        worker.add_activity(failing_activity)
+        worker.start()
+        worker.wait_for_ready(timeout=10)
+
+        client = TaskHubGrpcClient()
+        instance_id = client.schedule_new_orchestration(timeout_orchestrator)
+        state = client.wait_for_orchestration_completion(instance_id, timeout=40)
+
+        assert state is not None
+        assert state.runtime_status.name == "FAILED"
+        # Should only attempt 4 times due to timeout (1s + 2s + 4s + 8s would exceed 14s)
+        assert attempt_counter == 4
+
+
+def test_async_non_retryable_error():
+    """Test that NonRetryableError prevents retries."""
+    skip_if_no_runtime()
+
+    from datetime import timedelta
+
+    from durabletask import task
+
+    attempt_counter = 0
+
+    async def non_retryable_orchestrator(ctx: AsyncWorkflowContext, _):
+        retry_policy = task.RetryPolicy(
+            first_retry_interval=timedelta(seconds=1),
+            max_number_of_attempts=5,
+            backoff_coefficient=1,
+        )
+        result = await ctx.call_activity(non_retryable_activity, retry_policy=retry_policy)
+        return result
+
+    def non_retryable_activity(ctx, _):
+        nonlocal attempt_counter
+        attempt_counter += 1
+        raise task.NonRetryableError("This should not be retried")
+
+    with TaskHubGrpcWorker() as worker:
+        worker.add_orchestrator(non_retryable_orchestrator)
+        worker.add_activity(non_retryable_activity)
+        worker.start()
+        worker.wait_for_ready(timeout=10)
+
+        client = TaskHubGrpcClient()
+        instance_id = client.schedule_new_orchestration(non_retryable_orchestrator)
+        state = client.wait_for_orchestration_completion(instance_id, timeout=20)
+
+        assert state is not None
+        assert state.runtime_status.name == "FAILED"
+        assert state.failure_details is not None
+        assert "NonRetryableError" in state.failure_details.error_type
+        # Should only attempt once since it's non-retryable
+        assert attempt_counter == 1
+
+
+def test_async_successful_retry():
+    """Test that an activity succeeds after retries."""
+    skip_if_no_runtime()
+
+    from datetime import timedelta
+
+    from durabletask import task
+
+    attempt_counter = 0
+
+    async def successful_retry_orchestrator(ctx: AsyncWorkflowContext, _):
+        retry_policy = task.RetryPolicy(
+            first_retry_interval=timedelta(seconds=1),
+            max_number_of_attempts=5,
+            backoff_coefficient=1,
+        )
+        result = await ctx.call_activity(eventually_succeeds_activity, retry_policy=retry_policy)
+        return result
+
+    def eventually_succeeds_activity(ctx, _):
+        nonlocal attempt_counter
+        attempt_counter += 1
+        if attempt_counter < 3:
+            raise RuntimeError(f"Attempt {attempt_counter} failed")
+        return f"Success on attempt {attempt_counter}"
+
+    with TaskHubGrpcWorker() as worker:
+        worker.add_orchestrator(successful_retry_orchestrator)
+        worker.add_activity(eventually_succeeds_activity)
+        worker.start()
+        worker.wait_for_ready(timeout=10)
+
+        client = TaskHubGrpcClient()
+        instance_id = client.schedule_new_orchestration(successful_retry_orchestrator)
+        state = client.wait_for_orchestration_completion(instance_id, timeout=30)
+
+        assert state is not None
+        assert state.runtime_status.name == "COMPLETED"
+        assert state.serialized_output == '"Success on attempt 3"'
+        assert attempt_counter == 3
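As a rough sanity check on the schedules these tests rely on, here is a sketch (not SDK code; expected_delays is a hypothetical helper that assumes the usual first_retry_interval * backoff_coefficient**(attempt - 1) backoff with an optional cap, the same cap the task.py change above applies via max_retry_interval):

from datetime import timedelta

def expected_delays(first_retry_interval, backoff_coefficient, max_attempts, max_retry_interval=None):
    # Delay scheduled before each retry; no delay follows the final attempt.
    delays = []
    for attempt in range(1, max_attempts):
        seconds = first_retry_interval.total_seconds() * (backoff_coefficient ** (attempt - 1))
        if max_retry_interval is not None:
            seconds = min(seconds, max_retry_interval.total_seconds())
        delays.append(timedelta(seconds=seconds))
    return delays

# test_async_retry_timeout: delays of 1s, 2s, 4s, 8s; the cumulative wait reaches 15s
# only after the fourth delay, past the 14s retry_timeout, so attempt 5 never runs.
print(expected_delays(timedelta(seconds=1), 2, 5, timedelta(seconds=10)))

Under the same assumption, the first test's policy (3 attempts, coefficient 2) expects delays of 1s and 2s, comfortably inside its 30s retry_timeout.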

tests/durabletask/test_orchestration_executor.py

Lines changed: 98 additions & 0 deletions
@@ -1705,6 +1705,104 @@ def orchestrator(ctx: task.OrchestrationContext, _):
     assert complete_action.failureDetails.errorMessage.__contains__("Activity task #1 failed: boom")


+def test_activity_generic_exception_is_retryable():
+    """Verify that generic Exception is retryable by default (not treated as non-retryable)."""
+
+    def dummy_activity(ctx, _):
+        raise Exception("generic error")
+
+    def orchestrator(ctx: task.OrchestrationContext, _):
+        yield ctx.call_activity(
+            dummy_activity,
+            retry_policy=task.RetryPolicy(
+                first_retry_interval=timedelta(seconds=1),
+                max_number_of_attempts=3,
+                backoff_coefficient=1,
+            ),
+        )
+
+    registry = worker._Registry()
+    name = registry.add_orchestrator(orchestrator)
+
+    current_timestamp = datetime.utcnow()
+    # First attempt fails
+    old_events = [
+        helpers.new_orchestrator_started_event(timestamp=current_timestamp),
+        helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None),
+        helpers.new_task_scheduled_event(1, task.get_name(dummy_activity)),
+    ]
+    new_events = [
+        helpers.new_orchestrator_started_event(timestamp=current_timestamp),
+        helpers.new_task_failed_event(1, Exception("generic error")),
+    ]
+
+    executor = worker._OrchestrationExecutor(registry, TEST_LOGGER)
+    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
+    actions = result.actions
+    # Should schedule a retry timer, not fail immediately
+    assert len(actions) == 1
+    assert actions[0].HasField("createTimer")
+    assert actions[0].id == 2
+
+    # Simulate the timer firing and activity being rescheduled
+    expected_fire_at = current_timestamp + timedelta(seconds=1)
+    old_events = old_events + new_events
+    current_timestamp = expected_fire_at
+    new_events = [
+        helpers.new_orchestrator_started_event(current_timestamp),
+        helpers.new_timer_fired_event(2, current_timestamp),
+    ]
+    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
+    actions = result.actions
+    assert len(actions) == 2  # timer + rescheduled task
+    assert actions[1].HasField("scheduleTask")
+    assert actions[1].id == 1
+
+    # Second attempt also fails
+    old_events = old_events + new_events
+    new_events = [
+        helpers.new_orchestrator_started_event(current_timestamp),
+        helpers.new_task_failed_event(1, Exception("generic error")),
+    ]
+
+    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
+    actions = result.actions
+    # Should schedule another retry timer
+    assert len(actions) == 3
+    assert actions[2].HasField("createTimer")
+    assert actions[2].id == 3
+
+    # Simulate the timer firing and activity being rescheduled
+    expected_fire_at = current_timestamp + timedelta(seconds=1)
+    old_events = old_events + new_events
+    current_timestamp = expected_fire_at
+    new_events = [
+        helpers.new_orchestrator_started_event(current_timestamp),
+        helpers.new_timer_fired_event(3, current_timestamp),
+    ]
+    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
+    actions = result.actions
+    assert len(actions) == 3  # timer + rescheduled task
+    assert actions[1].HasField("scheduleTask")
+    assert actions[1].id == 1
+
+    # Third attempt fails - should exhaust retries
+    old_events = old_events + new_events
+    new_events = [
+        helpers.new_orchestrator_started_event(current_timestamp),
+        helpers.new_task_failed_event(1, Exception("generic error")),
+    ]
+
+    result = executor.execute(TEST_INSTANCE_ID, old_events, new_events)
+    actions = result.actions
+    # Now should fail - no more retries
+    complete_action = get_and_validate_single_complete_orchestration_action(actions)
+    assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED
+    assert complete_action.failureDetails.errorMessage.__contains__(
+        "Activity task #1 failed: generic error"
+    )
+
+
 def test_sub_orchestration_non_retryable_default_exception():
     """If sub-orchestrator fails with NonRetryableError, do not retry and fail immediately."""
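Read alongside the NonRetryableError coverage in this commit (the async e2e test above and the existing sub-orchestration test that follows this hunk), the new unit test pins down the intended contract: a plain Exception raised by an activity is retried according to the RetryPolicy, while task.NonRetryableError fails the orchestration on the first attempt. An illustrative activity pair (hypothetical names, sketch only):

from durabletask import task

def flaky_activity(ctx, _):
    # Generic exceptions are retryable: the worker schedules a retry timer
    # according to the RetryPolicy passed to call_activity.
    raise RuntimeError("transient failure")

def fatal_activity(ctx, _):
    # NonRetryableError short-circuits the retry loop: the orchestration
    # fails immediately, without scheduling a retry timer.
    raise task.NonRetryableError("permanent failure")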
