Skip to content

Commit fd3dc4c

Browse files
committed
Misc updates and bug fixes
- Convert to nippy for serialization - Add poll and >! for signals - Fix bug in is-empty - Clean up unit-tests Signed-off-by: Greg Haskins <[email protected]>
1 parent 332938d commit fd3dc4c

17 files changed

+205
-78
lines changed

project.clj

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@
1414
:dependencies [[org.clojure/clojure "1.11.1"]
1515
[io.temporal/temporal-sdk "1.16.0"]
1616
[io.temporal/temporal-testing "1.16.0"]
17+
[com.taoensso/encore "3.24.0"]
1718
[com.taoensso/timbre "5.2.1"]
19+
[com.taoensso/nippy "3.2.0"]
1820
[funcool/promesa "8.0.450"]]
1921
:repl-options {:init-ns user}
2022
:aot [temporal.internal.dispatcher]
@@ -27,5 +29,5 @@
2729
:cloverage {:runner :eftest
2830
:runner-opts {:multithread? false
2931
:fail-fast? true}
30-
:fail-threshold 91
32+
:fail-threshold 92
3133
:ns-exclude-regex [#"temporal.client.worker"]})

src/temporal/activity.clj

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
(ns temporal.activity
44
"Methods for defining and invoking activity tasks"
5-
(:require [clojure.edn :as edn]
6-
[taoensso.timbre :as log]
5+
(:require [taoensso.timbre :as log]
6+
[taoensso.nippy :as nippy]
77
[promesa.core :as p]
88
[temporal.internal.activity :as a]
99
[temporal.internal.utils :refer [->promise] :as u])
@@ -38,13 +38,13 @@ the evaluation of the defactivity once the activity concludes.
3838
(let [act-name (a/get-annotation activity)
3939
stub (Workflow/newUntypedActivityStub (invoke-options-> options))]
4040
(-> (->promise
41-
(.executeAsync stub act-name String (u/->objarray params)))
42-
(p/then edn/read-string)))))
41+
(.executeAsync stub act-name u/bytes-type (u/->objarray params)))
42+
(p/then nippy/thaw)))))
4343

