Skip to content

Commit c13aeb2

Browse files
committed
Add slingshot based exception handling
Signed-off-by: Greg Haskins <[email protected]>
1 parent 0716661 commit c13aeb2

File tree

9 files changed

+133
-23
lines changed

9 files changed

+133
-23
lines changed

doc/workflows.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,3 +219,21 @@ A query consists of a `query-type` (keyword) and possibly some `args` (any seria
219219
```clojure
220220
(query workflow :my-query {:foo "bar"})
221221
```
222+
223+
## Exceptions
224+
225+
This SDK integrates with the [slingshot](https://github.com/scgilardi/slingshot) library. Stones cast with slingshot's throw+ are serialized and re-thrown across activity and workflow boundaries in a transparent manner that is compatible with slingshot idiomatic try+ based catch blocks.
226+
227+
### Managing Retries
228+
By default, stones cast that are not caught locally by an activity or workflow trigger ApplicationFailure semantics and are thus subject to the overall Retry Policies in place. However, the developer may force a given stone to be non-retriable by setting the flag '::non-retriable?' within the object.
229+
230+
Example:
231+
232+
```clojure
233+
(require `[temporal.exceptions :as e])
234+
(require `[slingshot.slingshot :refer [throw+]])
235+
236+
(defactivity my-activity
237+
[ctx args]
238+
(throw+ {:type ::my-fatal-error :msg "this error is non-retriable" ::e/non-retriable? true}))
239+
```

project.clj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@
1919
[com.taoensso/timbre "6.3.1"]
2020
[com.taoensso/nippy "3.3.0"]
2121
[funcool/promesa "9.2.542"]
22-
[medley "1.4.0"]]
22+
[medley "1.4.0"]
23+
[slingshot "0.12.2"]]
2324
:repl-options {:init-ns user}
2425
:java-source-paths ["src"]
2526
:javac-options ["-target" "11" "-source" "11"]

src/temporal/activity.clj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
(:require [taoensso.timbre :as log]
66
[taoensso.nippy :as nippy]
77
[promesa.core :as p]
8+
[temporal.internal.exceptions :as e]
89
[temporal.internal.activity :as a]
910
[temporal.internal.utils :as u]
1011
[temporal.internal.promise]) ;; needed for IPromise protocol extention
@@ -98,6 +99,7 @@ Arguments:
9899
(log/trace "invoke:" activity "with" params options)
99100
(-> (.executeAsync stub act-name u/bytes-type (u/->objarray params))
100101
(p/then (partial complete-invoke activity))
102+
(p/catch e/slingshot? e/recast-stone)
101103
(p/catch (fn [e]
102104
(log/error e)
103105
(throw e)))))))
@@ -138,6 +140,7 @@ Arguments:
138140
(log/trace "local-invoke:" activity "with" params options)
139141
(-> (.executeAsync stub act-name u/bytes-type (u/->objarray params))
140142
(p/then (partial complete-invoke activity))
143+
(p/catch e/slingshot? e/recast-stone)
141144
(p/catch (fn [e]
142145
(log/error e)
143146
(throw e)))))))

src/temporal/client/core.clj

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
[taoensso.nippy :as nippy]
77
[promesa.core :as p]
88
[temporal.internal.workflow :as w]
9-
[temporal.internal.utils :as u])
9+
[temporal.internal.utils :as u]
10+
[temporal.internal.exceptions :as e])
1011
(:import [java.time Duration]
1112
[io.temporal.client WorkflowClient WorkflowClientOptions WorkflowClientOptions$Builder WorkflowStub]
1213
[io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder]
@@ -156,7 +157,8 @@ defworkflow once the workflow concludes.
156157
"
157158
[{:keys [^WorkflowStub stub] :as workflow}]
158159
(-> (.getResultAsync stub u/bytes-type)
159-
(p/then nippy/thaw)))
160+
(p/then nippy/thaw)
161+
(p/catch e/slingshot? e/recast-stone)))
160162

161163
(defn query
162164
"

src/temporal/exceptions.clj

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
;; Copyright © 2024 Manetu, Inc. All rights reserved
2+
3+
(ns temporal.exceptions)
4+
5+
(def flags {::non-retriable? "'true' indicates this exception is not going to be retried even if it is not included into retry policy doNotRetry list."})

src/temporal/internal/activity.clj

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
(:require [clojure.core.protocols :as p]
55
[clojure.datafy :as d]
66
[clojure.core.async :refer [go <!] :as async]
7+
[slingshot.slingshot :refer [try+ throw+]]
78
[taoensso.timbre :as log]
89
[taoensso.nippy :as nippy]
910
[temporal.internal.utils :as u]
11+
[temporal.internal.exceptions :as e]
1012
[temporal.common :as common])
1113
(:import [java.time Duration]
1214
[io.temporal.activity Activity ActivityInfo DynamicActivity ActivityCancellationType]
@@ -103,11 +105,14 @@
103105
f (u/find-dispatch-fn dispatch activity-type)
104106
a (u/->args args)]
105107
(log/trace activity-id "calling" f "with args:" a)
106-
(try
107-
(result-> activity-id (f ctx a))
108-
(catch Exception e
109-
(log/error e)
110-
(throw e)))))
108+
(try+
109+
(result-> activity-id (f ctx a))
110+
(catch Exception e
111+
(log/error e)
112+
(throw e))
113+
(catch Object o
114+
(log/error &throw-context)
115+
(e/freeze &throw-context)))))
111116

