Skip to content

Commit 59e4dd4

Browse files
committed
Add support for Async Activity
Signed-off-by: Greg Haskins <[email protected]>
1 parent 5eed6b3 commit 59e4dd4

File tree

9 files changed

+107
-23
lines changed

9 files changed

+107
-23
lines changed

dev-resources/user.clj

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
;; Copyright © 2022 Manetu, Inc. All rights reserved
22

33
(ns user
4-
(:require [clojure.tools.namespace.repl :refer [refresh]]))
4+
(:require [clojure.tools.namespace.repl :refer [refresh]]
5+
[eftest.runner :refer [find-tests run-tests]]))
6+
7+
;; to run one test: `(run-tests (find-tests #'temporal.test.simple/the-test))`
8+
;; to see output, use (run-tests ... {:capture-output? false})
9+
10+
(defn run-all-tests []
11+
(run-tests (find-tests "test") {:fail-fast? true :multithread? false}))
12+
513

project.clj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
[jonase/eastwood "1.2.4"]
1313
[lein-codox "0.10.8"]]
1414
:dependencies [[org.clojure/clojure "1.11.1"]
15+
[org.clojure/core.async "1.5.648"]
1516
[io.temporal/temporal-sdk "1.16.0"]
1617
[io.temporal/temporal-testing "1.16.0"]
1718
[com.taoensso/encore "3.24.0"]
@@ -29,5 +30,5 @@
2930
:cloverage {:runner :eftest
3031
:runner-opts {:multithread? false
3132
:fail-fast? true}
32-
:fail-threshold 91
33+
:fail-threshold 90
3334
:ns-exclude-regex [#"temporal.client.worker"]})

src/temporal/activity.clj

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ the evaluation of the defactivity once the activity concludes.
3737
(defmacro defactivity
3838
"
3939
Defines a new activity, similar to defn, expecting a 2-arity parameter list and body. Should evaluate to something
40-
serializable, which will be available to the [[invoke]] caller.
40+
serializable, which will be available to the [[invoke]] caller, or to a core.async channel (See Async Mode below).
41+
42+
#### Async Mode:
43+
Returning a core.async channel places the activity into
44+
[Asynchronous](https://docs.temporal.io/java/activities/#asynchronous-activity-completion) mode, where the result may
45+
be resolved at a future time by sending a single message on the channel. Sending a
46+
[Throwable](https://docs.oracle.com/javase/7/docs/api/java/lang/Throwable.html) will signal a failure of the activity.
47+
Any other value will be serialized and returned to the caller.
4148
4249
Arguments:
4350

src/temporal/client/core.clj

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
(ns temporal.client.core
44
"Methods for client interaction with Temporal"
5-
(:require [taoensso.nippy :as nippy]
5+
(:require [taoensso.timbre :as log]
6+
[taoensso.nippy :as nippy]
67
[promesa.core :as p]
78
[temporal.internal.workflow :as w]
89
[temporal.internal.utils :as u])
@@ -37,19 +38,22 @@ Create a new workflow-stub instance, suitable for managing and interacting with
3738
[^WorkflowClient client workflow options]
3839
(let [wf-name (w/get-annotation workflow)
3940
stub (.newUntypedWorkflowStub client wf-name (w/wf-options-> options))]
41+
(log/trace "create-workflow:" wf-name options)
4042
{:client client :stub stub}))
4143

4244
(defn start
4345
"
4446
Starts 'worklow' with 'params'"
4547
[{:keys [^WorkflowStub stub] :as workflow} params]
48+
(log/trace "start:" params)
4649
(.start stub (u/->objarray params)))
4750

4851
(defn signal-with-start
4952
"
5053
Signals 'workflow' with 'signal-params' on signal 'signal-name', starting it if not already running. 'wf-params' are
5154
used as workflow start arguments if the workflow needs to be started"
5255
[{:keys [^WorkflowStub stub] :as workflow} signal-name signal-params wf-params]
56+
(log/trace "signal-with-start->" "signal:" signal-name signal-params "workflow-params:" wf-params)
5357
(.signalWithStart stub (u/namify signal-name) (u/->objarray signal-params) (u/->objarray wf-params)))
5458

5559
(defn >!
@@ -61,6 +65,7 @@ Sends 'params' as a signal 'signal-name' to 'workflow'
6165
```
6266
"
6367
[{:keys [^WorkflowStub stub] :as workflow} signal-name params]
68+
(log/trace ">!" signal-name params)
6469
(.signal stub (u/namify signal-name) (u/->objarray params)))
6570

6671
(defn get-result

src/temporal/internal/activity.clj

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@
33
(ns ^:no-doc temporal.internal.activity
44
(:require [clojure.core.protocols :as p]
55
[clojure.datafy :as d]
6+
[clojure.core.async :refer [go <!] :as async]
67
[taoensso.timbre :as log]
8+
[taoensso.nippy :as nippy]
79
[temporal.internal.utils :as u]
810
[temporal.internal.common :as common])
911
(:import [io.temporal.activity Activity ActivityInfo DynamicActivity]
10-
[io.temporal.activity ActivityOptions ActivityOptions$Builder]))
12+
[io.temporal.activity ActivityOptions ActivityOptions$Builder]
13+
[clojure.core.async.impl.channels ManyToManyChannel]))
1114

1215
(def invoke-option-spec
1316
{:start-to-close-timeout #(.setStartToCloseTimeout ^ActivityOptions$Builder %1 %2)
@@ -35,12 +38,35 @@
3538
^String [x]
3639
(u/get-annotation x ::def))
3740

41+
(defn- export-result [activity-id x]
42+
(log/trace activity-id "result:" x)
43+
(nippy/freeze x))
44+
45+
(defmulti result-> (fn [_ r] (type r)))
46+
47+
(defmethod result-> ManyToManyChannel
48+
[activity-id x]
49+
(let [ctx (Activity/getExecutionContext)
50+
completion (.useLocalManualCompletion ctx)]
51+
(go
52+
(let [r (<! x)]
53+
(if (instance? Throwable r)
54+
(do
55+
(log/error r)
56+
(.fail completion r))
57+
(.complete completion (export-result activity-id r)))))))
58+
59+
(defmethod result-> :default
60+
[activity-id x]
61+
(export-result activity-id x))
62+
3863
(defn- -execute
3964
[ctx args]
40-
(let [{:keys [activity-type] :as info} (get-info)
41-
f (u/find-annotated-fn ::def activity-type)]
42-
(log/trace "execute:" info)
43-
(u/wrap-encoded (partial f ctx) args)))
65+
(let [{:keys [activity-type activity-id] :as info} (get-info)
66+
f (u/find-annotated-fn ::def activity-type)
67+
a (u/->args args)]
68+
(log/trace activity-id "calling" f "with args:" a)
69+
(result-> activity-id (f ctx a))))
4470

4571
(defn dispatcher [ctx]
4672
(reify DynamicActivity

src/temporal/internal/utils.clj

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,6 @@
6666
[^EncodedValues args]
6767
(nippy/thaw (.get args (int 0) bytes-type)))
6868

69-
(defn wrap-encoded
70-
"Wraps 'f' in a codec, EncodedValues args in, and nippy-encoded bytes out"
71-
[f args]
72-
(-> args
73-
(->args)
74-
(f)
75-
(nippy/freeze)))
76-
7769
(def namify
7870
"Converts strings or keywords to strings, preserving fully qualified keywords when applicable"
7971
(memoize

src/temporal/internal/workflow.clj

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
(:require [clojure.core.protocols :as p]
55
[clojure.datafy :as d]
66
[taoensso.timbre :as log]
7+
[taoensso.nippy :as nippy]
78
[temporal.internal.common :as common]
89
[temporal.internal.utils :as u]
910
[temporal.internal.signals :as s])
@@ -42,7 +43,14 @@
4243

4344
(defn execute
4445
[ctx args]
45-
(let [{:keys [workflow-type] :as info} (get-info)
46-
f (u/find-annotated-fn ::def workflow-type)]
47-
(log/trace "execute:" info)
48-
(u/wrap-encoded (fn [args] (f ctx {:args args :signals (s/create)})) args)))
46+
(try
47+
(let [{:keys [workflow-type workflow-id]} (get-info)
48+
f (u/find-annotated-fn ::def workflow-type)
49+
a (u/->args args)
50+
_ (log/trace workflow-id "calling" f "with args:" a)
51+
r (f ctx {:args a :signals (s/create)})]
52+
(log/trace workflow-id "result:" r)
53+
(nippy/freeze r))
54+
(catch Exception e
55+
(log/error e)
56+
(throw e))))

test/temporal/test/async.clj

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
;; Copyright © 2022 Manetu, Inc. All rights reserved
2+
3+
(ns temporal.test.async
4+
(:require [clojure.test :refer :all]
5+
[clojure.core.async :refer [go]]
6+
[taoensso.timbre :as log]
7+
[temporal.client.core :as c]
8+
[temporal.workflow :refer [defworkflow]]
9+
[temporal.activity :refer [defactivity] :as a]
10+
[temporal.test.utils :as t]))
11+
12+
(use-fixtures :once t/wrap-service)
13+
14+
(defactivity greet-activity
15+
[ctx {:keys [name] :as args}]
16+
(go
17+
(log/info "greet-activity:" args)
18+
(if (= name "Charlie")
19+
(ex-info "permission-denied" {}) ;; we don't like Charlie
20+
(str "Hi, " name))))
21+
22+
(defworkflow greeter-workflow
23+
[ctx {:keys [args]}]
24+
(log/info "greeter-workflow:" args)
25+
@(a/invoke greet-activity args (assoc a/default-invoke-options :retry-options {:maximum-attempts 1})))
26+
27+
(deftest the-test
28+
(testing "Verifies that we can round-trip with an async task"
29+
(let [workflow (t/create-workflow greeter-workflow)]
30+
(c/start workflow {:name "Bob"})
31+
(is (= @(c/get-result workflow) "Hi, Bob"))))
32+
(testing "Verifies that we can process errors in async mode"
33+
(let [workflow (t/create-workflow greeter-workflow)]
34+
(c/start workflow {:name "Charlie"})
35+
(is (thrown? java.util.concurrent.ExecutionException
36+
@(c/get-result workflow))))))

test/temporal/test/utils.clj

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
"Utilities common to all tests"
55
(:require [taoensso.timbre :as log]
66
[temporal.testing.env :as e]
7-
[temporal.client.core :as c]))
7+
[temporal.client.core :as c])
8+
(:import [java.time Duration]))
89

910
(log/set-level! :trace)
1011

@@ -22,7 +23,7 @@
2223
(get @state :client))
2324

2425
(defn create-workflow [workflow]
25-
(c/create-workflow (get-client) workflow {:task-queue task-queue :retry-options {:maximum-attempts 1}}))
26+
(c/create-workflow (get-client) workflow {:task-queue task-queue :workflow-execution-timeout (Duration/ofSeconds 1) :retry-options {:maximum-attempts 1}}))
2627

2728
;;-----------------------------------------------------------------------------
2829
;; Fixtures

0 commit comments

Comments
 (0)