Skip to content

Commit 6ac3b22

Browse files
committed
Enhance worker-options
Signed-off-by: Greg Haskins <[email protected]>
1 parent c8e9e87 commit 6ac3b22

File tree

2 files changed

+50
-9
lines changed

2 files changed

+50
-9
lines changed

src/temporal/client/worker.clj

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
[temporal.internal.activity :as a]
77
[temporal.internal.workflow :as w]
88
[temporal.internal.utils :as u])
9-
(:import [io.temporal.worker Worker WorkerFactory]
9+
(:import [io.temporal.worker Worker WorkerFactory WorkerOptions WorkerOptions$Builder]
1010
[temporal.internal.dispatcher DynamicWorkflowProxy]
1111
[io.temporal.workflow DynamicWorkflow]))
1212

@@ -31,11 +31,22 @@ Initializes a worker instance, suitable for real connections or unit-testing wit
3131
"
3232
Options for configuring workers (See [[start]])
3333
34-
| Value | Mandatory | Description | Type | Default |
35-
| ------------ | ----------- | ----------------------------------------------------------------- | ---------------- | ------- |
36-
| :task-queue | y | The name of the task-queue for this worker instance to listen on. | String / keyword | |
37-
| :ctx | | An opaque handle that is passed back as the first argument of [[temporal.workflow/defworkflow]] and [[temporal.activity/defactivity]], useful for passing state such as database or network connections. | <any> | nil |
38-
| :dispatch | | An optional map explicitly setting the dispatch table | See below | All visible activities/workers are automatically registered |
34+
| Value | Mandatory | Description | Type | Default |
35+
| ------------ | ----------- | ----------------------------------------------------------------- | ---------------- | ------- |
36+
| :task-queue | y | The name of the task-queue for this worker instance to listen on. | String / keyword | |
37+
| :ctx | | An opaque handle that is passed back as the first argument of [[temporal.workflow/defworkflow]] and [[temporal.activity/defactivity]], useful for passing state such as database or network connections. | <any> | nil |
38+
| :dispatch | | An optional map explicitly setting the dispatch table | See below | All visible activities/workers are automatically registered |
39+
| :max-concurrent-activity-task-pollers | | Number of simultaneous poll requests on activity task queue. Consider incrementing if the worker is not throttled due to `MaxActivitiesPerSecond` or `MaxConcurrentActivityExecutionSize` options and still cannot keep up with the request rate. | int | 5 |
40+
| :max-concurrent-activity-execution-size | | Maximum number of activities executed in parallel. | int | 200 |
41+
| :max-concurrent-local-activity-execution-size | | Maximum number of local activities executed in parallel. | int | 200 |
42+
| :max-concurrent-workflow-task-pollers | | Number of simultaneous poll requests on workflow task queue. | int | 2 |
43+
| :max-concurrent-workflow-task-execution-size | | Maximum number of simultaneously executed workflow tasks. | int | 200 |
44+
| :default-deadlock-detection-timeout | | Time period in ms that will be used to detect workflow deadlock. | long | 1000 |
45+
| :default-heartbeat-throttle-interval | | Default amount of time between sending each pending heartbeat. | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 30s |
46+
| :max-heartbeat-throttle-interval | | Maximum amount of time between sending each pending heartbeat. | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 60s |
47+
| :local-activity-worker-only | | Worker should only handle workflow tasks and local activities. | boolean | false |
48+
| :max-taskqueue-activities-per-second | | Sets the rate limiting on number of activities per second. | double | 0.0 (unlimited) |
49+
| :max-workers-activities-per-second | | Maximum number of activities started per second. | double | 0.0 (unlimited) |
3950
4051
#### dispatch-table
4152
@@ -53,7 +64,21 @@ Options for configuring workers (See [[start]])
5364
```
5465
5566
"
56-
nil)
67+
{:max-concurrent-activity-task-pollers #(.setMaxConcurrentActivityTaskPollers ^WorkerOptions$Builder %1 %2)
68+
:max-concurrent-activity-execution-size #(.setMaxConcurrentActivityExecutionSize ^WorkerOptions$Builder %1 %2)
69+
:max-concurrent-local-activity-execution-size #(.setMaxConcurrentLocalActivityExecutionSize ^WorkerOptions$Builder %1 %2)
70+
:max-concurrent-workflow-task-pollers #(.setMaxConcurrentWorkflowTaskPollers ^WorkerOptions$Builder %1 %2)
71+
:max-concurrent-workflow-task-execution-size #(.setMaxConcurrentWorkflowTaskExecutionSize ^WorkerOptions$Builder %1 %2)
72+
:default-deadlock-detection-timeout #(.setDefaultDeadlockDetectionTimeout ^WorkerOptions$Builder %1 %2)
73+
:default-heartbeat-throttle-interval #(.setDefaultHeartbeatThrottleInterval ^WorkerOptions$Builder %1 %2)
74+
:max-heartbeat-throttle-interval #(.setMaxHeartbeatThrottleInterval ^WorkerOptions$Builder %1 %2)
75+
:local-activity-worker-only #(.setLocalActivityWorkerOnly ^WorkerOptions$Builder %1 %2)
76+
:max-taskqueue-activities-per-second #(.setMaxTaskQueueActivitiesPerSecond ^WorkerOptions$Builder %1 %2)
77+
:max-workers-activities-per-second #(.setMaxWorkerActivitiesPerSecond ^WorkerOptions$Builder %1 %2)})
78+
79+
(defn ^:no-doc worker-options->
80+
^WorkerOptions [params]
81+
(u/build (WorkerOptions/newBuilder (WorkerOptions/getDefaultInstance)) worker-options params))
5782

5883
(defn start
5984
"
@@ -70,7 +95,7 @@ Arguments:
7095
"
7196
[client {:keys [task-queue] :as options}]
7297
(let [factory (WorkerFactory/newInstance client)
73-
worker (.newWorker factory (u/namify task-queue))]
98+
worker (.newWorker factory (u/namify task-queue) (worker-options-> options))]
7499
(init worker options)
75100
(.start factory)
76101
{:factory factory :worker worker}))

test/temporal/test/types.clj

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33
(ns temporal.test.types
44
(:require [clojure.test :refer :all]
55
[temporal.client.core :as client]
6-
[temporal.internal.workflow :as workflow])
6+
[temporal.internal.workflow :as workflow]
7+
[temporal.client.worker :as worker])
78
(:import [java.time Duration]
89
[io.grpc Grpc InsecureChannelCredentials Metadata]
910
[io.grpc.netty.shaded.io.grpc.netty GrpcSslContexts]))
@@ -50,3 +51,18 @@
5051
(let [options {:target "foo:1234" :namespace "default"}]
5152
(is (some? (client/stub-options-> options)))
5253
(is (some? (client/client-options-> options))))))
54+
55+
(deftest worker-options
56+
(testing "Verify that our worker-options work"
57+
(let [x (worker/worker-options-> {:max-concurrent-activity-task-pollers 2
58+
:max-concurrent-activity-execution-size 10
59+
:max-concurrent-local-activity-execution-size 2
60+
:max-concurrent-workflow-task-pollers 2
61+
:max-concurrent-workflow-task-execution-size 10
62+
:default-deadlock-detection-timeout 1000
63+
:default-heartbeat-throttle-interval (Duration/ofSeconds 10)
64+
:max-heartbeat-throttle-interval (Duration/ofSeconds 10)
65+
:local-activity-worker-only false
66+
:max-taskqueue-activities-per-second 1.0
67+
:max-workers-activities-per-second 1.0})]
68+
(is (-> x (.isLocalActivityWorkerOnly) false?)))))

0 commit comments

Comments
 (0)