Skip to content

Commit 5912557

Browse files
committed
Use anyio to wrap the async client into sync
Signed-off-by: Albert Callarisa <[email protected]>
1 parent 7f89f6a commit 5912557

File tree

6 files changed

+226
-232
lines changed

6 files changed

+226
-232
lines changed

durabletask/aio/client.py

Lines changed: 82 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@
33

44
import logging
55
import uuid
6+
from dataclasses import dataclass
67
from datetime import datetime
7-
from typing import Any, Optional, Sequence, Union
8+
from enum import Enum
9+
from typing import Any, Optional, Sequence, TypeVar, Union
810

911
import grpc
1012
from google.protobuf import wrappers_pb2
@@ -16,13 +18,85 @@
1618
from durabletask import task
1719
from durabletask.aio.internal.grpc_interceptor import DefaultClientInterceptorImpl
1820
from durabletask.aio.internal.shared import ClientInterceptor, get_grpc_aio_channel
19-
from durabletask.client import (
20-
OrchestrationState,
21-
OrchestrationStatus,
22-
TInput,
23-
TOutput,
24-
new_orchestration_state,
25-
)
21+
22+
TInput = TypeVar("TInput")
23+
TOutput = TypeVar("TOutput")
24+
25+
26+
class OrchestrationStatus(Enum):
27+
"""The status of an orchestration instance."""
28+
29+
RUNNING = pb.ORCHESTRATION_STATUS_RUNNING
30+
COMPLETED = pb.ORCHESTRATION_STATUS_COMPLETED
31+
FAILED = pb.ORCHESTRATION_STATUS_FAILED
32+
TERMINATED = pb.ORCHESTRATION_STATUS_TERMINATED
33+
CONTINUED_AS_NEW = pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW
34+
PENDING = pb.ORCHESTRATION_STATUS_PENDING
35+
SUSPENDED = pb.ORCHESTRATION_STATUS_SUSPENDED
36+
37+
def __str__(self):
38+
return helpers.get_orchestration_status_str(self.value)
39+
40+
41+
@dataclass
42+
class OrchestrationState:
43+
instance_id: str
44+
name: str
45+
runtime_status: OrchestrationStatus
46+
created_at: datetime
47+
last_updated_at: datetime
48+
serialized_input: Optional[str]
49+
serialized_output: Optional[str]
50+
serialized_custom_status: Optional[str]
51+
failure_details: Optional[task.FailureDetails]
52+
53+
def raise_if_failed(self):
54+
if self.failure_details is not None:
55+
raise OrchestrationFailedError(
56+
f"Orchestration '{self.instance_id}' failed: {self.failure_details.message}",
57+
self.failure_details,
58+
)
59+
60+
61+
class OrchestrationFailedError(Exception):
62+
def __init__(self, message: str, failure_details: task.FailureDetails):
63+
super().__init__(message)
64+
self._failure_details = failure_details
65+
66+
@property
67+
def failure_details(self):
68+
return self._failure_details
69+
70+
71+
def new_orchestration_state(
72+
instance_id: str, res: pb.GetInstanceResponse
73+
) -> Optional[OrchestrationState]:
74+
if not res.exists:
75+
return None
76+
77+
state = res.orchestrationState
78+
79+
failure_details = None
80+
if state.failureDetails.errorMessage != "" or state.failureDetails.errorType != "":
81+
failure_details = task.FailureDetails(
82+
state.failureDetails.errorMessage,
83+
state.failureDetails.errorType,
84+
state.failureDetails.stackTrace.value
85+
if not helpers.is_empty(state.failureDetails.stackTrace)
86+
else None,
87+
)
88+
89+
return OrchestrationState(
90+
instance_id,
91+
state.name,
92+
OrchestrationStatus(state.orchestrationStatus),
93+
state.createdTimestamp.ToDatetime(),
94+
state.lastUpdatedTimestamp.ToDatetime(),
95+
state.input.value if not helpers.is_empty(state.input) else None,
96+
state.output.value if not helpers.is_empty(state.output) else None,
97+
state.customStatus.value if not helpers.is_empty(state.customStatus) else None,
98+
failure_details,
99+
)
26100

27101

28102
class AsyncTaskHubGrpcClient:

0 commit comments

Comments
 (0)