Skip to content

Commit 7a4c47b

Browse files
committed
feat: implement pool_only_after_response option to restrict keep alive
1 parent 0ce55d6 commit 7a4c47b

File tree

2 files changed

+63
-26
lines changed

2 files changed

+63
-26
lines changed

lib/resty/http.lua

Lines changed: 51 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ function _M.new(_)
134134
if not sock then
135135
return nil, err
136136
end
137-
return setmetatable({ sock = sock, keepalive = true }, mt)
137+
return setmetatable({
138+
sock = sock, keepalive_supported = true, keepalive_ready = false, pool_only_after_response = false
139+
}, mt)
138140
end
139141

140142

@@ -195,7 +197,7 @@ function _M.tcp_only_connect(self, ...)
195197
self.port = nil
196198
end
197199

198-
self.keepalive = true
200+
self.keepalive_supported = true
199201
self.ssl = false
200202

201203
return sock:connect(...)
@@ -208,7 +210,11 @@ function _M.set_keepalive(self, ...)
208210
return nil, "not initialized"
209211
end
210212

211-
if self.keepalive == true then
213+
if self.keepalive_supported == true then
214+
if self.pool_only_after_response and not self.keepalive_ready then
215+
return nil, "response not fully read"
216+
end
217+
212218
return sock:setkeepalive(...)
213219
else
214220
-- The server said we must close the connection, so we cannot setkeepalive.
@@ -429,7 +435,18 @@ end
429435
_M.transfer_encoding_is_chunked = transfer_encoding_is_chunked
430436

431437

432-
local function _chunked_body_reader(sock, default_chunk_size)
438+
local function _reader_keepalive_ready_mark(http_client)
439+
return co_wrap(function()
440+
http_client.keepalive_ready = true
441+
end)
442+
end
443+
444+
local function _reader_keepalive_ready_no_op()
445+
return co_wrap(function() end)
446+
end
447+
448+
449+
local function _chunked_body_reader(keepalive_ready_callback, sock, default_chunk_size)
433450
return co_wrap(function(max_chunk_size)
434451
local remaining = 0
435452
local length
@@ -487,11 +504,13 @@ local function _chunked_body_reader(sock, default_chunk_size)
487504
end
488505

489506
until length == 0
507+
508+
keepalive_ready_callback()
490509
end)
491510
end
492511

493512

494-
local function _body_reader(sock, content_length, default_chunk_size)
513+
local function _body_reader(keepalive_ready_callback, sock, content_length, default_chunk_size)
495514
return co_wrap(function(max_chunk_size)
496515
max_chunk_size = max_chunk_size or default_chunk_size
497516

@@ -521,6 +540,7 @@ local function _body_reader(sock, content_length, default_chunk_size)
521540
elseif not max_chunk_size then
522541
-- We have a length and potentially keep-alive, but want everything.
523542
co_yield(sock:receive(content_length))
543+
keepalive_ready_callback()
524544

525545
else
526546
-- We have a length and potentially a keep-alive, and wish to stream
@@ -549,6 +569,7 @@ local function _body_reader(sock, content_length, default_chunk_size)
549569
end
550570

551571
until length == 0
572+
keepalive_ready_callback()
552573
end
553574
end)
554575
end
@@ -587,9 +608,10 @@ local function _read_body(res)
587608
end
588609

589610

590-
local function _trailer_reader(sock)
611+
local function _trailer_reader(keepalive_ready_callback, sock)
591612
return co_wrap(function()
592613
co_yield(_receive_headers(sock))
614+
keepalive_ready_callback()
593615
end)
594616
end
595617

@@ -781,7 +803,8 @@ function _M.read_response(self, params)
781803
end
782804

783805

784-
local res_headers, err = _receive_headers(sock)
806+
local res_headers
807+
res_headers, err = _receive_headers(sock)
785808
if not res_headers then
786809
return nil, err
787810
end
@@ -791,38 +814,48 @@ function _M.read_response(self, params)
791814
if ok then
792815
if (version == 1.1 and str_find(connection, "close", 1, true)) or
793816
(version == 1.0 and not str_find(connection, "keep-alive", 1, true)) then
794-
self.keepalive = false
817+
self.keepalive_supported = false
795818
end
796819
else
797820
-- no connection header
798821
if version == 1.0 then
799-
self.keepalive = false
822+
self.keepalive_supported = false
800823
end
801824
end
802825

803826
local body_reader = _no_body_reader
804-
local trailer_reader, err
827+
local trailer_reader
805828
local has_body = false
829+
local has_trailer = false
830+
-- If there are no trailers - fully reading response body means socket is ready to be pooled
831+
local body_reader_keepalive_ready_callback = _reader_keepalive_ready_mark(self)
832+
833+
if res_headers["Trailer"] then
834+
has_trailer = true
835+
-- If there are trailers - fully reading response body doesn't mean socket is ready to be pooled
836+
body_reader_keepalive_ready_callback = _reader_keepalive_ready_no_op()
837+
end
806838

