Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,23 @@ func (d *diskQueue) readOne() ([]byte, error) {
d.reader = bufio.NewReader(d.readFile)
}

// check if we should rotate to the next file before attempting to read
// only rotate if we're reading a completed file (not the current write file)
if d.readFileNum < d.writeFileNum && d.readPos >= d.maxBytesPerFileRead {
if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
}

d.readFileNum++
d.readPos = 0
d.nextReadFileNum = d.readFileNum
d.nextReadPos = 0

// recursively call readOne to open and read from the next file
return d.readOne()
}

err = binary.Read(d.reader, binary.BigEndian, &msgSize)
if err != nil {
d.readFile.Close()
Expand Down
72 changes: 72 additions & 0 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,3 +847,75 @@ func TestWriteRollReadEOF(t *testing.T) {
return err
})
}

// TestLargeMessageBoundary verifies that file rotation works correctly when messages are large relative to maxBytesPerFile,
// ensuring no EOF errors occur at boundaries and all messages can be read back without corruption.
func TestLargeMessageBoundary(t *testing.T) {
l := NewTestLogger(t)
tmpDir, _ := ioutil.TempDir("", "nsq-test")
defer os.RemoveAll(tmpDir)

// Use smaller sizes to test the same behavior more efficiently
// 5KB file limit, 4KB max message (same 10:8 ratio as 50MB:40MB in production)
maxBytesPerFile := int64(5 * 1024)
maxMsgSize := int32(4 * 1024)

dq := New("test_large_msg", tmpDir, maxBytesPerFile, 1, maxMsgSize, 1000, 2*time.Second, l)
defer dq.Close()

// Create messages that will cause multiple rotations
largeMsg := make([]byte, 4000) // ~4KB message
for i := 0; i < 15; i++ { // ~60KB total, should rotate cleanly across multiple files
err := dq.Put(largeMsg)
Nil(t, err)
}

// Read all messages back
for i := 0; i < 15; i++ {
msg := <-dq.ReadChan()
Equal(t, len(largeMsg), len(msg))
}

// Verify no .bad files were created
filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error {
if strings.HasSuffix(path, ".bad") {
t.Fatalf("Found corrupted file: %s", path)
}
return err
})
}

// TestReadCurrentWriteFile verifies that when reading the current write file,
// the reader doesn't try to rotate past the write file when reaching maxBytesPerFileRead
func TestReadCurrentWriteFile(t *testing.T) {
l := NewTestLogger(t)
tmpDir, _ := ioutil.TempDir("", "nsq-test")
defer os.RemoveAll(tmpDir)

// Small file limit to trigger boundary easily
maxBytesPerFile := int64(1024)
dq := New("test_current_file", tmpDir, maxBytesPerFile, 4, 1024, 1000, 2*time.Second, l)
defer dq.Close()

// Write messages up to the file limit
msg := []byte("test message")
for i := 0; i < 60; i++ { // Enough to fill first file and start second
err := dq.Put(msg)
Nil(t, err)
}

// Read all messages back - this tests reading from current write file
// without trying to advance past it
for i := 0; i < 60; i++ {
readMsg := <-dq.ReadChan()
Equal(t, msg, readMsg)
}

// Verify no .bad files were created
filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error {
if strings.HasSuffix(path, ".bad") {
t.Fatalf("Found corrupted file: %s", path)
}
return err
})
}