Skip to content
This repository was archived by the owner on Jul 21, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 60 additions & 3 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ type Conn struct {
debugCloseRecvLoop bool
debugReauthDone chan struct{}

cleanupChan chan string

logger Logger
logInfo bool // true if information messages are logged; false if only errors are logged

Expand Down Expand Up @@ -206,6 +208,7 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
logger: DefaultLogger,
logInfo: true, // default is true for backwards compatability
buf: make([]byte, bufferSize),
cleanupChan: make(chan string),
}

// Set provided options.
Expand All @@ -225,6 +228,12 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti
conn.invalidateWatches(ErrClosing)
close(conn.eventChan)
}()

go func() {
conn.cleanLoop()
close(conn.cleanupChan)
}()

return conn, ec, nil
}

Expand Down Expand Up @@ -1102,7 +1111,7 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl
case ErrConnectionClosed:
children, _, err := c.Children(rootPath)
if err != nil {
return "", err
return protectedPath, err
}
for _, p := range children {
parts := strings.Split(p, "/")
Expand All @@ -1115,10 +1124,10 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl
case nil:
return newPath, nil
default:
return "", err
return protectedPath, err
}
}
return "", err
return protectedPath, err
}

func (c *Conn) Delete(path string, version int32) error {
Expand Down Expand Up @@ -1276,3 +1285,51 @@ func (c *Conn) Server() string {
defer c.serverMu.Unlock()
return c.server
}

// cleanLoop cleans obsolete nodes, which were created, but after some erroneous behaviour (e.g. ErrConnectionClosed)
// are left bound to the session, but the client is not aware of it anymore, since the error has been propagated up the call stack
func (c *Conn) cleanLoop() {
for {
select {
case l := <-c.cleanupChan:
c.clean(l)
case <-c.shouldQuit:
return
}
}
}

// clean deletes the node if it still exists,
// the input nodePath might not have sequence number at the end, so we have to check whether the prefix,
// protected by guid, matches one of the children of the root path
func (c *Conn) clean(nodePath string) {
parts := strings.Split(nodePath, "/")
rootPath := strings.Join(parts[:len(parts)-1], "/")
nodeName := parts[len(parts)-1]

for {
children, _, err := c.Children(rootPath)
if err != nil {
if err == ErrNoNode {
break
}
continue
}
exist := false
for _, p := range children {
if strings.HasPrefix(p, nodeName) {
exist = true
break
}
}

if exist {
if err := c.Delete(nodePath, -1); err != nil {
if err != ErrNoNode {
continue
}
}
}
break
}
}
42 changes: 42 additions & 0 deletions zk/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,45 @@ func TestRecurringReAuthHang(t *testing.T) {

<-conn.debugReauthDone
}

func TestClean(t *testing.T) {
ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()

zk, _, err := ts.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

acls := WorldACL(PermAll)
_, err = zk.Create("/cleanup", []byte{}, 0, acls)
if err != nil {
t.Fatalf("Create returned error: %v", err)
}

path, err := zk.CreateProtectedEphemeralSequential("/cleanup/lock-", []byte{}, acls)
if err != nil {
t.Fatalf("Create returned error: %v", err)
}

zk.cleanupChan <- path

exists, _, evCh, err := zk.ExistsW(path)
if exists {
select {
case ev := <-evCh:
if ev.Err != nil {
t.Fatalf("ExistW event returned with error %v", err)
}
if ev.Type != EventNodeDeleted {
t.Fatal("Wrong event received")
}
case <-time.After(1 * time.Second):
t.Fatal("Node is not cleared")
}
}
}
38 changes: 26 additions & 12 deletions zk/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,24 @@ func parseSeq(path string) (int, error) {
// is acquired or an error occurs. If this instance already has the lock
// then ErrDeadlock is returned.
func (l *Lock) Lock() error {
path, err := l.lock()
if err != nil {
if err == ErrConnectionClosed && path != "" {
l.c.cleanupChan <- path
}
return err
}
return nil
}

func (l *Lock) lock() (string, error) {
path := ""
if l.lockPath != "" {
return ErrDeadlock
return path, ErrDeadlock
}

prefix := fmt.Sprintf("%s/lock-", l.path)

path := ""
var err error
for i := 0; i < 3; i++ {
path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl)
Expand All @@ -62,35 +73,35 @@ func (l *Lock) Lock() error {
pth += "/" + p
exists, _, err = l.c.Exists(pth)
if err != nil {
return err
return path, err
}
if exists == true {
continue
}
_, err = l.c.Create(pth, []byte{}, 0, l.acl)
if err != nil && err != ErrNodeExists {
return err
return path, err
}
}
} else if err == nil {
break
} else {
return err
return path, err
}
}
if err != nil {
return err
return path, err
}

seq, err := parseSeq(path)
if err != nil {
return err
return path, err
}

for {
children, _, err := l.c.Children(l.path)
if err != nil {
return err
return path, err
}

lowestSeq := seq
Expand All @@ -99,7 +110,7 @@ func (l *Lock) Lock() error {
for _, p := range children {
s, err := parseSeq(p)
if err != nil {
return err
return path, err
}
if s < lowestSeq {
lowestSeq = s
Expand All @@ -118,21 +129,21 @@ func (l *Lock) Lock() error {
// Wait on the node next in line for the lock
_, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
if err != nil && err != ErrNoNode {
return err
return path, err
} else if err != nil && err == ErrNoNode {
// try again
continue
}

ev := <-ch
if ev.Err != nil {
return ev.Err
return path, ev.Err
}
}

l.seq = seq
l.lockPath = path
return nil
return path, nil
}

// Unlock releases an acquired lock. If the lock is not currently acquired by
Expand All @@ -142,6 +153,9 @@ func (l *Lock) Unlock() error {
return ErrNotLocked
}
if err := l.c.Delete(l.lockPath, -1); err != nil {
if err == ErrConnectionClosed {
l.c.cleanupChan <- l.lockPath
}
return err
}
l.lockPath = ""
Expand Down