Skip to content

Commit 401c27f

Browse files
authored
fix: read EOF error fix (#2)
* test: added a test for the EOF error when reading. * fix: fixed an issue where the reader would encounter an EOF if it was one message behind the writer and the writer rolled to a new file.
1 parent c324276 commit 401c27f

File tree

2 files changed

+74
-13
lines changed

2 files changed

+74
-13
lines changed

diskqueue.go

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -279,12 +279,44 @@ func (d *diskQueue) skipToNextRWFile() error {
279279
return err
280280
}
281281

282+
// readRollFile advances the reader to the beginning of the next file,
// removes the file that was just fully consumed, and flags that a sync
// of queue metadata is needed.
//
// NOTE(review): this mutates reader state without taking a lock —
// presumably it is only called from the diskQueue's single ioLoop
// goroutine; confirm against callers.
func (d *diskQueue) readRollFile() {
	// point the upcoming read at the start of the next file
	d.nextReadFileNum++
	d.nextReadPos = 0

	// commit the roll: the current read position becomes the next one
	oldReadFileNum := d.readFileNum
	d.readFileNum = d.nextReadFileNum
	d.readPos = d.nextReadPos

	// sync every time we start reading from a new file
	d.needSync = true

	// the old file is fully consumed; delete it from disk.
	// a failed removal is logged but not fatal — the queue keeps going.
	fn := d.fileName(oldReadFileNum)
	err := os.Remove(fn)
	if err != nil {
		d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err)
	}

	d.checkTailCorruption(d.depth)
}
301+
282302
// readOne performs a low level filesystem read for a single []byte
283303
// while advancing read positions and rolling files, if necessary
284304
func (d *diskQueue) readOne() ([]byte, error) {
285305
var err error
286306
var msgSize int32
287307

308+
// we only consider rotating if we're reading a "complete" file
309+
// and since we cannot know the size at which it was rotated, we
310+
// rely on maxBytesPerFileRead rather than maxBytesPerFile
311+
if d.readFileNum <= d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead && d.readFile != nil {
312+
if d.readFile != nil {
313+
d.readFile.Close()
314+
d.readFile = nil
315+
}
316+
317+
d.readRollFile()
318+
}
319+
288320
if d.readFile == nil {
289321
curFileName := d.fileName(d.readFileNum)
290322
d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)
@@ -346,19 +378,6 @@ func (d *diskQueue) readOne() ([]byte, error) {
346378
d.nextReadPos = d.readPos + totalBytes
347379
d.nextReadFileNum = d.readFileNum
348380

349-
// we only consider rotating if we're reading a "complete" file
350-
// and since we cannot know the size at which it was rotated, we
351-
// rely on maxBytesPerFileRead rather than maxBytesPerFile
352-
if d.readFileNum <= d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
353-
if d.readFile != nil {
354-
d.readFile.Close()
355-
d.readFile = nil
356-
}
357-
358-
d.nextReadFileNum++
359-
d.nextReadPos = 0
360-
}
361-
362381
return readBuf, nil
363382
}
364383

diskqueue_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,3 +817,45 @@ func TestDiskQueueRollAsync(t *testing.T) {
817817
return err
818818
})
819819
}
820+
821+
func NewTestLoggerFatalAtError(tbl tbLog, fatalChan chan error) AppLogFunc {
822+
return func(lvl LogLevel, f string, args ...interface{}) {
823+
if lvl == ERROR || lvl == FATAL {
824+
// tbl.Fatal(fmt.Sprintf(lvl.String()+": "+f, args...))
825+
fatalChan <- fmt.Errorf(lvl.String()+": "+f, args...)
826+
}
827+
828+
tbl.Log(fmt.Sprintf(lvl.String()+": "+f, args...))
829+
}
830+
}
831+
832+
func TestWriteRollReadEOF(t *testing.T) {
833+
fatalChan := make(chan error, 1)
834+
l := NewTestLoggerFatalAtError(t, fatalChan)
835+
836+
dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix()))
837+
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
838+
if err != nil {
839+
panic(err)
840+
}
841+
defer os.RemoveAll(tmpDir)
842+
dq := New(dqName, tmpDir, 1024, 4, 1<<10, 2500, 2*time.Second, l)
843+
defer dq.Close()
844+
NotNil(t, dq)
845+
Equal(t, int64(0), dq.Depth())
846+
847+
for i := 0; i < 205; i++ { // 204 messages fit, but message 205 will be too big
848+
msg := []byte(fmt.Sprintf("%05d", i)) // 5 bytes
849+
err = dq.Put(msg)
850+
851+
msgOut := <-dq.ReadChan()
852+
Equal(t, msg, msgOut)
853+
}
854+
855+
select {
856+
case err = <-fatalChan:
857+
t.Fatal(err)
858+
default:
859+
}
860+
861+
}

0 commit comments

Comments
 (0)