From 231053bfe2441af6ab5bba67cf70acf88a58ce5b Mon Sep 17 00:00:00 2001 From: ishveda Date: Thu, 2 Apr 2020 15:42:03 +0300 Subject: [PATCH 1/3] check and clean nodes which were potentially created by the server, but client is no more aware of it --- zk/conn.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++--- zk/lock.go | 38 +++++++++++++++++++++----------- 2 files changed, 86 insertions(+), 15 deletions(-) diff --git a/zk/conn.go b/zk/conn.go index da9503a2..4737df66 100644 --- a/zk/conn.go +++ b/zk/conn.go @@ -105,6 +105,8 @@ type Conn struct { debugCloseRecvLoop bool debugReauthDone chan struct{} + cleanupChan chan string + logger Logger logInfo bool // true if information messages are logged; false if only errors are logged @@ -206,6 +208,7 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti logger: DefaultLogger, logInfo: true, // default is true for backwards compatability buf: make([]byte, bufferSize), + cleanupChan: make(chan string), } // Set provided options. @@ -225,6 +228,12 @@ func Connect(servers []string, sessionTimeout time.Duration, options ...connOpti conn.invalidateWatches(ErrClosing) close(conn.eventChan) }() + + go func() { + conn.cleanLoop() + close(conn.cleanupChan) + }() + return conn, ec, nil } @@ -1102,7 +1111,7 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl case ErrConnectionClosed: children, _, err := c.Children(rootPath) if err != nil { - return "", err + return protectedPath, err } for _, p := range children { parts := strings.Split(p, "/") @@ -1115,10 +1124,10 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl case nil: return newPath, nil default: - return "", err + return protectedPath, err } } - return "", err + return protectedPath, err } func (c *Conn) Delete(path string, version int32) error { @@ -1276,3 +1285,51 @@ func (c *Conn) Server() string { defer c.serverMu.Unlock() return c.server } + +// cleanLoop cleans obsolete nodes, which were created, but after some erroneous behaviour (e.g. ErrConnectionClosed) +// are left bound to the session, but the client is not aware of it anymore, since the error has been propagated up the call stack +func (c *Conn) cleanLoop() { + for { + select { + case l := <-c.cleanupChan: + c.clean(l) + case <-c.shouldQuit: + return + } + } +} + +// clean deletes the node if it still exists, +// the input nodePath might not have sequence number at the end, so we have to check whether the prefix, +// protected by guid, matches one of the children of the root path +func (c *Conn) clean(nodePath string) { + parts := strings.Split(nodePath, "/") + rootPath := strings.Join(parts[:len(parts)-1], "/") + nodeName := parts[len(parts)-1] + + for { + children, _, err := c.Children(rootPath) + if err != nil { + if err == ErrNoNode { + break + } + continue + } + exist := false + for _, p := range children { + if strings.HasPrefix(p, nodeName) { + exist = true + break + } + } + + if exist { + if err := c.Delete(nodePath, -1); err != nil { + if err != ErrNoNode { + continue + } + } + } + break + } +} diff --git a/zk/lock.go b/zk/lock.go index 3c35a427..6d002d2c 100644 --- a/zk/lock.go +++ b/zk/lock.go @@ -43,13 +43,24 @@ func parseSeq(path string) (int, error) { // is acquired or an error occurs. If this instance already has the lock // then ErrDeadlock is returned. func (l *Lock) Lock() error { + path, err := l.lock() + if err != nil { + if err == ErrConnectionClosed && path != "" { + l.c.cleanupChan <- path + } + return err + } + return nil +} + +func (l *Lock) lock() (string, error) { + path := "" if l.lockPath != "" { - return ErrDeadlock + return path, ErrDeadlock } prefix := fmt.Sprintf("%s/lock-", l.path) - path := "" var err error for i := 0; i < 3; i++ { path, err = l.c.CreateProtectedEphemeralSequential(prefix, []byte{}, l.acl) @@ -62,35 +73,35 @@ func (l *Lock) Lock() error { pth += "/" + p exists, _, err = l.c.Exists(pth) if err != nil { - return err + return path, err } if exists == true { continue } _, err = l.c.Create(pth, []byte{}, 0, l.acl) if err != nil && err != ErrNodeExists { - return err + return path, err } } } else if err == nil { break } else { - return err + return path, err } } if err != nil { - return err + return path, err } seq, err := parseSeq(path) if err != nil { - return err + return path, err } for { children, _, err := l.c.Children(l.path) if err != nil { - return err + return path, err } lowestSeq := seq @@ -99,7 +110,7 @@ func (l *Lock) Lock() error { for _, p := range children { s, err := parseSeq(p) if err != nil { - return err + return path, err } if s < lowestSeq { lowestSeq = s @@ -118,7 +129,7 @@ func (l *Lock) Lock() error { // Wait on the node next in line for the lock _, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath) if err != nil && err != ErrNoNode { - return err + return path, err } else if err != nil && err == ErrNoNode { // try again continue @@ -126,13 +137,13 @@ func (l *Lock) Lock() error { ev := <-ch if ev.Err != nil { - return ev.Err + return path, ev.Err } } l.seq = seq l.lockPath = path - return nil + return path, nil } // Unlock releases an acquired lock. If the lock is not currently acquired by @@ -142,6 +153,9 @@ func (l *Lock) Unlock() error { return ErrNotLocked } if err := l.c.Delete(l.lockPath, -1); err != nil { + if err == ErrConnectionClosed { + l.c.cleanupChan <- l.lockPath + } return err } l.lockPath = "" From 1e83fc1c1420c730331449677d51ff93decfd0bd Mon Sep 17 00:00:00 2001 From: ishveda Date: Thu, 2 Apr 2020 23:49:44 +0300 Subject: [PATCH 2/3] add clean test --- zk/conn_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/zk/conn_test.go b/zk/conn_test.go index ee670932..fb48228f 100644 --- a/zk/conn_test.go +++ b/zk/conn_test.go @@ -55,3 +55,45 @@ func TestRecurringReAuthHang(t *testing.T) { <-conn.debugReauthDone } + +func TestClean(t *testing.T) { + ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "}) + if err != nil { + t.Fatal(err) + } + defer ts.Stop() + + zk, _, err := ts.ConnectAll() + if err != nil { + t.Fatalf("Connect returned error: %+v", err) + } + defer zk.Close() + + acls := WorldACL(PermAll) + _, err = zk.Create("/cleanup", []byte{}, 0, acls) + if err != nil { + t.Fatalf("Create returned error: %v", err) + } + + path, err := zk.CreateProtectedEphemeralSequential("/cleanup/lock-", []byte{}, acls) + if err != nil { + t.Fatalf("Create returned error: %v", err) + } + + zk.cleanupChan <- path + + exists, _, evCh, err := zk.ExistsW(path) + if exists { + select { + case ev := <-evCh: + if ev.Err != nil { + t.Fatalf("ExistW event returned with error %v", err) + } + if ev.Type != EventNodeDeleted { + t.Fatal("Wrong event received") + } + case <-time.After(1 * time.Second): + t.Fatal("Node is not cleared") + } + } +} From 64ed7173b5ed647f9b7fe7c7adf518a67178f89c Mon Sep 17 00:00:00 2001 From: ishveda Date: Fri, 3 Apr 2020 02:00:44 +0300 Subject: [PATCH 3/3] add clean test --- zk/conn_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zk/conn_test.go b/zk/conn_test.go index fb48228f..509685b9 100644 --- a/zk/conn_test.go +++ b/zk/conn_test.go @@ -96,4 +96,4 @@ func TestClean(t *testing.T) { t.Fatal("Node is not cleared") } } -} +} \ No newline at end of file