Skip to content
This repository was archived by the owner on Jul 21, 2021. It is now read-only.

Commit cc6f56c

Browse files
author
James DeFelice
committed
idempotent Close; queueRequest guards against deadlocks on closed connections
1 parent 471cd4e commit cc6f56c

File tree

1 file changed

+33
-10
lines changed

1 file changed

+33
-10
lines changed

zk/conn.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ type Conn struct {
8282
eventChan chan Event
8383
eventCallback EventCallback // may be nil
8484
shouldQuit chan struct{}
85+
shouldQuitOnce sync.Once
8586
pingInterval time.Duration
8687
recvTimeout time.Duration
8788
connectTimeout time.Duration
@@ -301,18 +302,20 @@ func WithMaxBufferSize(maxBufferSize int) connOption {
301302
// to a limit of 1mb. This option should be used for non-standard server setup
302303
// where znode is bigger than default 1mb.
303304
func WithMaxConnBufferSize(maxBufferSize int) connOption {
304-
return func(c *Conn) {
305-
c.buf = make([]byte, maxBufferSize)
306-
}
305+
return func(c *Conn) {
306+
c.buf = make([]byte, maxBufferSize)
307+
}
307308
}
308309

309310
func (c *Conn) Close() {
310-
close(c.shouldQuit)
311+
c.shouldQuitOnce.Do(func() {
312+
close(c.shouldQuit)
311313

312-
select {
313-
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
314-
case <-time.After(time.Second):
315-
}
314+
select {
315+
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
316+
case <-time.After(time.Second):
317+
}
318+
})
316319
}
317320

318321
// State returns the current state of the connection.
@@ -904,10 +907,30 @@ func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recv
904907
opcode: opcode,
905908
pkt: req,
906909
recvStruct: res,
907-
recvChan: make(chan response, 1),
910+
recvChan: make(chan response, 2),
908911
recvFunc: recvFunc,
909912
}
910-
c.sendChan <- rq
913+
914+
switch opcode {
915+
case opClose:
916+
// always attempt to send close ops.
917+
c.sendChan <- rq
918+
default:
919+
// otherwise avoid deadlocks for dumb clients who aren't aware that
920+
// the ZK connection is closed yet.
921+
select {
922+
case <-c.shouldQuit:
923+
rq.recvChan <- response{-1, ErrConnectionClosed}
924+
case c.sendChan <- rq:
925+
// check for a tie
926+
select {
927+
case <-c.shouldQuit:
928+
// maybe the caller gets this, maybe not- we tried.
929+
rq.recvChan <- response{-1, ErrConnectionClosed}
930+
default:
931+
}
932+
}
933+
}
911934
return rq.recvChan
912935
}
913936

0 commit comments

Comments
 (0)