Skip to content

Commit 018ba0a

Browse files
committed
Add basic pollers
1 parent 1548145 commit 018ba0a

File tree

13 files changed

+532
-0
lines changed

13 files changed

+532
-0
lines changed

cadence/_internal/rpc/__init__.py

Whitespace-only changes.

cadence/_internal/rpc/metadata.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import collections
2+
from typing import Callable, Union
3+
4+
from grpc.aio import Metadata
5+
from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails
6+
from grpc.aio import UnaryUnaryCall
7+
8+
9+
class _ClientCallDetails(
10+
collections.namedtuple(
11+
"_ClientCallDetails", ("method", "timeout", "metadata", "credentials", "wait_for_ready")
12+
),
13+
ClientCallDetails,
14+
):
15+
pass
16+
17+
class MetadataInterceptor(UnaryUnaryClientInterceptor):
18+
def __init__(self, metadata: Metadata):
19+
self._metadata = metadata
20+
21+
async def intercept_unary_unary(self, continuation, client_call_details: ClientCallDetails, request):
22+
return await continuation(self._replace_details(client_call_details), request)
23+
24+
25+
def _replace_details(self, client_call_details: ClientCallDetails) -> ClientCallDetails:
26+
metadata = client_call_details.metadata
27+
if metadata is None:
28+
metadata = self._metadata
29+
else:
30+
metadata += self._metadata
31+
32+
return _ClientCallDetails(
33+
method=client_call_details.method,
34+
timeout=client_call_details.timeout,
35+
metadata=metadata,
36+
credentials=client_call_details.credentials,
37+
wait_for_ready=client_call_details.wait_for_ready,
38+
)

cadence/client.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import os
2+
import socket
3+
from typing import TypedDict, Unpack, Callable
4+
5+
import grpc
6+
7+
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub
8+
from grpc.aio import Channel, ClientInterceptor
9+
10+
11+
class ClientOptions(TypedDict, total=False):
12+
domain: str
13+
identity: str
14+
15+
class Client:
16+
def __init__(self, channel: Channel, options: ClientOptions):
17+
self._channel = channel
18+
self._worker_stub = WorkerAPIStub(channel)
19+
self._options = options
20+
self._identity = options["identity"] if "identity" in options else f"{os.getpid()}@{socket.gethostname()}"
21+
22+
23+
@property
24+
def domain(self) -> str:
25+
return self._options["domain"]
26+
27+
@property
28+
def identity(self) -> str:
29+
return self._identity
30+
31+
@property
32+
def worker_stub(self) -> WorkerAPIStub:
33+
return self._worker_stub
34+
35+
36+
async def close(self):
37+
await self._channel.close()
38+
39+

cadence/sample/client_example.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import asyncio
2+
3+
from grpc.aio import insecure_channel, Metadata
4+
5+
from cadence.client import Client, ClientOptions
6+
from cadence._internal.rpc.metadata import MetadataInterceptor
7+
from cadence.worker import Worker
8+
9+
10+
async def main():
11+
# TODO - Hide all this
12+
metadata = Metadata()
13+
metadata["rpc-service"] = "cadence-frontend"
14+
metadata["rpc-encoding"] = "proto"
15+
metadata["rpc-caller"] = "nate"
16+
async with insecure_channel("localhost:7833", interceptors=[MetadataInterceptor(metadata)]) as channel:
17+
client = Client(channel, ClientOptions(domain="foo"))
18+
worker = Worker(client, "task_list")
19+
await worker.run()
20+
21+
if __name__ == '__main__':
22+
asyncio.run(main())

cadence/worker/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
2+
3+
from ._worker import (
4+
Worker,
5+
WorkerOptions
6+
)
7+
8+
__all__ = [
9+
"Worker",
10+
"WorkerOptions"
11+
]

