Skip to content

Sync fork #13

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: copy-master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: tests

on:
push: {branches: [master]}
pull_request: {branches: [master]}

jobs:
test:
runs-on: ubuntu-20.04
timeout-minutes: 10
strategy:
fail-fast: false
matrix:
gover:
- "1.13"
- "1.14"
- "1.15"
- "1.16"
- "1.17"
goarch:
- "amd64"
- "386"

container: "golang:${{matrix.gover}}"
env:
GOARCH: "${{matrix.goarch}}"

steps:
- uses: actions/checkout@v2

- name: test
run: |
go test -list Test
GOMAXPROCS=1 go test
if [ "$GOARCH" = "amd64" ]; then
echo "With -race:"
GOMAXPROCS=4 go test -race
fi

- name: lint
run: |
go vet
gofmt -w *.go && git diff --exit-code
8 changes: 0 additions & 8 deletions .travis.yml

This file was deleted.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# go-diskqueue

[![Build Status](https://secure.travis-ci.org/nsqio/go-diskqueue.png?branch=master)](http://travis-ci.org/nsqio/go-diskqueue) [![GoDoc](https://godoc.org/github.com/nsqio/go-diskqueue?status.svg)](https://godoc.org/github.com/nsqio/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/nsqio/go-diskqueue.svg)](https://github.com/nsqio/go-diskqueue/releases/latest)
[![Build Status](https://github.com/nsqio/go-diskqueue/workflows/tests/badge.svg)](https://github.com/nsqio/go-diskqueue/actions) [![Go Reference](https://pkg.go.dev/badge/github.com/nsqio/go-diskqueue.svg)](https://pkg.go.dev/github.com/nsqio/go-diskqueue) [![GitHub release](https://img.shields.io/github/release/nsqio/go-diskqueue.svg)](https://github.com/nsqio/go-diskqueue/releases/latest)

A Go package providing a filesystem-backed FIFO queue

Expand Down
92 changes: 65 additions & 27 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (l LogLevel) String() string {
type Interface interface {
Put([]byte) error
ReadChan() <-chan []byte // this is expected to be an *unbuffered* channel
PeekChan() <-chan []byte // this is expected to be an *unbuffered* channel
Close() error
Delete() error
Depth() int64
Expand Down Expand Up @@ -91,6 +92,9 @@ type diskQueue struct {
// exposed via ReadChan()
readChan chan []byte

// exposed via PeekChan()
peekChan chan []byte

// internal channels
depthChan chan int64
writeChan chan []byte
Expand All @@ -115,6 +119,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
peekChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
Expand Down Expand Up @@ -152,6 +157,10 @@ func (d *diskQueue) ReadChan() <-chan []byte {
return d.readChan
}

func (d *diskQueue) PeekChan() <-chan []byte {
return d.peekChan
}

// Put writes a []byte to the queue
func (d *diskQueue) Put(data []byte) error {
d.RLock()
Expand Down Expand Up @@ -358,6 +367,33 @@ func (d *diskQueue) readOne() ([]byte, error) {
func (d *diskQueue) writeOne(data []byte) error {
var err error

dataLen := int32(len(data))
totalBytes := int64(4 + dataLen)

if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
}

// will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max
if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
if d.readFileNum == d.writeFileNum {
d.maxBytesPerFileRead = d.writePos
}

d.writeFileNum++
d.writePos = 0

// sync every time we start writing to a new file
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}

if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}
if d.writeFile == nil {
curFileName := d.fileName(d.writeFileNum)
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
Expand All @@ -377,12 +413,6 @@ func (d *diskQueue) writeOne(data []byte) error {
}
}

dataLen := int32(len(data))

if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
}

d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
Expand All @@ -402,30 +432,9 @@ func (d *diskQueue) writeOne(data []byte) error {
return err
}

totalBytes := int64(4 + dataLen)
d.writePos += totalBytes
d.depth += 1

if d.writePos >= d.maxBytesPerFile {
if d.readFileNum == d.writeFileNum {
d.maxBytesPerFileRead = d.writePos
}

d.writeFileNum++
d.writePos = 0

// sync every time we start writing to a new file
err = d.sync()
if err != nil {
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
}

if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}

return err
}

Expand Down Expand Up @@ -473,6 +482,29 @@ func (d *diskQueue) retrieveMetaData() error {
d.nextReadFileNum = d.readFileNum
d.nextReadPos = d.readPos

// if the metadata was not sync'd at the last shutdown of nsqd
// then the actual file size might actually be larger than the writePos,
// in which case the safest thing to do is skip to the next file for
// writes, and let the reader salvage what it can from the messages in the
// diskqueue beyond the metadata's likely also stale readPos
fileName = d.fileName(d.writeFileNum)
fileInfo, err := os.Stat(fileName)
if err != nil {
return err
}
fileSize := fileInfo.Size()
if d.writePos < fileSize {
d.logf(WARN,
"DISKQUEUE(%s) %s metadata writePos %d < file size of %d, skipping to new file",
d.name, fileName, d.writePos, fileSize)
d.writeFileNum += 1
d.writePos = 0
if d.writeFile != nil {
d.writeFile.Close()
d.writeFile = nil
}
}

return nil
}

Expand Down Expand Up @@ -608,6 +640,8 @@ func (d *diskQueue) handleReadError() {

// significant state change, schedule a sync on the next iteration
d.needSync = true

d.checkTailCorruption(d.depth)
}

// ioLoop provides the backend for exposing a go channel (via ReadChan())
Expand All @@ -623,6 +657,7 @@ func (d *diskQueue) ioLoop() {
var err error
var count int64
var r chan []byte
var p chan []byte

syncTicker := time.NewTicker(d.syncTimeout)

Expand Down Expand Up @@ -651,13 +686,16 @@ func (d *diskQueue) ioLoop() {
}
}
r = d.readChan
p = d.peekChan
} else {
r = nil
p = nil
}

select {
// the Go channel spec dictates that nil channel operations (read or write)
// in a select are skipped, we set r to d.readChan only when there is data to read
case p <- dataRead:
case r <- dataRead:
count++
// moveForward sets needSync flag if a file is removed
Expand Down
Loading