Skip to content

Commit 5347a42

Browse files
authored
fix: let etcd v3 read_watch handle the case where a chunk contains partial event or multiple events (#154)
1 parent f75f4c5 commit 5347a42

File tree

4 files changed

+204
-26
lines changed

4 files changed

+204
-26
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,6 @@ jobs:
1515
include:
1616
- version: 2.2.5
1717
conf: Procfile-single
18-
- version: 3.1.0
19-
conf: Procfile-single
20-
- version: 3.2.0
21-
conf: Procfile-single
22-
- version: 3.3.0
23-
conf: Procfile-single-enable-v2
2418
- version: 3.4.0
2519
conf: Procfile-single-enable-v2
2620
- version: 3.5.0

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ Table of Contents
1111
* [Install](#install)
1212
* [API v2](api_v2.md)
1313
* [API v3](api_v3.md)
14+
* **NOTE**: Requires ETCD version >= v3.4.0
1415

1516
## Install
1617

lib/resty/etcd/v3.lua

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
-- https://github.com/ledgetech/lua-resty-http
2+
local split = require("ngx.re").split
23
local typeof = require("typeof")
34
local cjson = require("cjson.safe")
45
local setmetatable = setmetatable
@@ -688,43 +689,77 @@ local function request_chunk(self, method, path, opts, timeout)
688689

689690

690691
local function read_watch()
692+
body = nil
691693

692694
while(1) do
693-
body, err = res.body_reader()
695+
local chunk, read_err = res.body_reader()
696+
if read_err then
697+
return nil, read_err
698+
end
699+
if not chunk then
700+
break
701+
end
702+
694703
if not body then
695-
return nil, err
704+
body = chunk
705+
else
706+
-- this branch will only be executed in rare cases, for example, a single event json
707+
-- is larger than the proxy_buffer_size of nginx which proxies etcd, so it would be
708+
-- ok to use a string concat directly without worry about the performance.
709+
body = body .. chunk
696710
end
697-
if not utils.is_empty_str(body) then
711+
712+
if not utils.is_empty_str(chunk) and str_byte(chunk, -1) == str_byte("\n") then
698713
break
699714
end
700715

701716
end
702717

703-
body, err = decode_json(body)
704718
if not body then
705-
return nil, "failed to decode json body: " .. (err or " unkwon")
706-
elseif body.error and body.error.http_code >= 500 then
707-
-- health_check retry should do nothing here
708-
-- and let connection closed to create a new one
709-
health_check.report_failure(endpoint.http_host)
710-
return nil, endpoint.http_host .. ": " .. body.error.http_status
719+
return nil, nil
711720
end
712721

713-
if body.result and body.result.events then
714-
for _, event in ipairs(body.result.events) do
715-
if event.kv.value then -- DELETE not have value
716-
event.kv.value = decode_base64(event.kv.value or "")
717-
event.kv.value = self.serializer.deserialize(event.kv.value)
722+
local chunks, split_err = split(body, [[\n]], "jo")
723+
if split_err then
724+
return nil, "failed to split chunks: " .. split_err
725+
end
726+
727+
local all_events = {}
728+
for _, chunk in ipairs(chunks) do
729+
body, err = decode_json(chunk)
730+
if not body then
731+
return nil, "failed to decode json body: " .. (err or " unknown")
732+
elseif body.error and body.error.http_code >= 500 then
733+
-- health_check retry should do nothing here
734+
-- and let connection closed to create a new one
735+
health_check.report_failure(endpoint.http_host)
736+
return nil, endpoint.http_host .. ": " .. body.error.http_status
737+
end
738+
739+
if body.result and body.result.events then
740+
for _, event in ipairs(body.result.events) do
741+
if event.kv.value then -- DELETE not have value
742+
event.kv.value = decode_base64(event.kv.value or "")
743+
event.kv.value = self.serializer.deserialize(event.kv.value)
744+
end
745+
event.kv.key = decode_base64(event.kv.key)
746+
if event.prev_kv then
747+
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
748+
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
749+
event.prev_kv.key = decode_base64(event.prev_kv.key)
750+
end
751+
tab_insert(all_events, event)
718752
end
719-
event.kv.key = decode_base64(event.kv.key)
720-
if event.prev_kv then
721-
event.prev_kv.value = decode_base64(event.prev_kv.value or "")
722-
event.prev_kv.value = self.serializer.deserialize(event.prev_kv.value)
723-
event.prev_kv.key = decode_base64(event.prev_kv.key)
753+
else
754+
if #chunks == 1 then
755+
return body
724756
end
725757
end
726758
end
727759

760+
if #all_events > 1 then
761+
body.result.events = all_events
762+
end
728763
return body
729764
end
730765

t/v3/key.t

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,3 +512,151 @@ passed
512512
request chunk headers: {"foo":"bar"}
513513
--- no_error_log
514514
[error]
515+
516+
517+
518+
=== TEST 13: watch response which http chunk contains partial etcd event response
519+
--- http_config eval: $::HttpConfig
520+
--- config
521+
location /version {
522+
content_by_lua_block {
523+
ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}')
524+
}
525+
}
526+
527+
location /v3/watch {
528+
content_by_lua_block {
529+
-- payload get from tcpdump while running TEST 3 and split the event response into two chunks
530+
531+
ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"271","raft_term":"7"},"created":true}}')
532+
ngx.flush()
533+
ngx.sleep(0.1)
534+
535+
-- partial event without trailing new line
536+
ngx.print('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437",')
537+
ngx.flush()
538+
ngx.print('"revision":"272","raft_term":"7"},"events"')
539+
ngx.flush()
540+
541+
-- key = /test, value = bcd3
542+
ngx.say(':[{"kv":{"key":"L3Rlc3Q=","create_revision":"156","mod_revision":"272","version":"44","value":"ImJjZDMi"}}]}}')
543+
ngx.flush()
544+
545+
-- ensure client timeout
546+
ngx.sleep(1)
547+
}
548+
}
549+
550+
location /t {
551+
content_by_lua_block {
552+
local etcd, err = require("resty.etcd").new({
553+
protocol = "v3",
554+
http_host = {
555+
"http://127.0.0.1:" .. ngx.var.server_port,
556+
},
557+
})
558+
check_res(etcd, err)
559+
560+
local cur_time = ngx.now()
561+
local body_chunk_fun, err = etcd:watch("/test", {timeout = 0.5})
562+
if not body_chunk_fun then
563+
ngx.say("failed to watch: ", err)
564+
end
565+
566+
local idx = 0
567+
while true do
568+
local chunk, err = body_chunk_fun()
569+
570+
if not chunk then
571+
if err then
572+
ngx.say(err)
573+
end
574+
break
575+
end
576+
577+
idx = idx + 1
578+
ngx.say(idx, ": ", require("cjson").encode(chunk.result))
579+
end
580+
}
581+
}
582+
--- request
583+
GET /t
584+
--- no_error_log
585+
[error]
586+
--- response_body_like eval
587+
qr/1:.*"created":true.*
588+
2:.*"value":"bcd3".*
589+
timeout/
590+
--- timeout: 5
591+
592+
593+
594+
=== TEST 14: watch response which one http chunk contains multiple events chunk
595+
--- http_config eval: $::HttpConfig
596+
--- config
597+
location /version {
598+
content_by_lua_block {
599+
ngx.say('{"etcdserver":"3.4.0","etcdcluster":"3.4.0"}')
600+
}
601+
}
602+
603+
location /v3/watch {
604+
content_by_lua_block {
605+
-- payload get from tcpdump while running TEST 5 and merge two event response into one http chunk
606+
607+
ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"290","raft_term":"8"},"created":true}}')
608+
ngx.flush()
609+
ngx.sleep(0.1)
610+
611+
-- one http chunk contains multiple event response, note the new line at the end of first event response
612+
-- key1 = /wdir/, value1 = bcd4
613+
-- key2 = /wdir/a, value2 = bcd4a
614+
ngx.say('{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"292","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIv","create_revision":"31","mod_revision":"292","version":"22","value":"ImJjZDQi"}}]}}\n{"result":{"header":{"cluster_id":"14841639068965178418","member_id":"10276657743932975437","revision":"293","raft_term":"8"},"events":[{"kv":{"key":"L3dkaXIvYQ==","create_revision":"293","mod_revision":"293","version":"1","value":"ImJjZDRhIg=="}}]}}')
615+
ngx.flush()
616+
617+
-- ensure client timeout
618+
ngx.sleep(1)
619+
}
620+
}
621+
622+
location /t {
623+
content_by_lua_block {
624+
local etcd, err = require("resty.etcd").new({
625+
protocol = "v3",
626+
http_host = {
627+
"http://127.0.0.1:" .. ngx.var.server_port,
628+
},
629+
})
630+
check_res(etcd, err)
631+
632+
local cur_time = ngx.now()
633+
local body_chunk_fun, err = etcd:watch("/", {timeout = 0.5})
634+
if not body_chunk_fun then
635+
ngx.say("failed to watch: ", err)
636+
end
637+
638+
local idx = 0
639+
while true do
640+
local chunk, err = body_chunk_fun()
641+
642+
if not chunk then
643+
if err then
644+
ngx.say(err)
645+
end
646+
break
647+
end
648+
649+
idx = idx + 1
650+
ngx.say(idx, ": ", require("cjson").encode(chunk.result))
651+
end
652+
}
653+
}
654+
--- request
655+
GET /t
656+
--- no_error_log
657+
[error]
658+
--- response_body_like eval
659+
qr/1:.*"created":true.*
660+
2:.*"value":"bcd4".*"value":"bcd4a".*
661+
timeout/
662+
--- timeout: 5

0 commit comments

Comments
 (0)