cadence/worker/_activity.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from cadence.api.v1.common_pb2 import Failure
5+
from cadence.api.v1.service_worker_pb2 import PollForActivityTaskResponse, PollForActivityTaskRequest, \
6+
RespondActivityTaskFailedRequest
7+
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind
8+
from cadence.client import Client
9+
from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT
10+
from cadence.worker._poller import Poller
11+
12+
13+
class ActivityWorker:
14+
def __init__(self, client: Client, task_list: str, options: WorkerOptions):
15+
self._client = client
16+
self._task_list = task_list
17+
self._identity = options["identity"]
18+
permits = asyncio.Semaphore(options["max_concurrent_activity_execution_size"])
19+
self._poller = Poller[PollForActivityTaskResponse](options["activity_task_pollers"], permits, self._poll, self._execute)
20+
# TODO: Local dispatch, local activities, actually running activities, etc
21+
22+
async def run(self):
23+
await self._poller.run()
24+
25+
async def _poll(self) -> Optional[PollForActivityTaskResponse]:
26+
task: PollForActivityTaskResponse = await self._client.worker_stub.PollForActivityTask(PollForActivityTaskRequest(
27+
domain=self._client.domain,
28+
task_list=TaskList(name=self._task_list,kind=TaskListKind.TASK_LIST_KIND_NORMAL),
29+
identity=self._identity,
30+
), timeout=_LONG_POLL_TIMEOUT)
31+
32+
if task.task_token:
33+
return task
34+
else:
35+
return None
36+
37+
async def _execute(self, task: PollForActivityTaskResponse):
38+
await self._client.worker_stub.RespondActivityTaskFailed(RespondActivityTaskFailedRequest(
39+
task_token=task.task_token,
40+
identity=self._identity,
41+
failure=Failure(reason='error', details=b'not implemented'),
42+
))
43+

cadence/worker/_decision.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from cadence.api.v1.common_pb2 import Payload
5+
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse, \
6+
RespondDecisionTaskFailedRequest
7+
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind
8+
from cadence.api.v1.workflow_pb2 import DecisionTaskFailedCause
9+
from cadence.client import Client
10+
from cadence.worker._poller import Poller
11+
from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT
12+
13+
14+
class DecisionWorker:
15+
def __init__(self, client: Client, task_list: str, options: WorkerOptions):
16+
self._client = client
17+
self._task_list = task_list
18+
self._identity = options["identity"]
19+
permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"])
20+
self._poller = Poller[PollForDecisionTaskResponse](options["decision_task_pollers"], permits, self._poll, self._execute)
21+
# TODO: Sticky poller, actually running workflows, etc.
22+
23+
async def run(self):
24+
await self._poller.run()
25+
26+
async def _poll(self) -> Optional[PollForDecisionTaskResponse]:
27+
task: PollForDecisionTaskResponse = await self._client.worker_stub.PollForDecisionTask(PollForDecisionTaskRequest(
28+
domain=self._client.domain,
29+
task_list=TaskList(name=self._task_list,kind=TaskListKind.TASK_LIST_KIND_NORMAL),
30+
identity=self._identity,
31+
), timeout=_LONG_POLL_TIMEOUT)
32+
33+
if task.task_token:
34+
return task
35+
else:
36+
return None
37+
38+
39+
async def _execute(self, task: PollForDecisionTaskResponse):
40+
await self._client.worker_stub.RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest(
41+
task_token=task.task_token,
42+
cause=DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION,
43+
identity=self._identity,
44+
details=Payload(data=b'not implemented')
45+
))
46+

