Skip to content

Commit 0a5fbfe

Browse files
committed
throw when channel is closed due to server error
Some server errors lead to the channel being closed (server initiated). Certain API calls with invalid parameters may also trigger that (e.g. trying to redeclare an existing exchange as a different type). No response is sent for the failed API request in that case. Synchronous API calls that do not check for this condition would time out under this situation. With this change, all synchronous API calls check for channel errors and throw an exception. fixes #23
1 parent 5a3805d commit 0a5fbfe

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

src/protocol.jl

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -631,13 +631,6 @@ function _wait_resp(sendmethod, chan::MessageChannel, default_result::T,
631631
result = default_result
632632
if !nowait
633633
reply = Channel{T}(1)
634-
# timer to time the request out, in case of an error
635-
t = Timer(timeout) do t
636-
try
637-
put!(reply, timeout_result)
638-
catch
639-
end
640-
end
641634
# register a callback
642635
handle(chan, resp_class, resp_meth, resp_handler, reply)
643636
end
@@ -646,7 +639,18 @@ function _wait_resp(sendmethod, chan::MessageChannel, default_result::T,
646639

647640
if !nowait
648641
# wait for response
649-
result = take!(reply)
642+
result = timeout_result
643+
if :ok === timedwait(()->(isready(reply) || !isopen(chan)), Float64(timeout); pollint=0.01)
644+
if isready(reply)
645+
result = take!(reply)
646+
else
647+
error_message = "Connection closed"
648+
if nothing !== chan.closereason
649+
error_message = string(error_message, " - ", string(chan.closereason.code), " (", convert(String, chan.closereason.msg), ")")
650+
end
651+
throw(AMQPClientException(error_message))
652+
end
653+
end
650654
close(reply)
651655
end
652656
result

test/test_coverage.jl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,15 @@ function runtests(;virtualhost="/", host="localhost", port=AMQPClient.AMQP_DEFAU
3535
testlog("creating exchanges...")
3636
@test exchange_declare(chan1, EXCG_DIRECT, EXCHANGE_TYPE_DIRECT; arguments=Dict{String,Any}("Hello"=>"World", "Foo"=>"bar"))
3737
@test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT)
38+
# redeclaring the exchange with same attributes should be fine
39+
@test exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_FANOUT)
40+
# redeclaring an existing exchange with different attributes should fail
41+
@test_throws AMQPClient.AMQPClientException exchange_declare(chan1, EXCG_FANOUT, EXCHANGE_TYPE_DIRECT)
42+
43+
# must reconnect as channel gets closed after a channel exception
44+
close(chan1) # closing an already closed channel should be fine
45+
chan1 = channel(conn, AMQPClient.UNUSED_CHANNEL, true)
46+
@test chan1.id == 1
3847

3948
# create and bind queues
4049
testlog("creating queues...")

0 commit comments

Comments
 (0)