807839
-- Receive the body_reader
808840
if _should_receive_body(params.method, status) then
809841
has_body = true
810842

811843
if version == 1.1 and transfer_encoding_is_chunked(res_headers) then
812-
body_reader, err = _chunked_body_reader(sock)
844+
body_reader, err = _chunked_body_reader(body_reader_keepalive_ready_callback, sock)
813845
else
814-
local ok, length = pcall(tonumber, res_headers["Content-Length"])
846+
local length
847+
ok, length = pcall(tonumber, res_headers["Content-Length"])
815848
if not ok then
816849
-- No content-length header, read until connection is closed by server
817850
length = nil
818851
end
819852

820-
body_reader, err = _body_reader(sock, length)
853+
body_reader, err = _body_reader(body_reader_keepalive_ready_callback, sock, length)
821854
end
822855
end
823856

824-
if res_headers["Trailer"] then
825-
trailer_reader, err = _trailer_reader(sock)
857+
if has_trailer then
858+
trailer_reader, err = _trailer_reader(_reader_keepalive_ready_mark(self), sock)
826859
end
827860

828861
if err then
@@ -981,13 +1014,14 @@ function _M.get_client_body_reader(_, chunksize, sock)
9811014
end
9821015
end
9831016

1017+
local reader_keep_alive_ready_callback = _reader_keepalive_ready_no_op()
9841018
local headers = ngx_req_get_headers()
9851019
local length = headers.content_length
9861020
if length then
987-
return _body_reader(sock, tonumber(length), chunksize)
1021+
return _body_reader(reader_keep_alive_ready_callback, sock, tonumber(length), chunksize)
9881022
elseif transfer_encoding_is_chunked(headers) then
9891023
-- Not yet supported by ngx_lua but should just work...
990-
return _chunked_body_reader(sock, chunksize)
1024+
return _chunked_body_reader(reader_keep_alive_ready_callback, sock, chunksize)
9911025
else
9921026
return nil
9931027
end

lib/resty/http_connect.lua

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,21 @@ be kept alive.
1414
Call it with a single options table as follows:
1515
1616
client:connect {
17-
scheme = "https" -- scheme to use, or nil for unix domain socket
18-
host = "myhost.com", -- target machine, or a unix domain socket
19-
port = nil, -- port on target machine, will default to 80/443 based on scheme
20-
pool = nil, -- connection pool name, leave blank! this function knows best!
21-
pool_size = nil, -- options as per: https://github.com/openresty/lua-nginx-module#tcpsockconnect
17+
scheme = "https" -- scheme to use, or nil for unix domain socket
18+
host = "myhost.com", -- target machine, or a unix domain socket
19+
port = nil, -- port on target machine, will default to 80/443 based on scheme
20+
pool = nil, -- connection pool name, leave blank! this function knows best!
21+
pool_size = nil, -- options as per: https://github.com/openresty/lua-nginx-module#tcpsockconnect
2222
backlog = nil,
23+
pool_only_after_response = false, -- only allow set_keepalive() after http response fully read
2324
2425
-- ssl options as per: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
2526
ssl_server_name = nil,
2627
ssl_send_status_req = nil,
27-
ssl_verify = true, -- NOTE: defaults to true
28-
ctx = nil, -- NOTE: not supported
28+
ssl_verify = true, -- NOTE: defaults to true
29+
ctx = nil, -- NOTE: not supported
2930
30-
proxy_opts, -- proxy opts, defaults to global proxy options
31+
proxy_opts, -- proxy opts, defaults to global proxy options
3132
}
3233
]]
3334
local function connect(self, options)
@@ -44,6 +45,7 @@ local function connect(self, options)
4445

4546
local poolname = options.pool
4647
local pool_size = options.pool_size
48+
local pool_only_after_response = options.pool_only_after_response
4749
local backlog = options.backlog
4850

4951
if request_scheme and not request_port then
@@ -222,11 +224,12 @@ local function connect(self, options)
222224

223225
self.host = request_host
224226
self.port = request_port
225-
self.keepalive = true
227+
self.keepalive_supported = true
226228
self.ssl = ssl
227229
-- set only for http, https has already been handled
228230
self.http_proxy_auth = request_scheme ~= "https" and proxy_authorization or nil
229231
self.path_prefix = path_prefix
232+
self.pool_only_after_response = pool_only_after_response
230233

231234
return true
232235
end

0 commit comments

Comments
 (0)