Skip to content

Commit 5308f07

Browse files
committed
Add support for heartbeats
Signed-off-by: Greg Haskins <[email protected]>
1 parent 5e03ba2 commit 5308f07

File tree

3 files changed

+62
-3
lines changed

3 files changed

+62
-3
lines changed

project.clj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,5 @@
3232
:cloverage {:runner :eftest
3333
:runner-opts {:multithread? false
3434
:fail-fast? true}
35-
:fail-threshold 86
35+
:fail-threshold 87
3636
:ns-exclude-regex [#"temporal.client.worker"]})

src/temporal/activity.clj

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
;; Copyright © 2022 Manetu, Inc. All rights reserved
1+
;; Copyright © 2022-2023 Manetu, Inc. All rights reserved
22

33
(ns temporal.activity
44
"Methods for defining and invoking activity tasks"
@@ -8,7 +8,38 @@
88
[temporal.internal.activity :as a]
99
[temporal.internal.utils :as u]
1010
[temporal.internal.promise]) ;; needed for IPromise protocol extention
11-
(:import [io.temporal.workflow Workflow]))
11+
(:import [io.temporal.workflow Workflow]
12+
[io.temporal.activity Activity]))
13+
14+
(defn heartbeat
15+
"
16+
Used to notify the Workflow Execution that the Activity Execution is alive.
17+
18+
Arguments:
19+
20+
- `details`: The details are accessible through [[get-heartbeat-details]] on the next Activity Execution retry.
21+
"
22+
[details]
23+
(let [ctx (Activity/getExecutionContext)]
24+
(log/trace "heartbeat:" details)
25+
(.heartbeat ctx (nippy/freeze details))))
26+
27+
(defn get-heartbeat-details
28+
"
29+
Extracts Heartbeat details from the last failed attempt. This is used in combination with retry options. An Activity
30+
Execution could be scheduled with optional [[temporal.common/retry-options]]. If an Activity Execution failed then the
31+
server would attempt to dispatch another Activity Task to retry the execution according to the retry options. If there
32+
were Heartbeat details reported by [[heartbeat]] in the last Activity Execution that failed, they would be delivered
33+
along with the Activity Task for the next retry attempt and can be extracted by the Activity implementation.
34+
"
35+
[]
36+
37+
(let [ctx (Activity/getExecutionContext)
38+
details (.getHeartbeatDetails ctx u/bytes-type)]
39+
(let [v (when (.isPresent details)
40+
(nippy/thaw (.get details)))]
41+
(log/trace "get-heartbeat-details:" v)
42+
v)))
1243

1344
(defn- complete-invoke
1445
[activity result]

test/temporal/test/heartbeat.clj

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
;; Copyright © 2023 Manetu, Inc. All rights reserved
2+
;;
3+
(ns temporal.test.heartbeat
4+
(:require [clojure.test :refer :all]
5+
[temporal.client.core :as c]
6+
[temporal.workflow :refer [defworkflow]]
7+
[temporal.activity :refer [defactivity] :as a]
8+
[temporal.test.utils :as t]))
9+
10+
(use-fixtures :once t/wrap-service)
11+
12+
(defactivity heartbeat-activity
13+
[_ _]
14+
(if-let [details (a/get-heartbeat-details)]
15+
details
16+
(do
17+
(a/heartbeat :ok)
18+
(throw (ex-info "heartbeat details not found" {})))))
19+
20+
(defworkflow heartbeat-workflow
21+
[_ _]
22+
@(a/invoke heartbeat-activity {}))
23+
24+
(deftest the-test
25+
(testing "Verifies that heartbeats are handled properly"
26+
(let [workflow (t/create-workflow heartbeat-workflow)]
27+
(c/start workflow {})
28+
(is (-> workflow c/get-result deref (= :ok))))))

0 commit comments

Comments
 (0)