Skip to content

Commit e816851

Browse files
author
zhangyi
committed
Fix unexpected EOF error (nsqio#43)
1 parent 02dd623 commit e816851

File tree

2 files changed

+113
-0
lines changed

2 files changed

+113
-0
lines changed

diskqueue.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,40 @@ func (d *diskQueue) readOne() ([]byte, error) {
276276
var err error
277277
var msgSize int32
278278

279+
// Fix: since the d.maxBytesPerFileRead may be changed during calling d.writeOne(),
280+
// we must check the current position before next reading to avoid an unexpected EOF.
281+
if d.readFileNum < d.writeFileNum {
282+
if d.maxBytesPerFileRead <= 0 {
283+
d.maxBytesPerFileRead = d.maxBytesPerFile
284+
readFile := d.fileName(d.readFileNum)
285+
stat, err := os.Stat(readFile)
286+
if err != nil {
287+
d.logf(ERROR, "DISKQUEUE(%s) unable to stat(%s) - %s", d.name, readFile, err)
288+
} else {
289+
d.maxBytesPerFileRead = stat.Size()
290+
}
291+
}
292+
293+
if d.readPos >= d.maxBytesPerFileRead {
294+
if d.readFile != nil {
295+
if err := d.readFile.Close(); err != nil {
296+
d.logf(ERROR, "DISKQUEUE(%s) failed to close(%s) - %s", d.name, d.readFile.Name(), err)
297+
}
298+
if err := os.Remove(d.readFile.Name()); err != nil {
299+
d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, d.readFile.Name(), err)
300+
}
301+
d.readFile = nil
302+
}
303+
304+
// sync every time we start reading from a new file
305+
if err = d.sync(); err != nil {
306+
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
307+
}
308+
d.readFileNum++
309+
d.readPos = 0
310+
}
311+
}
312+
279313
if d.readFile == nil {
280314
curFileName := d.fileName(d.readFileNum)
281315
d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)

diskqueue_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package diskqueue
33
import (
44
"bufio"
55
"bytes"
6+
"crypto/rand"
67
"fmt"
78
"io/ioutil"
9+
"log"
810
"os"
911
"path"
1012
"path/filepath"
@@ -696,3 +698,80 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
696698
<-dq.ReadChan()
697699
}
698700
}
701+
702+
func TestDiskQueue_ReadChan(t *testing.T) {
703+
dataDir := "./testdata/dat"
704+
err := os.MkdirAll(dataDir, 0o755)
705+
if err != nil {
706+
t.Fatal(err)
707+
}
708+
defer os.RemoveAll(dataDir)
709+
710+
var (
711+
megabyte int64 = 1 << 20
712+
datCount = 1112
713+
)
714+
715+
dq := New("nsqio_diskqueue", dataDir, 128*megabyte, 0, int32(16*megabyte),
716+
32*megabyte, time.Second*5, func(lvl LogLevel, f string, args ...interface{}) {
717+
if lvl >= WARN {
718+
t.Errorf(f, args)
719+
return
720+
}
721+
log.Println(lvl, fmt.Sprintf(f, args...))
722+
})
723+
724+
buf := make([]byte, 3231197)
725+
n, err := rand.Read(buf)
726+
if err != nil {
727+
t.Fatal(err)
728+
}
729+
if n != len(buf) {
730+
t.Fatal("buf is not full")
731+
}
732+
733+
pushExit := make(chan struct{})
734+
go func() {
735+
for i := 0; i < datCount; i++ {
736+
if err := dq.Put(buf); err != nil {
737+
t.Error(err)
738+
return
739+
}
740+
}
741+
close(pushExit)
742+
}()
743+
744+
var wg sync.WaitGroup
745+
wg.Add(5)
746+
747+
var counter atomic.Int64
748+
749+
for i := 0; i < 5; i++ {
750+
go func() {
751+
defer wg.Done()
752+
for {
753+
select {
754+
case data := <-dq.ReadChan():
755+
if bytes.Compare(buf, data) != 0 {
756+
t.Error("get corrupt msg")
757+
return
758+
}
759+
counter.Add(1)
760+
case <-pushExit:
761+
if dq.Depth() == 0 {
762+
return
763+
}
764+
}
765+
}
766+
}()
767+
}
768+
769+
wg.Wait()
770+
771+
if counter.Load() != int64(datCount) {
772+
t.Fatal("push message count not equals get message count")
773+
}
774+
if err := dq.Close(); err != nil {
775+
t.Fatal(err)
776+
}
777+
}

0 commit comments

Comments
 (0)