Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 449c45a

Browse files
authored
Merge pull request #3 from mesosphere/jdef/zkclose-1
idempotent Close; queueRequest guards against deadlocks on closed connections
2 parents 695728a + 0858988 commit 449c45a

File tree

2 files changed

+72
-7
lines changed

2 files changed

+72
-7
lines changed

zk/conn.go

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type Conn struct {
8282
eventChan chan Event
8383
eventCallback EventCallback // may be nil
8484
shouldQuit chan struct{}
85+
shouldQuitOnce sync.Once
8586
pingInterval time.Duration
8687
recvTimeout time.Duration
8788
connectTimeout time.Duration
@@ -315,12 +316,14 @@ func WithMaxConnBufferSize(maxBufferSize int) connOption {
315316
}
316317

317318
func (c *Conn) Close() {
318-
close(c.shouldQuit)
319+
c.shouldQuitOnce.Do(func() {
320+
close(c.shouldQuit)
319321

320-
select {
321-
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
322-
case <-time.After(time.Second):
323-
}
322+
select {
323+
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
324+
case <-time.After(time.Second):
325+
}
326+
})
324327
}
325328

326329
// State returns the current state of the connection.
@@ -977,10 +980,30 @@ func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recv
977980
opcode: opcode,
978981
pkt: req,
979982
recvStruct: res,
980-
recvChan: make(chan response, 1),
983+
recvChan: make(chan response, 2),
981984
recvFunc: recvFunc,
982985
}
983-
c.sendChan <- rq
986+
987+
switch opcode {
988+
case opClose:
989+
// always attempt to send close ops.
990+
c.sendChan <- rq
991+
default:
992+
// otherwise avoid deadlocks for dumb clients who aren't aware that
993+
// the ZK connection is closed yet.
994+
select {
995+
case <-c.shouldQuit:
996+
rq.recvChan <- response{-1, ErrConnectionClosed}
997+
case c.sendChan <- rq:
998+
// check for a tie
999+
select {
1000+
case <-c.shouldQuit:
1001+
// maybe the caller gets this, maybe not- we tried.
1002+
rq.recvChan <- response{-1, ErrConnectionClosed}
1003+
default:
1004+
}
1005+
}
1006+
}
9841007
return rq.recvChan
9851008
}
9861009

zk/zk_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,37 @@ func TestCreate(t *testing.T) {
9393
}
9494
}
9595

96+
func TestOpsAfterCloseDontDeadlock(t *testing.T) {
97+
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
98+
if err != nil {
99+
t.Fatal(err)
100+
}
101+
defer ts.Stop()
102+
zk, _, err := ts.ConnectAll()
103+
if err != nil {
104+
t.Fatalf("Connect returned error: %+v", err)
105+
}
106+
zk.Close()
107+
108+
path := "/gozk-test"
109+
110+
ch := make(chan struct{})
111+
go func() {
112+
defer close(ch)
113+
for range make([]struct{}, 30) {
114+
if _, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err == nil {
115+
t.Fatal("Create did not return error")
116+
}
117+
}
118+
}()
119+
select {
120+
case <-ch:
121+
// expected
122+
case <-time.After(10 * time.Second):
123+
t.Fatal("ZK connection deadlocked when executing ops after a Close operation")
124+
}
125+
}
126+
96127
func TestMulti(t *testing.T) {
97128
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
98129
if err != nil {
@@ -139,6 +170,7 @@ func TestIfAuthdataSurvivesReconnect(t *testing.T) {
139170
if err != nil {
140171
t.Fatal(err)
141172
}
173+
defer ts.Stop()
142174

143175
zk, _, err := ts.ConnectAll()
144176
if err != nil {
@@ -666,6 +698,16 @@ func TestRequestFail(t *testing.T) {
666698
}
667699
}
668700

701+
func TestIdempotentClose(t *testing.T) {
702+
zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15)
703+
if err != nil {
704+
t.Fatal(err)
705+
}
706+
// multiple calls to Close() should not panic
707+
zk.Close()
708+
zk.Close()
709+
}
710+
669711
func TestSlowServer(t *testing.T) {
670712
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
671713
if err != nil {

0 commit comments

Comments
 (0)