diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..d32a127 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,43 @@ +name: tests + +on: + push: {branches: [master]} + pull_request: {branches: [master]} + +jobs: + test: + runs-on: ubuntu-20.04 + timeout-minutes: 10 + strategy: + fail-fast: false + matrix: + gover: + - "1.13" + - "1.14" + - "1.15" + - "1.16" + - "1.17" + goarch: + - "amd64" + - "386" + + container: "golang:${{matrix.gover}}" + env: + GOARCH: "${{matrix.goarch}}" + + steps: + - uses: actions/checkout@v2 + + - name: test + run: | + go test -list Test + GOMAXPROCS=1 go test + if [ "$GOARCH" = "amd64" ]; then + echo "With -race:" + GOMAXPROCS=4 go test -race + fi + + - name: lint + run: | + go vet + gofmt -w *.go && git diff --exit-code diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index e8a6e43..0000000 --- a/.travis.yml +++ /dev/null @@ -1,8 +0,0 @@ -language: go -go: - - 1.x -script: - - GOMAXPROCS=1 go test -v - - GOMAXPROCS=4 go test -v -race -notifications: - email: false diff --git a/README.md b/README.md index 5605a06..c069678 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # go-diskqueue -[![Build Status](https://secure.travis-ci.org/nsqio/go-diskqueue.png?branch=master)](http://travis-ci.org/nsqio/go-diskqueue) [![GoDoc](https://godoc.org/github.com/nsqio/go-diskqueue?status.svg)](https://godoc.org/github.com/nsqio/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/nsqio/go-diskqueue.svg)](https://github.com/nsqio/go-diskqueue/releases/latest) +[![Build Status](https://github.com/nsqio/go-diskqueue/workflows/tests/badge.svg)](https://github.com/nsqio/go-diskqueue/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/nsqio/go-diskqueue.svg)](https://pkg.go.dev/github.com/nsqio/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/nsqio/go-diskqueue.svg)](https://github.com/nsqio/go-diskqueue/releases/latest) A Go package providing a filesystem-backed FIFO queue diff --git a/diskqueue.go b/diskqueue.go index 26b3438..f01ca71 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -47,6 +47,7 @@ func (l LogLevel) String() string { type Interface interface { Put([]byte) error ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel + PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel Close() error Delete() error Depth() int64 @@ -91,6 +92,9 @@ type diskQueue struct { // exposed via ReadChan() readChan chan []byte + // exposed via PeekChan() + peekChan chan []byte + // internal channels depthChan chan int64 writeChan chan []byte @@ -115,6 +119,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, readChan: make(chan []byte), + peekChan: make(chan []byte), depthChan: make(chan int64), writeChan: make(chan []byte), writeResponseChan: make(chan error), @@ -152,6 +157,10 @@ func (d *diskQueue) ReadChan() <-chan []byte { return d.readChan } +func (d *diskQueue) PeekChan() <-chan []byte { + return d.peekChan +} + // Put writes a []byte to the queue func (d *diskQueue) Put(data []byte) error { d.RLock() @@ -358,6 +367,33 @@ func (d *diskQueue) readOne() ([]byte, error) { func (d *diskQueue) writeOne(data []byte) error { var err error + dataLen := int32(len(data)) + totalBytes := int64(4 + dataLen) + + if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { + return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) + } + + // will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max + if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile { + if d.readFileNum == d.writeFileNum { + d.maxBytesPerFileRead = d.writePos + } + + d.writeFileNum++ + d.writePos = 0 + + // sync every time we start writing to a new file + err = d.sync() + if err != nil { + d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) + } + + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) @@ -377,12 +413,6 @@ func (d *diskQueue) writeOne(data []byte) error { } } - dataLen := int32(len(data)) - - if dataLen < d.minMsgSize || dataLen > d.maxMsgSize { - return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize) - } - d.writeBuf.Reset() err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen) if err != nil { @@ -402,30 +432,9 @@ func (d *diskQueue) writeOne(data []byte) error { return err } - totalBytes := int64(4 + dataLen) d.writePos += totalBytes d.depth += 1 - if d.writePos >= d.maxBytesPerFile { - if d.readFileNum == d.writeFileNum { - d.maxBytesPerFileRead = d.writePos - } - - d.writeFileNum++ - d.writePos = 0 - - // sync every time we start writing to a new file - err = d.sync() - if err != nil { - d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err) - } - - if d.writeFile != nil { - d.writeFile.Close() - d.writeFile = nil - } - } - return err } @@ -473,6 +482,29 @@ func (d *diskQueue) retrieveMetaData() error { d.nextReadFileNum = d.readFileNum d.nextReadPos = d.readPos + // if the metadata was not sync'd at the last shutdown of nsqd + // then the actual file size might actually be larger than the writePos, + // in which case the safest thing to do is skip to the next file for + // writes, and let the reader salvage what it can from the messages in the + // diskqueue beyond the metadata's likely also stale readPos + fileName = d.fileName(d.writeFileNum) + fileInfo, err := os.Stat(fileName) + if err != nil { + return err + } + fileSize := fileInfo.Size() + if d.writePos < fileSize { + d.logf(WARN, + "DISKQUEUE(%s) %s metadata writePos %d < file size of %d, skipping to new file", + d.name, fileName, d.writePos, fileSize) + d.writeFileNum += 1 + d.writePos = 0 + if d.writeFile != nil { + d.writeFile.Close() + d.writeFile = nil + } + } + return nil } @@ -608,6 +640,8 @@ func (d *diskQueue) handleReadError() { // significant state change, schedule a sync on the next iteration d.needSync = true + + d.checkTailCorruption(d.depth) } // ioLoop provides the backend for exposing a go channel (via ReadChan()) @@ -623,6 +657,7 @@ func (d *diskQueue) ioLoop() { var err error var count int64 var r chan []byte + var p chan []byte syncTicker := time.NewTicker(d.syncTimeout) @@ -651,13 +686,16 @@ func (d *diskQueue) ioLoop() { } } r = d.readChan + p = d.peekChan } else { r = nil + p = nil } select { // the Go channel spec dictates that nil channel operations (read or write) // in a select are skipped, we set r to d.readChan only when there is data to read + case p <- dataRead: case r <- dataRead: count++ // moveForward sets needSync flag if a file is removed diff --git a/diskqueue_test.go b/diskqueue_test.go index ba5879c..9d9ae0b 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -108,28 +108,105 @@ func TestDiskQueueRoll(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - msg := bytes.Repeat([]byte{0}, 10) + msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} ml := int64(len(msg)) dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) defer dq.Close() NotNil(t, dq) Equal(t, int64(0), dq.Depth()) - for i := 0; i < 10; i++ { + for i := 0; i < 11; i++ { err := dq.Put(msg) Nil(t, err) Equal(t, int64(i+1), dq.Depth()) } Equal(t, int64(1), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) - for i := 10; i > 0; i-- { + for i := 11; i > 0; i-- { Equal(t, msg, <-dq.ReadChan()) Equal(t, int64(i-1), dq.Depth()) } } +func TestDiskQueuePeek(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := bytes.Repeat([]byte{0}, 10) + ml := int64(len(msg)) + dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + t.Run("roll", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("peek-read", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 0; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + + t.Run("read-peek", func(t *testing.T) { + for i := 0; i < 10; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(i+1), dq.Depth()) + } + + for i := 10; i > 1; i-- { + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(i-1), dq.Depth()) + + Equal(t, msg, <-dq.PeekChan()) + Equal(t, int64(i-1), dq.Depth()) + } + + Nil(t, dq.Empty()) + }) + +} + func assertFileNotExist(t *testing.T, fn string) { f, err := os.OpenFile(fn, os.O_RDONLY, 0600) Equal(t, (*os.File)(nil), f) @@ -216,7 +293,11 @@ func TestDiskQueueCorruption(t *testing.T) { dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l) defer dq.Close() - msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file + msg := make([]byte, 120) // 124 bytes per message, 8 messages (992 bytes) per file + msg[0] = 91 + msg[62] = 4 + msg[119] = 211 + for i := 0; i < 25; i++ { dq.Put(msg) } @@ -225,7 +306,7 @@ func TestDiskQueueCorruption(t *testing.T) { // corrupt the 2nd file dqFn := dq.(*diskQueue).fileName(1) - os.Truncate(dqFn, 500) // 3 valid messages, 5 corrupted + os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted for i := 0; i < 19; i++ { // 1 message leftover in 4th file Equal(t, msg, <-dq.ReadChan()) @@ -247,6 +328,23 @@ func TestDiskQueueCorruption(t *testing.T) { dq.Put(msg) Equal(t, msg, <-dq.ReadChan()) + + dq.Put(msg) + dq.Put(msg) + // corrupt the last file + dqFn = dq.(*diskQueue).fileName(5) + os.Truncate(dqFn, 100) + + Equal(t, int64(2), dq.Depth()) + + // return one message and try reading again from corrupted file + <-dq.ReadChan() + + // give diskqueue time to handle read error + time.Sleep(50 * time.Millisecond) + + // the last log file is now considered corrupted leaving no more log messages + Equal(t, int64(0), dq.Depth()) } type md struct { @@ -434,14 +532,14 @@ func TestDiskQueueResize(t *testing.T) { NotNil(t, dq) Equal(t, int64(0), dq.Depth()) - for i := 0; i < 8; i++ { + for i := 0; i < 9; i++ { msg[0] = byte(i) err := dq.Put(msg) Nil(t, err) } Equal(t, int64(1), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) - Equal(t, int64(8), dq.Depth()) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(9), dq.Depth()) dq.Close() dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l) @@ -452,10 +550,10 @@ func TestDiskQueueResize(t *testing.T) { Nil(t, err) } Equal(t, int64(2), dq.(*diskQueue).writeFileNum) - Equal(t, int64(0), dq.(*diskQueue).writePos) - Equal(t, int64(18), dq.Depth()) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + Equal(t, int64(19), dq.Depth()) - for i := 0; i < 8; i++ { + for i := 0; i < 9; i++ { msg[0] = byte(i) Equal(t, msg, <-dq.ReadChan()) }