112117
(defn dispatcher [ctx dispatch]
113118
(reify DynamicActivity

src/temporal/internal/exceptions.clj

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
;; Copyright © 2024 Manetu, Inc. All rights reserved
2+
3+
(ns temporal.internal.exceptions
4+
(:require [slingshot.slingshot :refer [throw+]]
5+
[temporal.exceptions :as e]
6+
[temporal.internal.utils :as u])
7+
(:import [io.temporal.failure ApplicationFailure]))
8+
9+
(def exception-type (name ::slingshot))
10+
11+
(defn slingshot? [ex]
12+
(and (instance? ApplicationFailure (ex-cause ex))
13+
(= exception-type (.getType (cast ApplicationFailure (ex-cause ex))))))
14+
15+
(defn freeze
16+
[{{:keys [::e/non-retriable?] :or {non-retriable? false}} :object :as context}]
17+
(let [o (u/->objarray context)
18+
t (if non-retriable?
19+
(ApplicationFailure/newNonRetryableFailure nil exception-type o)
20+
(ApplicationFailure/newFailure nil exception-type o))]
21+
(throw t)))
22+
23+
(defn recast-stone [ex]
24+
(let [stone (->> ex ex-cause (cast ApplicationFailure) (.getDetails) u/->args :object)]
25+
(throw+ stone))) ;; FIXME: Does not preserve the original stack-trace

src/temporal/internal/workflow.clj

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
(ns ^:no-doc temporal.internal.workflow
44
(:require [clojure.core.protocols :as p]
55
[clojure.datafy :as d]
6+
[slingshot.slingshot :refer [try+]]
67
[taoensso.timbre :as log]
78
[taoensso.nippy :as nippy]
89
[temporal.common :as common]
910
[temporal.internal.utils :as u]
10-
[temporal.internal.signals :as s])
11+
[temporal.internal.signals :as s]
12+
[temporal.internal.exceptions :as e])
1113
(:import [io.temporal.workflow Workflow WorkflowInfo]
1214
[io.temporal.client WorkflowOptions WorkflowOptions$Builder]))
1315

@@ -51,17 +53,20 @@
5153

5254
(defn execute
5355
[ctx dispatch args]
54-
(try
55-
(let [{:keys [workflow-type workflow-id]} (get-info)
56-
d (u/find-dispatch dispatch workflow-type)
57-
f (:fn d)
58-
a (u/->args args)
59-
_ (log/trace workflow-id "calling" f "with args:" a)
60-
r (if (-> d :type (= :legacy))
61-
(f ctx {:args a :signals (s/create-signal-chan)})
62-
(f a))]
63-
(log/trace workflow-id "result:" r)
64-
(nippy/freeze r))
65-
(catch Exception e
66-
(log/error e)
67-
(throw e))))
56+
(try+
57+
(let [{:keys [workflow-type workflow-id]} (get-info)
58+
d (u/find-dispatch dispatch workflow-type)
59+
f (:fn d)
60+
a (u/->args args)
61+
_ (log/trace workflow-id "calling" f "with args:" a)
62+
r (if (-> d :type (= :legacy))
63+
(f ctx {:args a :signals (s/create-signal-chan)})
64+
(f a))]
65+
(log/trace workflow-id "result:" r)
66+
(nippy/freeze r))
67+
(catch Exception e
68+
(log/error e)
69+
(throw e))
70+
(catch Object o
71+
(log/error &throw-context)
72+
(e/freeze &throw-context))))

test/temporal/test/slingshot.clj

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
;; Copyright © Manetu, Inc. All rights reserved
2+
3+
(ns temporal.test.slingshot
4+
(:require [clojure.test :refer :all]
5+
[taoensso.timbre :as log]
6+
[slingshot.slingshot :refer [try+ throw+]]
7+
[temporal.client.core :as c]
8+
[temporal.workflow :refer [defworkflow]]
9+
[temporal.activity :refer [defactivity] :as a]
10+
[temporal.exceptions :as e]
11+
[temporal.test.utils :as t]))
12+
13+
(use-fixtures :once t/wrap-service)
14+
15+
(defactivity slingshot-nonretriable-activity
16+
[ctx {:keys [name] :as args}]
17+
(log/info "slingshot-nonretriable-activity:" args)
18+
(throw+ {:type ::test1 ::e/non-retriable? true}))
19+
20+
(defactivity slingshot-retriable-activity
21+
[ctx {:keys [name] :as args}]
22+
(log/info "slingshot-retriable-activity:" args)
23+
(throw+ {:type ::test2}))
24+
25+
(defworkflow slingshot-workflow
26+
[args]
27+
(log/info "slingshot-workflow:" args)
28+
(try+
29+
@(a/invoke slingshot-nonretriable-activity args)
30+
(catch [:type ::test1] _
31+
(log/info "caught stone 1")
32+
(try+
33+
@(a/invoke slingshot-retriable-activity args)
34+
(catch [:type ::test2] _
35+
(log/info "caught stone 2")
36+
(throw+ {:type ::test3}))))))
37+
38+
(deftest the-test
39+
(testing "Verifies that we can catch slingshot stones across activity/workflow boundaries"
40+
(let [workflow (t/create-workflow slingshot-workflow)]
41+
(c/start workflow {})
42+
(try+
43+
@(c/get-result workflow)
44+
(throw (ex-info "should not get here" {}))
45+
(catch [:type ::test3] _
46+
(log/info "caught stone 3"))))))

0 commit comments

Comments
 (0)