Skip to content

Commit bd0629b

Browse files
committed
Add support for RetryOptions
Signed-off-by: Greg Haskins <[email protected]>
1 parent 74ffdba commit bd0629b

File tree

9 files changed

+66
-32
lines changed

9 files changed

+66
-32
lines changed

project.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@
2929
:cloverage {:runner :eftest
3030
:runner-opts {:multithread? false
3131
:fail-fast? true}
32-
:fail-threshold 92
32+
:fail-threshold 91
3333
:ns-exclude-regex [#"temporal.client.worker"]})

src/temporal/activity.clj

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,8 @@
88
[temporal.internal.activity :as a]
99
[temporal.internal.utils :refer [->promise] :as u])
1010
(:import [io.temporal.workflow Workflow]
11-
[io.temporal.activity ActivityOptions ActivityOptions$Builder]
1211
[java.time Duration]))
1312

14-
(def ^:no-doc invoke-option-spec
15-
{:start-to-close-timeout #(.setStartToCloseTimeout ^ActivityOptions$Builder %1 %2)})
16-
17-
(defn- invoke-options->
18-
^ActivityOptions [params]
19-
(u/build (ActivityOptions/newBuilder) invoke-option-spec params))
20-
2113
(def ^:no-doc default-invoke-options {:start-to-close-timeout (Duration/ofSeconds 3)})
2214

2315
(defn invoke
@@ -36,7 +28,8 @@ the evaluation of the defactivity once the activity concludes.
3628
([activity params] (invoke activity params default-invoke-options))
3729
([activity params options]
3830
(let [act-name (a/get-annotation activity)
39-
stub (Workflow/newUntypedActivityStub (invoke-options-> options))]
31+
stub (Workflow/newUntypedActivityStub (a/invoke-options-> options))]
32+
(log/trace "invoke:" activity "with" params options)
4033
(-> (->promise
4134
(.executeAsync stub act-name u/bytes-type (u/->objarray params)))
4235
(p/then nippy/thaw)))))

src/temporal/client/core.clj

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
[promesa.core :as p]
77
[temporal.internal.workflow :as w]
88
[temporal.internal.utils :as u])
9-
(:import [io.temporal.client WorkflowClient WorkflowStub WorkflowOptions WorkflowOptions$Builder]
9+
(:import [io.temporal.client WorkflowClient WorkflowStub]
1010
[io.temporal.serviceclient WorkflowServiceStubs]))
1111

1212
(defn create-client
@@ -18,14 +18,6 @@ workflow clients (See [[create-workflow]]).
1818
(let [service (WorkflowServiceStubs/newLocalServiceStubs)]
1919
(WorkflowClient/newInstance service)))
2020

