From fc5265b7a365f9e4d73240d71077c4b844c6aade Mon Sep 17 00:00:00 2001 From: Fazal Majid Date: Fri, 2 Jul 2021 11:17:52 +0200 Subject: [PATCH 1/9] mitigation for #28 possible data corruption if the metadata for a queue was not synced on previous shutdown --- diskqueue.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index 26b3438..4f3abb8 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -473,6 +473,29 @@ func (d *diskQueue) retrieveMetaData() error { d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos + // if the metadata was not sync'd at the last shutdown of nsqd + // then the actual file size might actually be larger than the writePos, + // in which case the safest thing to do is skip to the next file for + // writes, and let the reader salvage what it can from the messages in the + // diskqueue beyond the metadata's likely also stale readPos + fileName = d.fileName(d.writeFileNum) + fileInfo, err := os.Stat(fileName) + if err != nil { + return err + } + fileSize := fileInfo.Size() + if d.writePos < fileSize { + d.logf(WARN, + "DISKQUEUE(%s) %s metadata writePos %d < file size of %d, skipping to new file", + d.name, fileName, d.writePos, fileSize) + d.writeFileNum += 1 + d.writePos = 0 + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } + return nil } From 03359f678c50b62b469f42c8958df3a7b43d9789 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Tue, 6 Jul 2021 18:18:58 +0000 Subject: [PATCH 2/9] Ensure that depth is 0 if readFileNum == writeFileNum and readPos == writePos. --- diskqueue.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index 26b3438..64c73ee 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -608,6 +608,8 @@ func (d *diskQueue) handleReadError() { // significant state change, schedule a sync on the next iteration d.needSync = true + + d.checkTailCorruption(d.depth) } // ioLoop provides the backend for exposing a go channel (via ReadChan()) From f2ecc3728b070d1f890ebfd33967378bbc6f2835 Mon Sep 17 00:00:00 2001 From: kevinsebcam Date: Tue, 7 Sep 2021 15:43:11 -0400 Subject: [PATCH 3/9] Add test for tail corruption when last file is corrupted. --- diskqueue_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/diskqueue_test.go b/diskqueue_test.go index ba5879c..01d7971 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -247,6 +247,23 @@ func TestDiskQueueCorruption(t *testing.T) { dq.Put(msg) Equal(t, msg, <-dq.ReadChan()) + + dq.Put(msg) + dq.Put(msg) + // corrupt the last file + dqFn = dq.(*diskQueue).fileName(5) + os.Truncate(dqFn, 100) + + Equal(t, int64(2), dq.Depth()) + + // return one message and try reading again from corrupted file + <-dq.ReadChan() + + // give diskqueue time to handle read error + time.Sleep(50 * time.Millisecond) + + // the last log file is now considered corrupted leaving no more log messages + Equal(t, int64(0), dq.Depth()) } type md struct { From 471e89b5389d87431e68e20f2ec7d8c1fbda78e4 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sun, 12 Sep 2021 00:14:04 -0400 Subject: [PATCH 4/9] CI: switch from Travis-CI to GitHub Actions --- .github/workflows/test.yml | 39 ++++++++++++++++++++++++++++++++++++++ .travis.yml | 8 -------- 2 files changed, 39 insertions(+), 8 deletions(-) create mode 100644 .github/workflows/test.yml delete mode 100644 .travis.yml diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..d3e8636 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,39 @@ +name: tests + +on: + push: {branches: [master]} + pull_request: {branches: [master]} + +jobs: + test: + runs-on: ubuntu-20.04 + timeout-minutes: 10 + strategy: + fail-fast: false + matrix: + gover: + - "1.13" + - "1.14" + - "1.15" + - "1.16" + - "1.17" + goarch: + - "amd64" + - "386" + + container: "golang:${{matrix.gover}}-alpine" + env: + GOARCH: "${{matrix.goarch}}" + + steps: + - uses: actions/checkout@v2 + + - name: test + run: | + GOMAXPROCS=1 go test -v + GOMAXPROCS=4 go test -v -race + + - name: lint + run: | + go vet + gofmt -w *.go && git diff --exit-code diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index e8a6e43..0000000 --- a/.travis.yml +++ /dev/null @@ -1,8 +0,0 @@ -language: go -go: - - 1.x -script: - - GOMAXPROCS=1 go test -v - - GOMAXPROCS=4 go test -v -race -notifications: - email: false From 7e9a60f62c13cf4f5de404a324deace1258fd182 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sun, 12 Sep 2021 00:21:49 -0400 Subject: [PATCH 5/9] gofmt diskqueue_test.go (one comment indent) --- diskqueue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 01d7971..c3981d4 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -262,7 +262,7 @@ func TestDiskQueueCorruption(t *testing.T) { // give diskqueue time to handle read error time.Sleep(50 * time.Millisecond) - // the last log file is now considered corrupted leaving no more log messages + // the last log file is now considered corrupted leaving no more log messages Equal(t, int64(0), dq.Depth()) } From 2e64e45d1051499a4e32bb46d568ee308a53b637 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sun, 12 Sep 2021 17:07:20 -0400 Subject: [PATCH 6/9] CI: use debian-based images, only test -race on amd64 debian-based images have gcc (needed for -race) and git (possible with alpine image but would need more install steps) go test -race is not supported on 386 Also, go test without -v, but list tests first: go-diskqueue tests produce a lot of distracting error logging output, and if a test fails, a pretty good amount of logging context is provided. --- .github/workflows/test.yml | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d3e8636..d32a127 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -21,7 +21,7 @@ jobs: - "amd64" - "386" - container: "golang:${{matrix.gover}}-alpine" + container: "golang:${{matrix.gover}}" env: GOARCH: "${{matrix.goarch}}" @@ -30,8 +30,12 @@ jobs: - name: test run: | - GOMAXPROCS=1 go test -v - GOMAXPROCS=4 go test -v -race + go test -list Test + GOMAXPROCS=1 go test + if [ "$GOARCH" = "amd64" ]; then + echo "With -race:" + GOMAXPROCS=4 go test -race + fi - name: lint run: | From 31c99977909cf9d699a65a469174b7cf0fbc7ec0 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sun, 12 Sep 2021 17:07:50 -0400 Subject: [PATCH 7/9] README: update build/test and docs badges switch Travic-CI badge to a GitHub Actions badge update GoDoc badge to pkg.go.dev "Go Reference" badge (it was already being redirecting to the new site) --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5605a06..c069678 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # go-diskqueue -[![Build Status](https://secure.travis-ci.org/nsqio/go-diskqueue.png?branch=master)](http://travis-ci.org/nsqio/go-diskqueue) [![GoDoc](https://godoc.org/github.com/nsqio/go-diskqueue?status.svg)](https://godoc.org/github.com/nsqio/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/nsqio/go-diskqueue.svg)](https://github.com/nsqio/go-diskqueue/releases/latest) +[![Build Status](https://github.com/nsqio/go-diskqueue/workflows/tests/badge.svg)](https://github.com/nsqio/go-diskqueue/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/nsqio/go-diskqueue.svg)](https://pkg.go.dev/github.com/nsqio/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/nsqio/go-diskqueue.svg)](https://github.com/nsqio/go-diskqueue/releases/latest) A Go package providing a filesystem-backed FIFO queue From eefc786a4bda5cc0ee57cc07baf4b47f12ba2961 Mon Sep 17 00:00:00 2001 From: Pierce Lopez Date: Sat, 11 Sep 2021 23:44:41 -0400 Subject: [PATCH 8/9] switch to next file before maxBytesPerFile is reached rather than switching *after* maxBytesPerFile is reached fixes https://github.com/nsqio/go-diskqueue/issues/30 --- diskqueue.go | 54 +++++++++++++++++++++++------------------------ diskqueue_test.go | 28 +++++++++++++----------- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 9a982a5..ee6c22d 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -358,6 +358,33 @@ func (d *diskQueue) readOne() ([]byte, error) { func (d *diskQueue) writeOne(data []byte) error { var err error + dataLen := int32(len(data)) + totalBytes := int64(4 + dataLen) + + if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { + return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) + } + + // will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max + if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { + if d.readFileNum == d.writeFileNum { + d.maxBytesPerFileRead = d.writePos + } + + d.writeFileNum++ + d.writePos = 0 + + // sync every time we start writing to a new file + err = d.sync() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) + } + + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) @@ -377,12 +404,6 @@ func (d *diskQueue) writeOne(data []byte) error { } } - dataLen := int32(len(data)) - - if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { - return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) - } - d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { @@ -402,30 +423,9 @@ func (d *diskQueue) writeOne(data []byte) error { return err } - totalBytes := int64(4 + dataLen) d.writePos += totalBytes d.depth += 1 - if d.writePos >= d.maxBytesPerFile { - if d.readFileNum == d.writeFileNum { - d.maxBytesPerFileRead = d.writePos - } - - d.writeFileNum++ - d.writePos = 0 - - // sync every time we start writing to a new file - err = d.sync() - if err != nil { - d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) - } - - if d.writeFile != nil { - d.writeFile.Close() - d.writeFile = nil - } - } - return err } diff --git a/diskqueue_test.go b/diskqueue_test.go index c3981d4..fc72406 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -108,23 +108,23 @@ func TestDiskQueueRoll(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - msg := bytes.Repeat([]byte{0}, 10) + msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} ml := int64(len(msg)) dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) defer dq.Close() NotNil(t, dq) Equal(t, int64(0), dq.Depth()) - for i := 0; i < 10; i++ { + for i := 0; i < 11; i++ { err := dq.Put(msg) Nil(t, err) Equal(t, int64(i+1), dq.Depth()) } Equal(t, int64(1), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) - for i := 10; i > 0; i-- { + for i := 11; i > 0; i-- { Equal(t, msg, <-dq.ReadChan()) Equal(t, int64(i-1), dq.Depth()) } @@ -216,7 +216,11 @@ func TestDiskQueueCorruption(t *testing.T) { dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l) defer dq.Close() - msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file + msg := make([]byte, 120) // 124 bytes per message, 8 messages (992 bytes) per file + msg[0] = 91 + msg[62] = 4 + msg[119] = 211 + for i := 0; i < 25; i++ { dq.Put(msg) } @@ -225,7 +229,7 @@ func TestDiskQueueCorruption(t *testing.T) { // corrupt the 2nd file dqFn := dq.(*diskQueue).fileName(1) - os.Truncate(dqFn, 500) // 3 valid messages, 5 corrupted + os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted for i := 0; i < 19; i++ { // 1 message leftover in 4th file Equal(t, msg, <-dq.ReadChan()) @@ -451,14 +455,14 @@ func TestDiskQueueResize(t *testing.T) { NotNil(t, dq) Equal(t, int64(0), dq.Depth()) - for i := 0; i < 8; i++ { + for i := 0; i < 9; i++ { msg[0] = byte(i) err := dq.Put(msg) Nil(t, err) } Equal(t, int64(1), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) - Equal(t, int64(8), dq.Depth()) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(9), dq.Depth()) dq.Close() dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l) @@ -469,10 +473,10 @@ func TestDiskQueueResize(t *testing.T) { Nil(t, err) } Equal(t, int64(2), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) - Equal(t, int64(18), dq.Depth()) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(19), dq.Depth()) - for i := 0; i < 8; i++ { + for i := 0; i < 9; i++ { msg[0] = byte(i) Equal(t, msg, <-dq.ReadChan()) } From 2cb43388a24e6c2c24d788fd9a694c7957ae0f25 Mon Sep 17 00:00:00 2001 From: Yun Zhao <153869020@qq.com> Date: Mon, 11 Jan 2021 20:11:48 +0800 Subject: [PATCH 9/9] add PeekChan() --- diskqueue.go | 13 ++++++++ diskqueue_test.go | 77 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index ee6c22d..f01ca71 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -47,6 +47,7 @@ func (l LogLevel) String() string { type Interface interface { Put([]byte) error ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel + PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 @@ -91,6 +92,9 @@ type diskQueue struct { // exposed via ReadChan() readChan chan []byte + // exposed via PeekChan() + peekChan chan []byte + // internal channels depthChan chan int64 writeChan chan []byte @@ -115,6 +119,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), + peekChan: make(chan []byte), depthChan: make(chan int64), writeChan: make(chan []byte), writeResponseChan: make(chan error), @@ -152,6 +157,10 @@ func (d *diskQueue) ReadChan() <-chan []byte { return d.readChan } +func (d *diskQueue) PeekChan() <-chan []byte { + return d.peekChan +} + // Put writes a []byte to the queue func (d *diskQueue) Put(data []byte) error { d.RLock() @@ -648,6 +657,7 @@ func (d *diskQueue) ioLoop() { var err error var count int64 var r chan []byte + var p chan []byte syncTicker := time.NewTicker(d.syncTimeout) @@ -676,13 +686,16 @@ func (d *diskQueue) ioLoop() { } } r = d.readChan + p = d.peekChan } else { r = nil + p = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read + case p <- dataRead: case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed diff --git a/diskqueue_test.go b/diskqueue_test.go index fc72406..9d9ae0b 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -130,6 +130,83 @@ func TestDiskQueueRoll(t *testing.T) { } } +func TestDiskQueuePeek(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := bytes.Repeat([]byte{0}, 10) + ml := int64(len(msg)) + dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + t.Run("roll", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("peek-read", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("read-peek", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 1; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + +} + func assertFileNotExist(t *testing.T, fn string) { f, err := os.OpenFile(fn, os.O_RDONLY, 0600) Equal(t, (*os.File)(nil), f)