diff --git a/diskqueue.go b/diskqueue.go index 2b54bd4..98d2f6c 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -316,6 +316,23 @@ func (d *diskQueue) readOne() ([]byte, error) { d.reader = bufio.NewReader(d.readFile) } + // check if we should rotate to the next file before attempting to read + // only rotate if we're reading a completed file (not the current write file) + if d.readFileNum < d.writeFileNum && d.readPos >= d.maxBytesPerFileRead { + if d.readFile != nil { + d.readFile.Close() + d.readFile = nil + } + + d.readFileNum++ + d.readPos = 0 + d.nextReadFileNum = d.readFileNum + d.nextReadPos = 0 + + // recursively call readOne to open and read from the next file + return d.readOne() + } + err = binary.Read(d.reader, binary.BigEndian, &msgSize) if err != nil { d.readFile.Close() diff --git a/diskqueue_test.go b/diskqueue_test.go index c5374c3..3d6d1b0 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -847,3 +847,75 @@ func TestWriteRollReadEOF(t *testing.T) { return err }) } + +// TestLargeMessageBoundary verifies that file rotation works correctly when messages are large relative to maxBytesPerFile, +// ensuring no EOF errors occur at boundaries and all messages can be read back without corruption. +func TestLargeMessageBoundary(t *testing.T) { + l := NewTestLogger(t) + tmpDir, _ := ioutil.TempDir("", "nsq-test") + defer os.RemoveAll(tmpDir) + + // Use smaller sizes to test the same behavior more efficiently + // 5KB file limit, 4KB max message (same 10:8 ratio as 50MB:40MB in production) + maxBytesPerFile := int64(5 * 1024) + maxMsgSize := int32(4 * 1024) + + dq := New("test_large_msg", tmpDir, maxBytesPerFile, 1, maxMsgSize, 1000, 2*time.Second, l) + defer dq.Close() + + // Create messages that will cause multiple rotations + largeMsg := make([]byte, 4000) // ~4KB message + for i := 0; i < 15; i++ { // ~60KB total, should rotate cleanly across multiple files + err := dq.Put(largeMsg) + Nil(t, err) + } + + // Read all messages back + for i := 0; i < 15; i++ { + msg := <-dq.ReadChan() + Equal(t, len(largeMsg), len(msg)) + } + + // Verify no .bad files were created + filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error { + if strings.HasSuffix(path, ".bad") { + t.Fatalf("Found corrupted file: %s", path) + } + return err + }) +} + +// TestReadCurrentWriteFile verifies that when reading the current write file, +// the reader doesn't try to rotate past the write file when reaching maxBytesPerFileRead +func TestReadCurrentWriteFile(t *testing.T) { + l := NewTestLogger(t) + tmpDir, _ := ioutil.TempDir("", "nsq-test") + defer os.RemoveAll(tmpDir) + + // Small file limit to trigger boundary easily + maxBytesPerFile := int64(1024) + dq := New("test_current_file", tmpDir, maxBytesPerFile, 4, 1024, 1000, 2*time.Second, l) + defer dq.Close() + + // Write messages up to the file limit + msg := []byte("test message") + for i := 0; i < 60; i++ { // Enough to fill first file and start second + err := dq.Put(msg) + Nil(t, err) + } + + // Read all messages back - this tests reading from current write file + // without trying to advance past it + for i := 0; i < 60; i++ { + readMsg := <-dq.ReadChan() + Equal(t, msg, readMsg) + } + + // Verify no .bad files were created + filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error { + if strings.HasSuffix(path, ".bad") { + t.Fatalf("Found corrupted file: %s", path) + } + return err + }) +}