Skip to content

Commit bb12854

Browse files
authored
Attempt parallel connections to all addrs when connecting (#1068)
* Attempt parallel connections to all addrs when connecting

  Part of the "happy eyeballs" recommendations. Note that I attempted to preserve the exception-throwing behavior we had previously: if a task is the last one still running and it errors, it closes the `Channel{TCPSocket}` with its exception, which then propagates up to the caller. We could possibly accumulate all the exceptions into a `CompositeException`, but I'm not sure that adds much value.

  In addition to the new parallel connecting, we're also adjusting how `connect_timeout` is implemented. Instead of applying only to the TCP connection, the timeout now wraps the entire `newconnection` call, which means any SSL handshake time also counts towards it. We saw a case at RelationalAI where we had a reasonable `connect_timeout` (10s), yet still saw long requests (>50s) where all the time was reported in the connection layer. That suggests the SSL layer is somehow getting stuck or slow, so the proposal here is that if the SSL operations also count towards the timeout, we'll more aggressively cancel/restart the connection process when the SSL handshake is slow.

* Ensure we don't have hanging tasks once our channel has been filled with a connection
1 parent ba68375 commit bb12854

File tree

2 files changed

+29
-25
lines changed

2 files changed

+29
-25
lines changed

src/Connections.jl

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -501,40 +501,37 @@ function getconnection(::Type{TCPSocket},
501501
# alive in the face of heavy workloads where Julia's task scheduler might take a while to
502502
# keep up with midflight requests
503503
keepalive::Bool=true,
504-
connect_timeout::Int=10,
505504
readtimeout::Int=0,
506505
kw...)::TCPSocket
507506

508507
p::UInt = isempty(port) ? UInt(80) : parse(UInt, port)
509508
@debugv 2 "TCP connect: $host:$p..."
510509
addrs = Sockets.getalladdrinfo(host)
511-
connect_timeout = connect_timeout == 0 && readtimeout > 0 ? readtimeout : connect_timeout
512-
lasterr = ErrorException("unknown connection error")
510+
ch = Channel{TCPSocket}(1)
511+
n = Ref(length(addrs))
513512
for addr in addrs
514-
try
515-
if connect_timeout > 0
516-
tcp = Sockets.TCPSocket()
517-
Sockets.connect!(tcp, addr, p)
518-
try
519-
try_with_timeout(connect_timeout) do _
520-
Sockets.wait_connected(tcp)
521-
keepalive && keepalive!(tcp)
522-
end
523-
catch
524-
close(tcp)
525-
rethrow()
526-
end
527-
else
528-
tcp = Sockets.connect(addr, p)
513+
Threads.@spawn begin
514+
try
515+
isready(ch) && return
516+
tcp = Sockets.connect($addr, p)
517+
isready(ch) && return
529518
keepalive && keepalive!(tcp)
519+
Base.@lock ch begin
520+
isready(ch) && return
521+
put!(ch, tcp)
522+
end
523+
catch e
524+
Base.@lock ch begin
525+
# if we're the last task to fail, and assuming
526+
# all other tasks also failed, then we close
527+
# the channel w/ our exception so the fetch call throws and
528+
# our error propagates
529+
(n[] -= 1) == 0 && close(ch, e)
530+
end
530531
end
531-
return tcp
532-
catch e
533-
lasterr = e isa ConcurrentUtilities.TimeoutException ? ConnectTimeout(host, port) : e
534532
end
535533
end
536-
# If no connetion could be set up, to any address, throw last error
537-
throw(lasterr)
534+
return fetch(ch)
538535
end
539536

540537
const nosslconfig = SSLConfig()

src/clientlayers/ConnectionRequest.jl

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ Close the connection if the request throws an exception.
5555
Otherwise leave it open so that it can be reused.
5656
"""
5757
function connectionlayer(handler)
58-
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, logerrors::Bool=false, logtag=nothing, kw...)
58+
return function connections(req; proxy=getproxy(req.url.scheme, req.url.host), socket_type::Type=TCPSocket, socket_type_tls::Type=SOCKET_TYPE_TLS[], readtimeout::Int=0, connect_timeout::Int=10, logerrors::Bool=false, logtag=nothing, kw...)
5959
local io, stream
6060
if proxy !== nothing
6161
target_url = req.url
@@ -73,10 +73,17 @@ function connectionlayer(handler)
7373
url = target_url = req.url
7474
end
7575

76+
connect_timeout = connect_timeout == 0 && readtimeout > 0 ? readtimeout : connect_timeout
7677
IOType = sockettype(url, socket_type, socket_type_tls)
7778
start_time = time()
7879
try
79-
io = newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
80+
io = if connect_timeout > 0
81+
try_with_timeout(connect_timeout) do _
82+
newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
83+
end
84+
else
85+
newconnection(IOType, url.host, url.port; readtimeout=readtimeout, kw...)
86+
end
8087
catch e
8188
if logerrors
8289
err = current_exceptions_to_string()

0 commit comments

Comments
 (0)