Skip to content

Commit 529c748

Browse files
ghaskinsecb
authored andcommitted
Add support for LocalActivity
Signed-off-by: Greg Haskins <[email protected]>
1 parent 4173f07 commit 529c748

File tree

4 files changed

+73
-8
lines changed

4 files changed

+73
-8
lines changed

project.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,5 @@
3030
:cloverage {:runner :eftest
3131
:runner-opts {:multithread? false
3232
:fail-fast? true}
33-
:fail-threshold 87
33+
:fail-threshold 86
3434
:ns-exclude-regex [#"temporal.client.worker"]})

src/temporal/activity.clj

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,13 @@
1313

1414
(def ^:no-doc default-invoke-options {:start-to-close-timeout (Duration/ofSeconds 3)})
1515

16+
(defn- complete-invoke
17+
[activity result]
18+
(log/trace activity "completed with" (count result) "bytes")
19+
(let [r (nippy/thaw result)]
20+
(log/trace activity "results:" r)
21+
r))
22+
1623
(defn invoke
1724
"
1825
Invokes 'activity' with 'params' from within a workflow context. Returns a promise that when derefed will resolve to
@@ -32,11 +39,31 @@ the evaluation of the defactivity once the activity concludes.
3239
stub (Workflow/newUntypedActivityStub (a/invoke-options-> options))]
3340
(log/trace "invoke:" activity "with" params options)
3441
(-> (.executeAsync stub act-name u/bytes-type (u/->objarray params))
35-
(p/then (fn [r]
36-
(log/trace activity "completed with" (count r) "bytes")
37-
(let [r (nippy/thaw r)]
38-
(log/trace activity "results:" r)
39-
r)))
42+
(p/then (partial complete-invoke activity))
43+
(p/catch (fn [e]
44+
(log/error e)
45+
(throw e)))))))
46+
47+
(defn local-invoke
48+
"
49+
Invokes 'activity' with 'params' from within a workflow context as a [Local Activity](https://docs.temporal.io/concepts/what-is-a-local-activity/). Returns a promise that when
50+
derefed will resolve to the evaluation of the defactivity once the activity concludes.
51+
52+
```clojure
53+
(defactivity my-activity
54+
[ctx {:keys [foo] :as args}]
55+
...)
56+
57+
(local-invoke my-activity {:foo \"bar\"} {:start-to-close-timeout (Duration/ofSeconds 3))
58+
```
59+
"
60+
([activity params] (invoke activity params default-invoke-options))
61+
([activity params options]
62+
(let [act-name (a/get-annotation activity)
63+
stub (Workflow/newUntypedLocalActivityStub (a/local-invoke-options-> options))]
64+
(log/trace "local-invoke:" activity "with" params options)
65+
(-> (.executeAsync stub act-name u/bytes-type (u/->objarray params))
66+
(p/then (partial complete-invoke activity))
4067
(p/catch (fn [e]
4168
(log/error e)
4269
(throw e)))))))

src/temporal/internal/activity.clj

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,28 @@
99
[temporal.internal.utils :as u]
1010
[temporal.internal.common :as common])
1111
(:import [io.temporal.activity Activity ActivityInfo DynamicActivity]
12-
[io.temporal.activity ActivityOptions ActivityOptions$Builder]
12+
[io.temporal.activity ActivityOptions ActivityOptions$Builder LocalActivityOptions LocalActivityOptions$Builder]
1313
[clojure.core.async.impl.channels ManyToManyChannel]))
1414

1515
(def invoke-option-spec
1616
{:start-to-close-timeout #(.setStartToCloseTimeout ^ActivityOptions$Builder %1 %2)
17-
:retry-options #(.setRetryOptions %1 (common/retry-options-> %2))})
17+
:retry-options #(.setRetryOptions ^ActivityOptions$Builder %1 (common/retry-options-> %2))})
1818

1919
(defn invoke-options->
2020
^ActivityOptions [params]
2121
(u/build (ActivityOptions/newBuilder) invoke-option-spec params))
2222

23+
(def local-invoke-option-spec
24+
{:start-to-close-timeout #(.setStartToCloseTimeout ^LocalActivityOptions$Builder %1 %2)
25+
:schedule-to-close-timeout #(.setScheduleToCloseTimeout ^LocalActivityOptions$Builder %1 %2)
26+
:retry-options #(.setRetryOptions ^LocalActivityOptions$Builder %1 (common/retry-options-> %2))
27+
:do-not-include-args #(.setDoNotIncludeArgumentsIntoMarker ^LocalActivityOptions$Builder %1 %2)
28+
:local-retry-threshold #(.setLocalRetryThreshold ^LocalActivityOptions$Builder %1 %2)})
29+
30+
(defn local-invoke-options->
31+
^LocalActivityOptions [params]
32+
(u/build (LocalActivityOptions/newBuilder) local-invoke-option-spec params))
33+
2334
(extend-protocol p/Datafiable
2435
ActivityInfo
2536
(datafy [d]

test/temporal/test/local_activity.clj

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
;; Copyright © 2022 Manetu, Inc. All rights reserved
2+
3+
(ns temporal.test.local-activity
4+
(:require [clojure.test :refer :all]
5+
[taoensso.timbre :as log]
6+
[temporal.client.core :as c]
7+
[temporal.workflow :refer [defworkflow]]
8+
[temporal.activity :refer [defactivity] :as a]
9+
[temporal.test.utils :as t]))
10+
11+
(use-fixtures :once t/wrap-service)
12+
13+
(defactivity greet-activity
14+
[ctx {:keys [name] :as args}]
15+
(log/info "greet-activity:" args)
16+
(str "Hi, " name))
17+
18+
(defworkflow greeter-workflow
19+
[ctx {:keys [args]}]
20+
(log/info "greeter-workflow:" args)
21+
@(a/local-invoke greet-activity args (assoc a/default-invoke-options :do-not-include-args true)))
22+
23+
(deftest the-test
24+
(testing "Verifies that we can round-trip through start"
25+
(let [workflow (t/create-workflow greeter-workflow)]
26+
(c/start workflow {:name "Bob"})
27+
(is (= @(c/get-result workflow) "Hi, Bob")))))

0 commit comments

Comments
 (0)