Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .cljfmt.edn
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
:remove-consecutive-blank-lines? true
:insert-missing-whitespace? true
:align-associative? false
:indents {#re "^(?!catch-kondo-errors).*" [[:block 0]]
catch-kondo-errors [[:inner 0]]}
:extra-indents {#re "^(?!catch-kondo-errors).*" [[:block 0]]
catch-kondo-errors [[:inner 0]]}
:test-code
(comment
(:require
Expand Down
26 changes: 20 additions & 6 deletions src/lsp4clj/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
[lsp4clj.protocols.endpoint :as protocols.endpoint]
[lsp4clj.trace :as trace]
[promesa.core :as p]
[promesa.exec :as p.exec]
[promesa.protocols :as p.protocols])
(:import
(java.util.concurrent CancellationException)))
Expand Down Expand Up @@ -95,7 +96,7 @@
;; client. This cannot be `(-> (p/deferred) (p/catch))` because that returns
;; a promise which, when cancelled, does nothing because there's no
;; exception handler chained onto it. Instead, we must cancel the
;; `(p/deffered)` promise itself.
;; `(p/deferred)` promise itself.
(p/catch p CancellationException
(fn [_]
(protocols.endpoint/send-notification server "$/cancelRequest" {:id id})))
Expand Down Expand Up @@ -196,6 +197,7 @@
trace-ch
tracer*
^java.time.Clock clock
response-executor
on-close
request-id*
pending-sent-requests*
Expand Down Expand Up @@ -351,9 +353,19 @@
(if-let [{:keys [p started] :as req} (get pending-requests id)]
(do
(trace this trace/received-response req resp started now)
(if error
(p/reject! p (ex-info "Received error response" resp))
(p/resolve! p result)))
;; Note that we are called from the server's pipeline, a core.async
;; go-loop, and therefore must not block. Callbacks of the pending
;; request's promise (`p`) will be executed in the completing
;; thread, whatever that thread is. Since the callbacks are not
;; under our control, they are under our users' control, they could
;; block. Therefore, we do not want the completing thread to be our
;; thread. This is very easy for users to miss, therefore we
;; complete the promise using an explicit executor.
(p.exec/submit! response-executor
(fn []
(if error
(p/reject! p (ex-info "Received error response" resp))
(p/resolve! p result)))))
(trace this trace/received-unmatched-response resp now)))
(catch Throwable e
(log-error-receiving this e resp))))
Expand Down Expand Up @@ -410,9 +422,10 @@
(update server :tracer* reset! (trace/tracer-for-level trace-level)))

(defn chan-server
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close]
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close response-executor]
:or {clock (java.time.Clock/systemDefaultZone)
on-close (constantly nil)}}]
on-close (constantly nil)
response-executor :default}}]
(let [;; before defaulting trace-ch, so that default is "off"
tracer (trace/tracer-for-level (or trace-level
(when (or trace? trace-ch) "verbose")
Expand All @@ -427,6 +440,7 @@
:tracer* (atom tracer)
:clock clock
:on-close on-close
:response-executor response-executor
:request-id* (atom 0)
:pending-sent-requests* (atom {})
:pending-received-requests* (atom {})
Expand Down
63 changes: 63 additions & 0 deletions test/lsp4clj/server_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,69 @@
(h/assert-take output-ch)))
(server/shutdown server))))

(defn- core-async-dispatch-thread? [^Thread thread]
(re-matches #"async-dispatch-\d+" (.getName thread)))

(deftest can-determine-core-async-dispatch-thread
(testing "current thread"
(is (not (core-async-dispatch-thread? (Thread/currentThread)))))
(testing "thread running go blocks"
(let [thread (async/<!! (async/go (Thread/currentThread)))]
(is (core-async-dispatch-thread? thread))))
(testing "thread running core.async thread macro"
(let [thread (async/<!! (async/thread (Thread/currentThread)))]
(is (not (core-async-dispatch-thread? thread))))))

(deftest request-should-complete-on-a-suitable-executor
(testing "successful completion"
(let [input-ch (async/chan 3)
output-ch (async/chan 3)
server (server/chan-server {:output-ch output-ch
:input-ch input-ch})
_ (server/start server nil)
thread-p (-> (server/send-request server "req" {:body "foo"})
(p/then (fn [_] (Thread/currentThread))))
client-rcvd-msg (h/assert-take output-ch)
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
thread (deref thread-p 100 nil)]
(is (not (core-async-dispatch-thread? thread)))
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
"completes on default ForkJoinPool executor")
(server/shutdown server)))
(testing "exceptional completion"
(let [input-ch (async/chan 3)
output-ch (async/chan 3)
server (server/chan-server {:output-ch output-ch
:input-ch input-ch})
_ (server/start server nil)
thread-p (-> (server/send-request server "req" {:body "foo"})
(p/catch (fn [_] (Thread/currentThread))))
client-rcvd-msg (h/assert-take output-ch)
_ (async/put! input-ch
(-> (lsp.responses/response (:id client-rcvd-msg))
(lsp.responses/error {:code 1234
:message "Something bad"
:data {:body "foo"}})))
thread (deref thread-p 100 nil)]
(is (not (core-async-dispatch-thread? thread)))
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
"completes on default ForkJoinPool executor")
(server/shutdown server)))
(testing "completion with :current-thread executor for legacy behavior"
(let [input-ch (async/chan 3)
output-ch (async/chan 3)
server (server/chan-server {:output-ch output-ch
:input-ch input-ch
:response-executor :current-thread})
_ (server/start server nil)
thread-p (-> (server/send-request server "req" {:body "foo"})
(p/then (fn [_] (Thread/currentThread))))
client-rcvd-msg (h/assert-take output-ch)
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
thread (deref thread-p 100 nil)]
(is (core-async-dispatch-thread? thread) "completes on core.async dispatch thread")
(server/shutdown server))))

(def fixed-clock
(-> (java.time.LocalDateTime/of 2022 03 05 13 35 23 0)
(.toInstant java.time.ZoneOffset/UTC)
Expand Down