cadence/worker/_poller.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import asyncio
2+
import logging
3+
from typing import Callable, TypeVar, Generic, Awaitable, Optional
4+
5+
logger = logging.getLogger(__name__)
6+
7+
T = TypeVar('T')
8+
9+
class Poller(Generic[T]):
10+
def __init__(self, num_tasks: int, permits: asyncio.Semaphore, poll: Callable[[], Awaitable[Optional[T]]], callback: Callable[[T], Awaitable[None]]):
11+
self._num_tasks = num_tasks
12+
self._permits = permits
13+
self._poll = poll
14+
self._callback = callback
15+
self._background_tasks: set[asyncio.Task[None]] = set()
16+
pass
17+
18+
async def run(self):
19+
try:
20+
async with asyncio.TaskGroup() as tg:
21+
for i in range(self._num_tasks):
22+
tg.create_task(self._poll_loop())
23+
except asyncio.CancelledError:
24+
pass
25+
26+
27+
async def _poll_loop(self):
28+
while True:
29+
try:
30+
await self._poll_and_dispatch()
31+
except asyncio.CancelledError as e:
32+
raise e
33+
except Exception as e:
34+
logger.exception('Exception while polling')
35+
36+
37+
async def _poll_and_dispatch(self):
38+
await self._permits.acquire()
39+
try:
40+
task = await self._poll()
41+
except Exception as e:
42+
self._permits.release()
43+
raise e
44+
45+
if task is None:
46+
self._permits.release()
47+
return
48+
49+
# Need to store a reference to the async task or it may be garbage collected
50+
scheduled = asyncio.create_task(self._execute_callback(task))
51+
self._background_tasks.add(scheduled)
52+
scheduled.add_done_callback(self._background_tasks.remove)
53+
54+
async def _execute_callback(self, task: T):
55+
try:
56+
await self._callback(task)
57+
except Exception as e:
58+
logger.exception('Exception during callback')
59+
finally:
60+
self._permits.release()

cadence/worker/_types.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from typing import TypedDict
2+
3+
4+
class WorkerOptions(TypedDict, total=False):
5+
max_concurrent_activity_execution_size: int
6+
max_concurrent_decision_task_execution_size: int
7+
task_list_activities_per_second: float
8+
# Remove these in favor of introducing automatic scaling prior to release
9+
activity_task_pollers: int
10+
decision_task_pollers: int
11+
disable_workflow_worker: bool
12+
disable_activity_worker: bool
13+
identity: str
14+
15+
_DEFAULT_WORKER_OPTIONS: WorkerOptions = {
16+
"max_concurrent_activity_execution_size": 1000,
17+
"max_concurrent_decision_task_execution_size": 1000,
18+
"task_list_activities_per_second": 0.0,
19+
"activity_task_pollers": 2,
20+
"decision_task_pollers": 2,
21+
"disable_workflow_worker": False,
22+
"disable_activity_worker": False,
23+
}
24+
25+
_LONG_POLL_TIMEOUT = 60.0

cadence/worker/_worker.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import asyncio
2+
import uuid
3+
from typing import Unpack
4+
5+
from cadence.client import Client
6+
from cadence.worker._activity import ActivityWorker
7+
from cadence.worker._decision import DecisionWorker
8+
from cadence.worker._types import WorkerOptions, _DEFAULT_WORKER_OPTIONS
9+
10+
11+
class Worker:
12+
13+
def __init__(self, client: Client, task_list: str, **kwargs: Unpack[WorkerOptions]):
14+
self._client = client
15+
self._task_list = task_list
16+
17+
options = WorkerOptions(**kwargs)
18+
_validate_and_copy_defaults(client, task_list, options)
19+
self._options = options
20+
self._activity_worker = ActivityWorker(client, task_list, options)
21+
self._decision_worker = DecisionWorker(client, task_list, options)
22+
23+
24+
async def run(self):
25+
async with asyncio.TaskGroup() as tg:
26+
if not self._options["disable_workflow_worker"]:
27+
tg.create_task(self._decision_worker.run())
28+
if not self._options["disable_activity_worker"]:
29+
tg.create_task(self._activity_worker.run())
30+
31+
32+
33+
def _validate_and_copy_defaults(client: Client, task_list: str, options: WorkerOptions):
34+
if "identity" not in options:
35+
options["identity"] = f"{client.identity}@{task_list}@{uuid.uuid4()}"
36+
37+
# TODO: More validation
38+
39+
for (key, value) in _DEFAULT_WORKER_OPTIONS.items():
40+
if key not in options:
41+
# noinspection PyTypedDict
42+
options[key] = value

0 commit comments

Comments
 (0)