21-
(def ^:no-doc wf-option-spec
22-
{:task-queue #(.setTaskQueue ^WorkflowOptions$Builder %1 (u/namify %2))
23-
:workflow-id #(.setWorkflowId ^WorkflowOptions$Builder %1 %2)})
24-
25-
(defn- wf-options->
26-
^WorkflowOptions [params]
27-
(u/build (WorkflowOptions/newBuilder) wf-option-spec params))
28-
2921
(defn create-workflow
3022
"
3123
Create a new workflow-stub instance, suitable for managing and interacting with a workflow through it's lifecycle.
@@ -44,7 +36,7 @@ Create a new workflow-stub instance, suitable for managing and interacting with
4436
"
4537
[^WorkflowClient client workflow options]
4638
(let [wf-name (w/get-annotation workflow)
47-
stub (.newUntypedWorkflowStub client wf-name (wf-options-> options))]
39+
stub (.newUntypedWorkflowStub client wf-name (w/wf-options-> options))]
4840
{:client client :stub stub}))
4941

5042
(defn start

src/temporal/internal/activity.clj

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,18 @@
44
(:require [clojure.core.protocols :as p]
55
[clojure.datafy :as d]
66
[taoensso.timbre :as log]
7-
[temporal.internal.utils :as u])
8-
(:import [io.temporal.activity Activity ActivityInfo DynamicActivity]))
7+
[temporal.internal.utils :as u]
8+
[temporal.internal.common :as common])
9+
(:import [io.temporal.activity Activity ActivityInfo DynamicActivity]
10+
[io.temporal.activity ActivityOptions ActivityOptions$Builder]))
11+
12+
(def invoke-option-spec
13+
{:start-to-close-timeout #(.setStartToCloseTimeout ^ActivityOptions$Builder %1 %2)
14+
:retry-options #(.setRetryOptions %1 (common/retry-options-> %2))})
15+
16+
(defn invoke-options->
17+
^ActivityOptions [params]
18+
(u/build (ActivityOptions/newBuilder) invoke-option-spec params))
919

1020
(extend-protocol p/Datafiable
1121
ActivityInfo

src/temporal/internal/common.clj

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
;; Copyright © 2022 Manetu, Inc. All rights reserved
2+
3+
(ns temporal.internal.common
4+
(:require [temporal.internal.utils :as u])
5+
(:import [io.temporal.common RetryOptions RetryOptions$Builder]))
6+
7+
(def retry-option-spec
8+
{:initial-interval #(.setInitialInterval ^RetryOptions$Builder %1 %2)
9+
:backoff-coefficient #(.setBackoffCoefficient ^RetryOptions$Builder %1 %2)
10+
:maximum-attempts #(.setMaximumAttempts ^RetryOptions$Builder %1 %2)
11+
:maximum-interval #(.setMaximumInterval ^RetryOptions$Builder %1 %2)
12+
:do-not-retry #(.setDoNotRetry ^RetryOptions$Builder %1 (into-array String %2))})
13+
14+
(defn retry-options->
15+
^RetryOptions [params]
16+
(u/build (RetryOptions/newBuilder) retry-option-spec params))
17+

src/temporal/internal/utils.clj

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
(def ^Class bytes-type (Class/forName "[B"))
1919

2020
(defn build [builder spec params]
21-
(doseq [[key value] params]
22-
(log/trace "building" builder "->" key "=" value)
23-
((get spec key) builder value))
24-
(.build builder))
21+
(try
22+
(doseq [[key value] params]
23+
(log/trace "building" builder "->" key "=" value)
24+
((get spec key) builder value))
25+
(.build builder)
26+
(catch Exception e
27+
(log/error e))))
2528

2629
(defn get-annotation
2730
"Retrieves metadata annotation 'a' from 'v'"
@@ -102,8 +105,16 @@
102105
(apply [_ x1 x2 x3 x4 x5 x6]
103106
(f x1 x2 x3 x4 x5 x6))))
104107

105-
(defn ->promise
106-
[^Promise p]
108+
(defn promise-impl
109+
[f]
107110
(p/create
108111
(fn [resolve reject]
109-
(resolve (.get p)))))
112+
(try
113+
(let [^Promise p (f)]
114+
(resolve (.get p)))
115+
(catch Exception e
116+
(reject e))))))
117+
118+
(defmacro ->promise
119+
[& body]
120+
`(promise-impl (fn [] (do ~@body))))

src/temporal/internal/workflow.clj

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,20 @@
44
(:require [clojure.core.protocols :as p]
55
[clojure.datafy :as d]
66
[taoensso.timbre :as log]
7+
[temporal.internal.common :as common]
78
[temporal.internal.utils :as u]
89
[temporal.internal.signals :as s])
9-
(:import [io.temporal.workflow Workflow WorkflowInfo]))
10+
(:import [io.temporal.workflow Workflow WorkflowInfo]
11+
[io.temporal.client WorkflowOptions WorkflowOptions$Builder]))
12+
13+
(def wf-option-spec
14+
{:task-queue #(.setTaskQueue ^WorkflowOptions$Builder %1 (u/namify %2))
15+
:workflow-id #(.setWorkflowId ^WorkflowOptions$Builder %1 %2)
16+
:retry-options #(.setRetryOptions %1 (common/retry-options-> %2))})
17+
18+
(defn wf-options->
19+
^WorkflowOptions [params]
20+
(u/build (WorkflowOptions/newBuilder) wf-option-spec params))
1021

1122
(extend-protocol p/Datafiable
1223
WorkflowInfo

test/temporal/test/simple.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
(defworkflow greeter-workflow
1919
[ctx {:keys [args]}]
2020
(log/info "greeter-workflow:" args)
21-
@(a/invoke greet-activity args))
21+
@(a/invoke greet-activity args (assoc a/default-invoke-options :retry-options {:maximum-attempts 1})))
2222

2323
(deftest the-test
2424
(testing "Verifies that we can round-trip through start"

test/temporal/test/utils.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
(get @state :client))
2323

2424
(defn create-workflow [workflow]
25-
(c/create-workflow (get-client) workflow {:task-queue task-queue}))
25+
(c/create-workflow (get-client) workflow {:task-queue task-queue :retry-options {:maximum-attempts 1}}))
2626

2727
;;-----------------------------------------------------------------------------
2828
;; Fixtures

0 commit comments

Comments
 (0)