Commit c513506

add non-retryable, fix retry bug, and add shutdown helpers to reduce noisy logs
Signed-off-by: Filinto Duran <[email protected]>
1 parent 7f89f6a commit c513506

10 files changed: +714 -138 lines changed

README.md

Lines changed: 89 additions & 2 deletions
@@ -126,10 +126,97 @@ Orchestrations can be continued as new using the `continue_as_new` API. This API
 
 Orchestrations can be suspended using the `suspend_orchestration` client API and will remain suspended until resumed using the `resume_orchestration` client API. A suspended orchestration will stop processing new events, but will continue to buffer any that happen to arrive until resumed, ensuring that no data is lost. An orchestration can also be terminated using the `terminate_orchestration` client API. Terminated orchestrations will stop processing new events and will discard any buffered events.
 
-### Retry policies (TODO)
+### Retry policies
 
 Orchestrations can specify retry policies for activities and sub-orchestrations. These policies control how many times and how frequently an activity or sub-orchestration will be retried in the event of a transient error.
 
+#### Creating a retry policy
+
+```python
+from datetime import timedelta
+from durabletask import task
+
+retry_policy = task.RetryPolicy(
+    first_retry_interval=timedelta(seconds=1), # Initial delay before first retry
+    max_number_of_attempts=5, # Maximum total attempts (includes first attempt)
+    backoff_coefficient=2.0, # Exponential backoff multiplier (must be >= 1)
+    max_retry_interval=timedelta(seconds=30), # Cap on retry delay
+    retry_timeout=timedelta(minutes=5), # Total time limit for all retries (optional)
+)
+```
+
+**Notes:**
+- `max_number_of_attempts` **includes the initial attempt**. For example, `max_number_of_attempts=5` means 1 initial attempt + up to 4 retries.
+- `retry_timeout` is optional. If omitted or set to `None`, retries continue until `max_number_of_attempts` is reached.
+- `backoff_coefficient` controls exponential backoff: delay = `first_retry_interval * (backoff_coefficient ^ retry_number)`, capped by `max_retry_interval`.
+- `non_retryable_error_types` (optional) can specify additional exception types to treat as non-retryable (e.g., `[ValueError, TypeError]`). `NonRetryableError` is always non-retryable regardless of this setting.
+
+#### Using retry policies
+
+Apply retry policies to activities or sub-orchestrations:
+
+```python
+def my_orchestrator(ctx: task.OrchestrationContext, input):
+    # Retry an activity
+    result = yield ctx.call_activity(my_activity, input=data, retry_policy=retry_policy)
+
+    # Retry a sub-orchestration
+    result = yield ctx.call_sub_orchestrator(child_orchestrator, input=data, retry_policy=retry_policy)
+```
+
+#### Non-retryable errors
+
+For errors that should not be retried (e.g., validation failures, permanent errors), raise a `NonRetryableError`:
+
+```python
+from durabletask.task import NonRetryableError
+
+def my_activity(ctx: task.ActivityContext, input):
+    if input is None:
+        # This error will bypass retry logic and fail immediately
+        raise NonRetryableError("Input cannot be None")
+
+    # Transient errors (network, timeouts, etc.) will be retried
+    return call_external_service(input)
+```
+
+Even with a retry policy configured, `NonRetryableError` will fail immediately without retrying.
+
+#### Error type matching behavior
+
+**Important:** Error type matching uses **exact class name comparison**, not `isinstance()` checks. This is because exception objects are serialized to gRPC protobuf messages, where only the class name (as a string) survives serialization.
+
+**Key implications:**
+
+- **Not inheritance-aware**: If you specify `ValueError` in `non_retryable_error_types`, it will only match exceptions with the exact class name `"ValueError"`. A custom subclass like `CustomValueError(ValueError)` will NOT match.
+- **Workaround**: List all exception types explicitly, including subclasses you want to handle.
+- **Built-in exception**: `NonRetryableError` is always treated as non-retryable, matched by the name `"NonRetryableError"`.
+
+**Example:**
+
+```python
+from datetime import timedelta
+from durabletask import task
+
+# Custom exception hierarchy
+class ValidationError(ValueError):
+    pass
+
+# This policy ONLY matches exact "ValueError" by name
+retry_policy = task.RetryPolicy(
+    first_retry_interval=timedelta(seconds=1),
+    max_number_of_attempts=3,
+    non_retryable_error_types=[ValueError] # Won't match ValidationError subclass!
+)
+
+# To handle both, list them explicitly:
+retry_policy = task.RetryPolicy(
+    first_retry_interval=timedelta(seconds=1),
+    max_number_of_attempts=3,
+    non_retryable_error_types=[ValueError, ValidationError] # Both converted to name strings
+)
+```
+
 ## Getting Started
 
 ### Prerequisites
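The backoff note in the README hunk above can be made concrete with a small standalone sketch. It does not call the library: `sketch_retry_delays` is a hypothetical helper, it assumes `retry_number` is 0 for the first retry (the README does not say so explicitly), and it ignores `retry_timeout`.

```python
from datetime import timedelta
from typing import List, Optional


def sketch_retry_delays(
    first_retry_interval: timedelta,
    max_number_of_attempts: int,
    backoff_coefficient: float = 1.0,
    max_retry_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Hypothetical helper: list the delay before each retry.

    Assumption: retry_number is 0 for the first retry.
    """
    delays = []
    # max_number_of_attempts includes the initial attempt, so there are
    # at most max_number_of_attempts - 1 retries.
    for retry_number in range(max_number_of_attempts - 1):
        seconds = first_retry_interval.total_seconds() * (
            backoff_coefficient ** retry_number
        )
        if max_retry_interval is not None:
            # Cap the computed delay at max_retry_interval.
            seconds = min(seconds, max_retry_interval.total_seconds())
        delays.append(timedelta(seconds=seconds))
    return delays


delays = sketch_retry_delays(
    first_retry_interval=timedelta(seconds=1),
    max_number_of_attempts=5,
    backoff_coefficient=2.0,
    max_retry_interval=timedelta(seconds=30),
)
print([d.total_seconds() for d in delays])  # [1.0, 2.0, 4.0, 8.0]
```

With the values from the README example, the sketch yields four retries and never reaches the 30-second cap.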
@@ -194,7 +281,7 @@ Certain aspects like multi-app activities require the full dapr runtime to be ru
 ```shell
 dapr init || true
 
-dapr run --app-id test-app --dapr-grpc-port 4001 --components-path ./examples/components/
+dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/components/
 ```
 
 To run the E2E tests on a specific python version (eg: 3.11), run the following command from the project root:
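The "exact class name comparison" rule described in the README changes is easy to misread, so here is a minimal sketch contrasting it with `isinstance()`. It uses only built-ins; `sketch_matches_by_name` is a hypothetical stand-in for the name-based check, not the library's function.

```python
class ValidationError(ValueError):
    """Custom subclass used only for this illustration."""


def sketch_matches_by_name(exc: BaseException, type_names: set) -> bool:
    """Hypothetical check: compare only the exception's class name."""
    return type(exc).__name__ in type_names


non_retryable = {ValueError.__name__}  # the set {"ValueError"}
err = ValidationError("bad input")

print(isinstance(err, ValueError))                             # True
print(sketch_matches_by_name(err, non_retryable))              # False
print(sketch_matches_by_name(ValueError("x"), non_retryable))  # True
```

The subclass passes the `isinstance()` check but not the name check, which is why the README recommends listing subclasses explicitly.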

durabletask/client.py

Lines changed: 19 additions & 0 deletions
@@ -127,9 +127,28 @@ def __init__(
             interceptors=interceptors,
             options=channel_options,
         )
+        self._channel = channel
         self._stub = stubs.TaskHubSidecarServiceStub(channel)
         self._logger = shared.get_logger("client", log_handler, log_formatter)
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc, tb):
+        try:
+            self.close()
+        finally:
+            return False
+
+    def close(self) -> None:
+        """Close the underlying gRPC channel."""
+        try:
+            # grpc.Channel.close() is idempotent
+            self._channel.close()
+        except Exception:
+            # Best-effort cleanup
+            pass
+
     def schedule_new_orchestration(
         self,
         orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
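The new `__enter__`/`__exit__`/`close` methods let the client be used in a `with` block. The sketch below shows that pattern with stand-in classes so it runs without gRPC; `FakeChannel` and `SketchClient` are hypothetical and are not part of `durabletask`. Unlike the diff, this sketch does not `return` from a `finally` block: in Python, a `return` inside `finally` discards any exception that is in flight at that point, so a plain `return False` after `close()` is used here instead.

```python
class FakeChannel:
    """Stand-in for a gRPC channel; only records whether close() ran."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class SketchClient:
    """Hypothetical client mirroring the __enter__/__exit__/close shape."""

    def __init__(self, channel: FakeChannel) -> None:
        self._channel = channel

    def __enter__(self) -> "SketchClient":
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.close()
        return False  # never suppress exceptions raised in the with-body

    def close(self) -> None:
        try:
            self._channel.close()
        except Exception:
            pass  # best-effort cleanup, as in the diff


channel = FakeChannel()
try:
    with SketchClient(channel):
        raise RuntimeError("simulated failure inside the with-block")
except RuntimeError as err:
    propagated = str(err)

print(channel.closed)  # True
print(propagated)      # simulated failure inside the with-block
```

The channel is closed even though the body raised, and the original exception still reaches the caller.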

durabletask/task.py

Lines changed: 49 additions & 1 deletion
@@ -233,6 +233,29 @@ class OrchestrationStateError(Exception):
     pass
 
 
+class NonRetryableError(Exception):
+    """Exception indicating the operation should not be retried.
+
+    If an activity or sub-orchestration raises this exception, retry logic will be
+    bypassed and the failure will be returned immediately to the orchestrator.
+    """
+
+    pass
+
+
+def is_error_non_retryable(error_type: str, policy: RetryPolicy) -> bool:
+    """ Checks whether an error type is non-retryable."""
+    is_non_retryable = False
+    if error_type == "NonRetryableError":
+        is_non_retryable = True
+    elif (
+        policy.non_retryable_error_types is not None
+        and error_type in policy.non_retryable_error_types
+    ):
+        is_non_retryable = True
+    return is_non_retryable
+
+
 class Task(ABC, Generic[T]):
     """Abstract base class for asynchronous tasks in a durable orchestration."""
 
@@ -397,7 +420,7 @@ def compute_next_delay(self) -> Optional[timedelta]:
                 next_delay_f = min(
                     next_delay_f, self._retry_policy.max_retry_interval.total_seconds()
                 )
-                return timedelta(seconds=next_delay_f)
+            return timedelta(seconds=next_delay_f)
 
         return None
 
@@ -490,6 +513,7 @@ def __init__(
         backoff_coefficient: Optional[float] = 1.0,
         max_retry_interval: Optional[timedelta] = None,
         retry_timeout: Optional[timedelta] = None,
+        non_retryable_error_types: Optional[list[Union[str, type]]] = None,
     ):
         """Creates a new RetryPolicy instance.
 
@@ -505,6 +529,11 @@ def __init__(
             The maximum retry interval to use for any retry attempt.
         retry_timeout : Optional[timedelta]
             The maximum amount of time to spend retrying the operation.
+        non_retryable_error_types : Optional[list[Union[str, type]]]
+            A list of exception type names or classes that should not be retried.
+            If a failure's error type matches any of these, the task fails immediately.
+            The built-in NonRetryableError is always treated as non-retryable regardless
+            of this setting.
         """
         # validate inputs
         if first_retry_interval < timedelta(seconds=0):
@@ -523,6 +552,16 @@ def __init__(
         self._backoff_coefficient = backoff_coefficient
         self._max_retry_interval = max_retry_interval
         self._retry_timeout = retry_timeout
+        # Normalize non-retryable error type names to a set of strings
+        names: Optional[set[str]] = None
+        if non_retryable_error_types:
+            names = set[str]()
+            for t in non_retryable_error_types:
+                if isinstance(t, str) and t:
+                    names.add(t)
+                elif isinstance(t, type):
+                    names.add(t.__name__)
+        self._non_retryable_error_types = names
 
     @property
     def first_retry_interval(self) -> timedelta:
@@ -549,6 +588,15 @@ def retry_timeout(self) -> Optional[timedelta]:
         """The maximum amount of time to spend retrying the operation."""
         return self._retry_timeout
 
+    @property
+    def non_retryable_error_types(self) -> Optional[set[str]]:
+        """Set of error type names that should not be retried.
+
+        Comparison is performed against the errorType string provided by the
+        backend (typically the exception class name).
+        """
+        return self._non_retryable_error_types
+
 
 def get_name(fn: Callable) -> str:
     """Returns the name of the provided function"""
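To see how the normalization in `RetryPolicy.__init__` and the lookup in `is_error_non_retryable` fit together, the following standalone sketch reproduces both steps without importing `durabletask`. The names `sketch_normalize` and `sketch_is_non_retryable` are hypothetical, and the second function takes the name set directly instead of a `RetryPolicy`.

```python
from typing import List, Optional, Set, Union


def sketch_normalize(
    error_types: Optional[List[Union[str, type]]],
) -> Optional[Set[str]]:
    """Hypothetical standalone version of the normalization step."""
    if not error_types:
        return None  # None or an empty list means "no extra types"
    names: Set[str] = set()
    for t in error_types:
        if isinstance(t, str) and t:
            names.add(t)           # non-empty strings are kept as-is
        elif isinstance(t, type):
            names.add(t.__name__)  # classes contribute their bare name
        # anything else (empty strings, non-type objects) is skipped
    return names


def sketch_is_non_retryable(error_type: str, names: Optional[Set[str]]) -> bool:
    """Hypothetical check mirroring is_error_non_retryable."""
    if error_type == "NonRetryableError":
        return True  # always non-retryable, regardless of the set
    return names is not None and error_type in names


names = sketch_normalize([ValueError, "TimeoutError", "", 42])
print(sorted(names))                                       # ['TimeoutError', 'ValueError']
print(sketch_is_non_retryable("NonRetryableError", None))  # True
print(sketch_is_non_retryable("KeyError", names))          # False
```

Because only `__name__` is stored, two unrelated classes that share a name in different modules would be indistinguishable under this scheme.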