4444
(defmacro defactivity
4545
"
4646
Defines a new activity, similar to defn, expecting a 2-arity parameter list and body. Should evaluate to something
47-
serializable with [prn-str](https://clojuredocs.org/clojure.core/prn-str), which will be available to the [[invoke]] caller.
47+
serializable, which will be available to the [[invoke]] caller.
4848
4949
Arguments:
5050

src/temporal/client/core.clj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
(ns temporal.client.core
44
"Methods for client interaction with Temporal"
5-
(:require [clojure.edn :as edn]
5+
(:require [taoensso.nippy :as nippy]
66
[promesa.core :as p]
77
[temporal.internal.workflow :as w]
88
[temporal.internal.utils :as u])
@@ -87,8 +87,8 @@ defworkflow once the workflow concludes.
8787
```
8888
"
8989
[{:keys [^WorkflowStub stub] :as workflow}]
90-
(-> (.getResultAsync stub String)
91-
(p/then edn/read-string)))
90+
(-> (.getResultAsync stub u/bytes-type)
91+
(p/then nippy/thaw)))
9292

9393
(defn cancel
9494
"

src/temporal/core.clj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,7 @@
1616
(Workflow/await (->supplier pred)))
1717
([duration pred]
1818
(Workflow/await duration (->supplier pred))))
19+
20+
(defn gen-uuid
21+
[]
22+
(str (Workflow/randomUUID)))

src/temporal/internal/signals.clj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
(fn [s]
1818
(let [ch (or (get-ch s signal-name)
1919
(ConcurrentLinkedQueue.))]
20+
(log/trace "saving signal:" signal-name payload)
2021
(.add ch payload)
2122
(assoc s signal-name ch)))))
2223

src/temporal/internal/utils.clj

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
(ns ^:no-doc temporal.internal.utils
44
(:require [clojure.string :as string]
5-
[clojure.edn :as edn]
65
[taoensso.timbre :as log]
6+
[taoensso.nippy :as nippy]
77
[promesa.core :as p])
88
(:import [io.temporal.common.converter EncodedValues]
99
[io.temporal.workflow Promise
@@ -15,6 +15,8 @@
1515
Functions$Func5
1616
Functions$Func6]))
1717

18+
(def ^Class bytes-type (Class/forName "[B"))
19+
1820
(defn build [builder spec params]
1921
(doseq [[key value] params]
2022
(log/trace "building" builder "->" key "=" value)
@@ -52,22 +54,22 @@
5254
(str "." sym)))
5355

5456
(defn ->objarray
55-
"Converts a collection of serializable elements to an array of Objects, suitable for many Temporal APIs"
56-
[coll]
57-
(into-array Object [(prn-str coll)]))
57+
"Serializes x to an array of Objects, suitable for many Temporal APIs"
58+
[x]
59+
(into-array Object [(nippy/freeze x)]))
5860

5961
(defn ->args
6062
"Decodes EncodedValues to native clojure data type. Assumes all data is in the first element"
6163
[^EncodedValues args]
62-
(edn/read-string (.get args (int 0) ^Class String)))
64+
(nippy/thaw (.get args (int 0) bytes-type)))
6365

6466
(defn wrap-encoded
65-
"Wraps 'f' in a codec, EncodedValues args in, and edn out"
67+
"Wraps 'f' in a codec, EncodedValues args in, and nippy-encoded bytes out"
6668
[f args]
6769
(-> args
6870
(->args)
6971
(f)
70-
(prn-str)))
72+
(nippy/freeze)))
7173

7274
(def namify
7375
"Converts strings or keywords to strings, preserving fully qualified keywords when applicable"

src/temporal/internal/workflow.clj

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
(defn execute
2727
[ctx args]
2828
(let [{:keys [workflow-type] :as info} (get-info)
29-
f (u/find-annotated-fn ::def workflow-type)
30-
signals (s/create)]
29+
f (u/find-annotated-fn ::def workflow-type)]
3130
(log/trace "execute:" info)
32-
(u/wrap-encoded (fn [args] (f ctx {:args args :signals signals})) args)))
31+
(u/wrap-encoded (fn [args] (f ctx {:args args :signals (s/create)})) args)))

src/temporal/signals.clj

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,43 @@
55
(:require [taoensso.timbre :as log]
66
[temporal.core :as core]
77
[temporal.internal.utils :as u]
8-
[temporal.internal.signals :as s]))
8+
[temporal.internal.signals :as s])
9+
(:import [io.temporal.workflow Workflow]))
910

1011
(defn is-empty?
1112
"Returns 'true' if 'signal-name' either doesn't exist or exists but has no pending messages"
1213
[state signal-name]
13-
(let [ch (s/get-ch @state signal-name)]
14-
(or (nil? ch) (not (.isEmpty ch)))))
14+
(let [signal-name (u/namify signal-name)
15+
ch (s/get-ch @state signal-name)
16+
r (or (nil? ch) (.isEmpty ch))]
17+
(log/trace "is-empty?:" @state signal-name r)
18+
r))
19+
20+
(defn- rx
21+
[state signal-name]
22+
(let [signal-name (u/namify signal-name)
23+
ch (s/get-ch @state signal-name)
24+
m (.poll ch)]
25+
(log/trace "rx:" signal-name m)
26+
m))
27+
28+
(defn poll
29+
"Non-blocking check of the signal. Consumes and returns a message if found, otherwise returns 'nil'"
30+
[state signal-name]
31+
(when-not (is-empty? state signal-name)
32+
(rx state signal-name)))
1533

1634
(defn <!
1735
"Light-weight/parking receive of a single message"
1836
([state] (<! state ::default))
1937
([state signal-name]
20-
(let [signal-name (u/namify signal-name)]
21-
(log/trace "waiting on:" signal-name)
22-
(core/await (partial is-empty? state signal-name))
23-
(let [ch (s/get-ch @state signal-name)
24-
m (.poll ch)]
25-
(log/trace "rx:" m)
26-
m))))
38+
(log/trace "waiting on:" signal-name)
39+
(core/await #(not (is-empty? state signal-name)))
40+
(rx state signal-name)))
41+
42+
(defn >!
43+
"Sends `payload` to `workflow-id` via signal `signal-name`."
44+
[^String workflow-id signal-name payload]
45+
(let [signal-name (u/namify signal-name)
46+
stub (Workflow/newUntypedExternalWorkflowStub workflow-id)]
47+
(.signal stub signal-name (u/->objarray payload))))

src/temporal/workflow.clj

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,17 @@
66
[temporal.internal.utils :as u]
77
[temporal.internal.workflow :as w]))
88

9+
(defn get-info
10+
"
11+
Return info about the current workflow
12+
"
13+
[]
14+
(w/get-info))
15+
916
(defmacro defworkflow
1017
"
1118
Defines a new workflow, similar to defn, expecting a 2-arity parameter list and body. Should evaluate to something
12-
serializable with [prn-str](https://clojuredocs.org/clojure.core/prn-str), which will become available for
13-
[[temporal.client.workflow/get-result]].
19+
serializable, which will become available for [[temporal.client.workflow/get-result]].
1420
1521
Arguments:
1622

test/temporal/test/client_signal.clj

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
;; Copyright © 2022 Manetu, Inc. All rights reserved
2+
3+
(ns temporal.test.client-signal
4+
(:require [clojure.test :refer :all]
5+
[taoensso.timbre :as log]
6+
[temporal.client.core :refer [>!] :as c]
7+
[temporal.signals :refer [<!]]
8+
[temporal.workflow :refer [defworkflow]]
9+
[temporal.test.utils :as t]))
10+
11+
(use-fixtures :once t/wrap-service)
12+
13+
(def signal-name ::signal)
14+
15+
(defn lazy-signals [signals]
16+
(lazy-seq (when-let [m (<! signals signal-name)]
17+
(cons m (lazy-signals signals)))))
18+
19+
(defworkflow test-workflow
20+
[ctx {:keys [signals] {:keys [nr] :as args} :args}]
21+
(log/info "test-workflow:" args)
22+
(doall (take nr (lazy-signals signals))))
23+
24+
(def expected 3)
25+
26+
(deftest the-test
27+
(testing "Verifies that we can send signals from a client"
28+
(let [workflow (t/create-workflow test-workflow)]
29+
(c/start workflow {:nr expected})
30+
(dotimes [n expected]
31+
(>! workflow signal-name n))
32+
(is (-> workflow c/get-result deref count (= expected))))))

0 commit comments

Comments
 (0)