Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ test-unit:
test-e2e:
pytest -m e2e --verbose

coverage-clean:
rm -f .coverage .coverage.* coverage.xml

coverage-all: coverage-clean
pytest -m "not e2e" --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml
pytest -m e2e --cov=durabletask --cov-branch --cov-report=term-missing --cov-report=xml --cov-append

install:
python3 -m pip install .

Expand All @@ -18,4 +25,4 @@ gen-proto:
python3 -m grpc_tools.protoc --proto_path=. --python_out=. --pyi_out=. --grpc_python_out=. ./durabletask/internal/orchestrator_service.proto
rm durabletask/internal/*.proto

.PHONY: init test-unit test-e2e gen-proto install
.PHONY: init test-unit test-e2e coverage-clean coverage-unit coverage-e2e coverage-all gen-proto install
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ Certain aspects like multi-app activities require the full dapr runtime to be ru
```shell
dapr init || true

dapr run --app-id test-app --dapr-grpc-port 4001 --components-path ./examples/components/
dapr run --app-id test-app --dapr-grpc-port 4001 --resources-path ./examples/components/
```

To run the E2E tests on a specific python version (eg: 3.11), run the following command from the project root:
Expand Down
1 change: 0 additions & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +0,0 @@
grpcio-tools==1.62.3 # 1.62.X is the latest version before protobuf 1.26.X is used which has breaking changes for Python
76 changes: 72 additions & 4 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,28 @@ def __init__(
interceptors=interceptors,
options=channel_options,
)
self._channel = channel
self._stub = stubs.TaskHubSidecarServiceStub(channel)
self._logger = shared.get_logger("client", log_handler, log_formatter)

def __enter__(self):
    """Enter a ``with`` block; the client instance itself is the managed resource."""
    return self

def __exit__(self, exc_type, exc, tb):
    """Exit the ``with`` block: close the channel, then propagate any pending exception.

    Returning False never suppresses the exception that triggered the exit.
    The previous ``try: ... finally: return False`` form would also have
    swallowed any exception raised by close() itself (a ``return`` inside
    ``finally`` discards in-flight exceptions); close() is already
    best-effort and never raises, so a plain call is equivalent and avoids
    the anti-pattern.
    """
    self.close()
    return False

def close(self) -> None:
    """Close the underlying gRPC channel (best-effort; safe to call repeatedly)."""
    try:
        # grpc.Channel.close() is idempotent
        self._channel.close()
    except Exception:
        # Shutdown cleanup must never raise back to the caller.
        pass

def schedule_new_orchestration(
self,
orchestrator: Union[task.Orchestrator[TInput, TOutput], str],
Expand Down Expand Up @@ -188,10 +207,59 @@ def wait_for_orchestration_completion(
) -> Optional[OrchestrationState]:
req = pb.GetInstanceRequest(instanceId=instance_id, getInputsAndOutputs=fetch_payloads)
try:
grpc_timeout = None if timeout == 0 else timeout
self._logger.info(
f"Waiting {'indefinitely' if timeout == 0 else f'up to {timeout}s'} for instance '{instance_id}' to complete."
)
# gRPC timeout mapping (pytest unit tests may pass None explicitly)
grpc_timeout = None if (timeout is None or timeout == 0) else timeout

# If timeout is None or 0, skip pre-checks/polling and call server-side wait directly
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

improves resource consumption on server side that might also lag behind client side

if timeout is None or timeout == 0:
self._logger.info(
f"Waiting {'indefinitely' if not timeout else f'up to {timeout}s'} for instance '{instance_id}' to complete."
)
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
req, timeout=grpc_timeout
)
state = new_orchestration_state(req.instanceId, res)
return state

# For positive timeout, best-effort pre-check and short polling to avoid long server waits
try:
# First check if the orchestration is already completed
current_state = self.get_orchestration_state(
instance_id, fetch_payloads=fetch_payloads
)
if current_state and current_state.runtime_status in [
OrchestrationStatus.COMPLETED,
OrchestrationStatus.FAILED,
OrchestrationStatus.TERMINATED,
]:
return current_state

# Poll for completion with exponential backoff to handle eventual consistency
import time

poll_timeout = min(timeout, 10)
poll_start = time.time()
poll_interval = 0.1

while time.time() - poll_start < poll_timeout:
current_state = self.get_orchestration_state(
instance_id, fetch_payloads=fetch_payloads
)

if current_state and current_state.runtime_status in [
OrchestrationStatus.COMPLETED,
OrchestrationStatus.FAILED,
OrchestrationStatus.TERMINATED,
]:
return current_state

time.sleep(poll_interval)
poll_interval = min(poll_interval * 1.5, 1.0) # Exponential backoff, max 1s
except Exception:
# Ignore pre-check/poll issues (e.g., mocked stubs in unit tests) and fall back
pass

self._logger.info(f"Waiting up to {timeout}s for instance '{instance_id}' to complete.")
res: pb.GetInstanceResponse = self._stub.WaitForInstanceCompletion(
req, timeout=grpc_timeout
)
Expand Down
224 changes: 224 additions & 0 deletions durabletask/deterministic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""
Copyright 2025 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

"""
Deterministic utilities for Durable Task workflows (async and generator).

This module provides deterministic alternatives to non-deterministic Python
functions, ensuring workflow replay consistency across different executions.
It is shared by both the asyncio authoring model and the generator-based model.
"""

import hashlib
import random
import string as _string
import uuid
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, TypeVar


@dataclass
class DeterminismSeed:
    """Seed material for deterministic operations."""

    # Orchestration instance identifier.
    instance_id: str
    # Orchestration start time as a Unix timestamp (whole seconds).
    orchestration_unix_ts: int

    def to_int(self) -> int:
        """Hash the seed fields into a 64-bit integer suitable for PRNG initialization."""
        key = f"{self.instance_id}:{self.orchestration_unix_ts}".encode("utf-8")
        digest = hashlib.sha256(key).digest()
        # First 8 big-endian bytes -> stable 64-bit seed.
        return int.from_bytes(digest[:8], byteorder="big")


def derive_seed(instance_id: str, orchestration_time: datetime) -> int:
    """
    Derive a deterministic PRNG seed from an instance ID and orchestration time.

    The datetime is truncated to whole seconds before hashing.

    NOTE(review): ``datetime.timestamp()`` on a *naive* datetime applies the
    local timezone, which could vary across hosts — confirm callers pass
    UTC-aware (or at least consistently sourced) orchestration times.
    """
    unix_seconds = int(orchestration_time.timestamp())
    seed = DeterminismSeed(instance_id=instance_id, orchestration_unix_ts=unix_seconds)
    return seed.to_int()


def deterministic_random(instance_id: str, orchestration_time: datetime) -> random.Random:
    """Build a ``random.Random`` seeded deterministically for the given orchestration."""
    return random.Random(derive_seed(instance_id, orchestration_time))


def deterministic_uuid4(rnd: random.Random) -> uuid.UUID:
    """
    Generate a deterministic UUID4 from the supplied random generator.

    Deprecated: prefer deterministic_uuid_v5, which matches the .NET
    implementation for cross-language compatibility.
    """
    # Draw 16 bytes one at a time (call order matters for PRNG reproducibility).
    raw = bytearray(rnd.randrange(0, 256) for _ in range(16))
    # Stamp the RFC 4122 version (4) and variant bits.
    raw[6] = (raw[6] & 0x0F) | 0x40
    raw[8] = (raw[8] & 0x3F) | 0x80
    return uuid.UUID(bytes=bytes(raw))


def deterministic_uuid_v5(instance_id: str, current_datetime: datetime, counter: int) -> uuid.UUID:
    """
    Generate a deterministic UUID v5 compatible with the .NET durabletask SDK.

    Mirrors the durabletask-dotnet NewGuid() method:
    https://github.com/microsoft/durabletask-dotnet/blob/main/src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

    Args:
        instance_id: The orchestration instance ID.
        current_datetime: The current orchestration datetime (frozen during replay).
        counter: The per-call counter (starts at 0 on each replay).

    Returns:
        A UUID v5 (SHA-1 based) that is stable across replays for the same inputs.
    """
    # Same namespace constant as .NET's DnsNamespaceValue.
    dns_namespace = uuid.UUID("9e952958-5e33-4daf-827f-2fa12937b875")
    # Name format mirrors .NET: "<instanceId>_<datetime>_<counter>"; isoformat()
    # approximates .NET's round-trip ("o") datetime formatting.
    components = (instance_id, current_datetime.isoformat(), str(counter))
    return uuid.uuid5(dns_namespace, "_".join(components))


class DeterministicContextMixin:
    """
    Mixin providing deterministic helpers for workflow contexts.

    Assumes the inheriting class exposes `instance_id` and
    `current_utc_datetime` attributes.

    This implementation matches the .NET durabletask SDK approach: explicit
    counters for UUID and timestamp generation that reset to 0 on each
    replay, keeping replays deterministic.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the mixin's replay-scoped counters."""
        super().__init__(*args, **kwargs)
        # Counter for deterministic UUID generation (matches .NET newGuidCounter).
        # Resets to 0 on each replay, ensuring determinism.
        self._uuid_counter: int = 0
        # Counter for deterministic timestamp sequencing (resets on replay).
        self._timestamp_counter: int = 0

    def now(self) -> datetime:
        """Alias for deterministic current_utc_datetime."""
        return self.current_utc_datetime  # type: ignore[attr-defined]

    def random(self) -> random.Random:
        """Return a PRNG seeded deterministically from instance id and orchestration time."""
        rnd = deterministic_random(
            self.instance_id,  # type: ignore[attr-defined]
            self.current_utc_datetime,  # type: ignore[attr-defined]
        )
        # Tag the generator so the asyncio sandbox detector can whitelist its
        # bound methods (randint, random, ...). Best-effort: never fail here.
        try:
            rnd._dt_deterministic = True
        except Exception:
            pass
        return rnd

    def uuid4(self) -> uuid.UUID:
        """
        Return a deterministically generated UUID v5 with an explicit counter.
        https://www.sohamkamani.com/uuid-versions-explained/#v5-non-random-uuids

        Matches the .NET implementation's NewGuid() method, which combines:
        - Instance ID
        - Current UTC datetime (frozen during replay)
        - Per-call counter (resets to 0 on each replay)

        The counter ensures multiple calls produce different UUIDs while
        maintaining determinism across replays.
        """
        # Lazily initialize the counter in case the mixin __init__ was
        # bypassed (for compatibility).
        if not hasattr(self, "_uuid_counter"):
            self._uuid_counter = 0

        result = deterministic_uuid_v5(
            self.instance_id,  # type: ignore[attr-defined]
            self.current_utc_datetime,  # type: ignore[attr-defined]
            self._uuid_counter,
        )
        self._uuid_counter += 1
        return result

    def new_guid(self) -> uuid.UUID:
        """Alias for uuid4 for API parity with other SDKs."""
        return self.uuid4()

    def random_string(self, length: int, *, alphabet: Optional[str] = None) -> str:
        """
        Return a deterministically generated random string of the given length.

        Raises:
            ValueError: If length is negative or the alphabet is empty.
        """
        if length < 0:
            raise ValueError("length must be non-negative")
        chars = alphabet if alphabet is not None else (_string.ascii_letters + _string.digits)
        if not chars:
            raise ValueError("alphabet must not be empty")
        rnd = self.random()
        size = len(chars)
        return "".join(chars[rnd.randrange(0, size)] for _ in range(length))

    def random_int(self, min_value: int = 0, max_value: int = 2**31 - 1) -> int:
        """Return a deterministic random integer in [min_value, max_value]."""
        if min_value > max_value:
            raise ValueError("min_value must be <= max_value")
        rnd = self.random()
        return rnd.randint(min_value, max_value)

    T = TypeVar("T")

    def random_choice(self, sequence: Sequence[T]) -> T:
        """Return a deterministic random element from a non-empty sequence."""
        if not sequence:
            raise IndexError("Cannot choose from empty sequence")
        rnd = self.random()
        return rnd.choice(sequence)

    def now_with_sequence(self) -> datetime:
        """
        Return deterministic timestamp with microsecond increment per call.

        Each call returns: current_utc_datetime + (counter * 1 microsecond)

        This provides ordered, unique timestamps for tracing/telemetry while
        maintaining determinism across replays. The counter resets to 0 on
        each replay (same pattern as _uuid_counter).

        Perfect for preserving event ordering within a workflow without
        requiring activities.

        Returns:
            datetime: Deterministic timestamp that increments on each call

        Example:
            ```python
            def workflow(ctx):
                t1 = ctx.now_with_sequence()  # 2024-01-01 12:00:00.000000
                result = yield ctx.call_activity(some_activity, input="data")
                t2 = ctx.now_with_sequence()  # 2024-01-01 12:00:00.000001
                # t1 < t2, preserving order for telemetry
            ```
        """
        # Lazily initialize, mirroring uuid4(): previously this raised
        # AttributeError when the mixin __init__ had been bypassed.
        if not hasattr(self, "_timestamp_counter"):
            self._timestamp_counter = 0
        offset = timedelta(microseconds=self._timestamp_counter)
        self._timestamp_counter += 1
        return self.current_utc_datetime + offset  # type: ignore[attr-defined]

    def current_utc_datetime_with_sequence(self) -> datetime:
        """Alias for now_with_sequence for API parity with other SDKs."""
        return self.now_with_sequence()
2 changes: 1 addition & 1 deletion durabletask/internal/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def get_logger(
# Add a default log handler if none is provided
if log_handler is None:
log_handler = logging.StreamHandler()
log_handler.setLevel(logging.INFO)
log_handler.setLevel(logging.DEBUG)
logger.handlers.append(log_handler)

# Set a default log formatter to our handler if none is provided
Expand Down
Loading
Loading