Skip to content

Commit f425597

Browse files
committed
Add timeout option to signal receive interface
Signed-off-by: Greg Haskins <[email protected]>
1 parent 78e7b44 commit f425597

File tree

3 files changed

+50
-8
lines changed

3 files changed

+50
-8
lines changed

src/temporal/signals.clj

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,19 @@
3232
(rx state signal-name)))
3333

3434
(defn <!
35-
"Light-weight/parking receive of a single message"
35+
"Light-weight/parking receive of a single message with an optional timeout"
3636
([state] (<! state ::default))
37-
([state signal-name]
38-
(log/trace "waiting on:" signal-name)
39-
(w/await #(not (is-empty? state signal-name)))
40-
(rx state signal-name)))
37+
([state signal-name] (<! state signal-name nil))
38+
([state signal-name timeout]
39+
(log/trace "waiting on:" signal-name "with timeout" timeout)
40+
(let [pred #(not (is-empty? state signal-name))]
41+
(if (some? timeout)
42+
(do
43+
(when (w/await timeout pred)
44+
(rx state signal-name)))
45+
(do
46+
(w/await pred)
47+
(rx state signal-name))))))
4148

4249
(defn >!
4350
"Sends `payload` to `workflow-id` via signal `signal-name`."

src/temporal/workflow.clj

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@
66
[temporal.internal.utils :as u]
77
[temporal.internal.workflow :as w])
88
(:import [io.temporal.workflow Workflow]
9-
[java.util.function Supplier]))
9+
[java.util.function Supplier]
10+
[java.time Duration]))
1011

1112
(defn get-info
1213
"Return info about the current workflow"
1314
[]
1415
(w/get-info))
1516

1617
(defn- ->supplier
17-
[f]
18+
^Supplier [f]
1819
(reify Supplier
1920
(get [_]
2021
(f))))
@@ -23,7 +24,7 @@
2324
"Efficiently parks the workflow until 'pred' evaluates to true. Re-evaluates on each state transition"
2425
([pred]
2526
(Workflow/await (->supplier pred)))
26-
([duration pred]
27+
([^Duration duration pred]
2728
(Workflow/await duration (->supplier pred))))
2829

2930
(defmacro defworkflow

test/temporal/test/signal_timeout.clj

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
;; Copyright © 2022 Manetu, Inc. All rights reserved
2+
3+
(ns temporal.test.signal-timeout
4+
(:require [clojure.test :refer :all]
5+
[taoensso.timbre :as log]
6+
[temporal.client.core :refer [>!] :as c]
7+
[temporal.signals :refer [<!]]
8+
[temporal.workflow :refer [defworkflow]]
9+
[temporal.test.utils :as t])
10+
(:import [java.time Duration]))
11+
12+
(use-fixtures :once t/wrap-service)
13+
14+
(def signal-name ::signal)
15+
16+
(defworkflow timeout-workflow
17+
[ctx {:keys [signals] :as args}]
18+
(log/info "timeout-workflow:" args)
19+
(or (<! signals signal-name (Duration/ofSeconds 1))
20+
:timed-out))
21+
22+
(defn create []
23+
(let [wf (c/create-workflow (t/get-client) timeout-workflow {:task-queue t/task-queue})]
24+
(c/start wf nil)
25+
wf))
26+
27+
(deftest the-test
28+
(testing "Verifies that signals may timeout properly"
29+
(let [wf (create)]
30+
(is (= @(c/get-result wf) :timed-out))))
31+
(testing "Verifies that signals are received properly even when a timeout is requested"
32+
(let [wf (create)]
33+
(>! wf ::signal "Hi, Bob")
34+
(is (= @(c/get-result wf) "Hi, Bob")))))

0 commit comments

Comments
 (0)