-
Notifications
You must be signed in to change notification settings - Fork 3
Add basic pollers #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
import collections | ||
|
||
from grpc.aio import Metadata | ||
from grpc.aio import UnaryUnaryClientInterceptor, ClientCallDetails | ||
|
||
|
||
class _ClientCallDetails( | ||
collections.namedtuple( | ||
"_ClientCallDetails", ("method", "timeout", "metadata", "credentials", "wait_for_ready") | ||
), | ||
ClientCallDetails, | ||
): | ||
pass | ||
|
||
class MetadataInterceptor(UnaryUnaryClientInterceptor): | ||
def __init__(self, metadata: Metadata): | ||
self._metadata = metadata | ||
|
||
async def intercept_unary_unary(self, continuation, client_call_details: ClientCallDetails, request): | ||
return await continuation(self._replace_details(client_call_details), request) | ||
|
||
|
||
def _replace_details(self, client_call_details: ClientCallDetails) -> ClientCallDetails: | ||
metadata = client_call_details.metadata | ||
if metadata is None: | ||
metadata = self._metadata | ||
else: | ||
metadata += self._metadata | ||
|
||
return _ClientCallDetails( | ||
method=client_call_details.method, | ||
timeout=client_call_details.timeout, | ||
metadata=metadata, | ||
credentials=client_call_details.credentials, | ||
wait_for_ready=client_call_details.wait_for_ready, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import os | ||
import socket | ||
from typing import TypedDict | ||
|
||
from cadence.api.v1.service_worker_pb2_grpc import WorkerAPIStub | ||
from grpc.aio import Channel | ||
|
||
|
||
class ClientOptions(TypedDict, total=False): | ||
domain: str | ||
identity: str | ||
|
||
class Client: | ||
def __init__(self, channel: Channel, options: ClientOptions): | ||
self._channel = channel | ||
self._worker_stub = WorkerAPIStub(channel) | ||
self._options = options | ||
self._identity = options["identity"] if "identity" in options else f"{os.getpid()}@{socket.gethostname()}" | ||
|
||
|
||
@property | ||
def domain(self) -> str: | ||
return self._options["domain"] | ||
|
||
@property | ||
def identity(self) -> str: | ||
return self._identity | ||
|
||
@property | ||
def worker_stub(self) -> WorkerAPIStub: | ||
return self._worker_stub | ||
|
||
|
||
async def close(self): | ||
await self._channel.close() | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
import asyncio | ||
|
||
from grpc.aio import insecure_channel, Metadata | ||
|
||
from cadence.client import Client, ClientOptions | ||
from cadence._internal.rpc.metadata import MetadataInterceptor | ||
from cadence.worker import Worker | ||
|
||
|
||
async def main(): | ||
# TODO - Hide all this | ||
metadata = Metadata() | ||
metadata["rpc-service"] = "cadence-frontend" | ||
metadata["rpc-encoding"] = "proto" | ||
metadata["rpc-caller"] = "nate" | ||
async with insecure_channel("localhost:7833", interceptors=[MetadataInterceptor(metadata)]) as channel: | ||
client = Client(channel, ClientOptions(domain="foo")) | ||
worker = Worker(client, "task_list") | ||
await worker.run() | ||
|
||
if __name__ == '__main__': | ||
asyncio.run(main()) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
|
||
|
||
from ._worker import ( | ||
Worker, | ||
WorkerOptions | ||
) | ||
|
||
__all__ = [ | ||
"Worker", | ||
"WorkerOptions" | ||
] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import asyncio | ||
from typing import Optional | ||
|
||
from cadence.api.v1.common_pb2 import Failure | ||
from cadence.api.v1.service_worker_pb2 import PollForActivityTaskResponse, PollForActivityTaskRequest, \ | ||
RespondActivityTaskFailedRequest | ||
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind | ||
from cadence.client import Client | ||
from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT | ||
from cadence.worker._poller import Poller | ||
|
||
|
||
class ActivityWorker: | ||
def __init__(self, client: Client, task_list: str, options: WorkerOptions): | ||
self._client = client | ||
self._task_list = task_list | ||
self._identity = options["identity"] | ||
permits = asyncio.Semaphore(options["max_concurrent_activity_execution_size"]) | ||
self._poller = Poller[PollForActivityTaskResponse](options["activity_task_pollers"], permits, self._poll, self._execute) | ||
# TODO: Local dispatch, local activities, actually running activities, etc | ||
|
||
async def run(self): | ||
await self._poller.run() | ||
|
||
async def _poll(self) -> Optional[PollForActivityTaskResponse]: | ||
task: PollForActivityTaskResponse = await self._client.worker_stub.PollForActivityTask(PollForActivityTaskRequest( | ||
domain=self._client.domain, | ||
task_list=TaskList(name=self._task_list,kind=TaskListKind.TASK_LIST_KIND_NORMAL), | ||
identity=self._identity, | ||
), timeout=_LONG_POLL_TIMEOUT) | ||
|
||
if task.task_token: | ||
return task | ||
else: | ||
return None | ||
|
||
async def _execute(self, task: PollForActivityTaskResponse): | ||
await self._client.worker_stub.RespondActivityTaskFailed(RespondActivityTaskFailedRequest( | ||
task_token=task.task_token, | ||
identity=self._identity, | ||
failure=Failure(reason='error', details=b'not implemented'), | ||
)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import asyncio | ||
from typing import Optional | ||
|
||
from cadence.api.v1.common_pb2 import Payload | ||
from cadence.api.v1.service_worker_pb2 import PollForDecisionTaskRequest, PollForDecisionTaskResponse, \ | ||
RespondDecisionTaskFailedRequest | ||
from cadence.api.v1.tasklist_pb2 import TaskList, TaskListKind | ||
from cadence.api.v1.workflow_pb2 import DecisionTaskFailedCause | ||
from cadence.client import Client | ||
from cadence.worker._poller import Poller | ||
from cadence.worker._types import WorkerOptions, _LONG_POLL_TIMEOUT | ||
|
||
|
||
class DecisionWorker: | ||
def __init__(self, client: Client, task_list: str, options: WorkerOptions): | ||
self._client = client | ||
self._task_list = task_list | ||
self._identity = options["identity"] | ||
permits = asyncio.Semaphore(options["max_concurrent_decision_task_execution_size"]) | ||
self._poller = Poller[PollForDecisionTaskResponse](options["decision_task_pollers"], permits, self._poll, self._execute) | ||
# TODO: Sticky poller, actually running workflows, etc. | ||
|
||
async def run(self): | ||
await self._poller.run() | ||
|
||
async def _poll(self) -> Optional[PollForDecisionTaskResponse]: | ||
task: PollForDecisionTaskResponse = await self._client.worker_stub.PollForDecisionTask(PollForDecisionTaskRequest( | ||
domain=self._client.domain, | ||
task_list=TaskList(name=self._task_list,kind=TaskListKind.TASK_LIST_KIND_NORMAL), | ||
identity=self._identity, | ||
), timeout=_LONG_POLL_TIMEOUT) | ||
|
||
if task.task_token: | ||
return task | ||
else: | ||
return None | ||
|
||
|
||
async def _execute(self, task: PollForDecisionTaskResponse): | ||
await self._client.worker_stub.RespondDecisionTaskFailed(RespondDecisionTaskFailedRequest( | ||
task_token=task.task_token, | ||
cause=DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_UNHANDLED_DECISION, | ||
identity=self._identity, | ||
details=Payload(data=b'not implemented') | ||
)) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
import asyncio | ||
import logging | ||
from typing import Callable, TypeVar, Generic, Awaitable, Optional | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
T = TypeVar('T') | ||
|
||
class Poller(Generic[T]): | ||
def __init__(self, num_tasks: int, permits: asyncio.Semaphore, poll: Callable[[], Awaitable[Optional[T]]], callback: Callable[[T], Awaitable[None]]): | ||
self._num_tasks = num_tasks | ||
self._permits = permits | ||
self._poll = poll | ||
self._callback = callback | ||
self._background_tasks: set[asyncio.Task[None]] = set() | ||
pass | ||
|
||
async def run(self): | ||
try: | ||
async with asyncio.TaskGroup() as tg: | ||
for i in range(self._num_tasks): | ||
tg.create_task(self._poll_loop()) | ||
except asyncio.CancelledError: | ||
pass | ||
|
||
|
||
async def _poll_loop(self): | ||
while True: | ||
try: | ||
await self._poll_and_dispatch() | ||
except asyncio.CancelledError as e: | ||
raise e | ||
except Exception: | ||
logger.exception('Exception while polling') | ||
|
||
|
||
async def _poll_and_dispatch(self): | ||
await self._permits.acquire() | ||
try: | ||
task = await self._poll() | ||
except Exception as e: | ||
self._permits.release() | ||
raise e | ||
|
||
if task is None: | ||
self._permits.release() | ||
return | ||
|
||
# Need to store a reference to the async task or it may be garbage collected | ||
scheduled = asyncio.create_task(self._execute_callback(task)) | ||
self._background_tasks.add(scheduled) | ||
scheduled.add_done_callback(self._background_tasks.remove) | ||
|
||
async def _execute_callback(self, task: T): | ||
try: | ||
await self._callback(task) | ||
except Exception: | ||
logger.exception('Exception during callback') | ||
finally: | ||
self._permits.release() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from typing import TypedDict | ||
|
||
|
||
class WorkerOptions(TypedDict, total=False): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we would want to expose WorkerOptions to client like the go client here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. They're included in the init.py so they can be imported like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should not expect customer to search for WorkerOptions definition in _types package, we should define those in some public files so that it's easy for customer to understand it's usage. |
||
max_concurrent_activity_execution_size: int | ||
max_concurrent_decision_task_execution_size: int | ||
task_list_activities_per_second: float | ||
# Remove these in favor of introducing automatic scaling prior to release | ||
activity_task_pollers: int | ||
decision_task_pollers: int | ||
disable_workflow_worker: bool | ||
disable_activity_worker: bool | ||
identity: str | ||
|
||
_DEFAULT_WORKER_OPTIONS: WorkerOptions = { | ||
"max_concurrent_activity_execution_size": 1000, | ||
"max_concurrent_decision_task_execution_size": 1000, | ||
"task_list_activities_per_second": 0.0, | ||
"activity_task_pollers": 2, | ||
"decision_task_pollers": 2, | ||
"disable_workflow_worker": False, | ||
"disable_activity_worker": False, | ||
} | ||
|
||
_LONG_POLL_TIMEOUT = 60.0 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
import asyncio | ||
import uuid | ||
from typing import Unpack | ||
|
||
from cadence.client import Client | ||
from cadence.worker._activity import ActivityWorker | ||
from cadence.worker._decision import DecisionWorker | ||
from cadence.worker._types import WorkerOptions, _DEFAULT_WORKER_OPTIONS | ||
|
||
|
||
class Worker: | ||
|
||
def __init__(self, client: Client, task_list: str, **kwargs: Unpack[WorkerOptions]): | ||
self._client = client | ||
self._task_list = task_list | ||
|
||
options = WorkerOptions(**kwargs) | ||
_validate_and_copy_defaults(client, task_list, options) | ||
self._options = options | ||
self._activity_worker = ActivityWorker(client, task_list, options) | ||
self._decision_worker = DecisionWorker(client, task_list, options) | ||
|
||
|
||
async def run(self): | ||
async with asyncio.TaskGroup() as tg: | ||
if not self._options["disable_workflow_worker"]: | ||
tg.create_task(self._decision_worker.run()) | ||
if not self._options["disable_activity_worker"]: | ||
tg.create_task(self._activity_worker.run()) | ||
|
||
|
||
|
||
def _validate_and_copy_defaults(client: Client, task_list: str, options: WorkerOptions): | ||
if "identity" not in options: | ||
options["identity"] = f"{client.identity}@{task_list}@{uuid.uuid4()}" | ||
|
||
# TODO: More validation | ||
|
||
for (key, value) in _DEFAULT_WORKER_OPTIONS.items(): | ||
if key not in options: | ||
# noinspection PyTypedDict | ||
options[key] = value |
Uh oh!
There was an error while loading. Please reload this page.