diff --git a/diskqueue_test.go b/diskqueue_test.go index 9d9ae0b..5ecbb82 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io/fs" "io/ioutil" "os" "path" @@ -11,6 +12,7 @@ import ( "reflect" "runtime" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -130,6 +132,42 @@ func TestDiskQueueRoll(t *testing.T) { } } +func TestDiskQueueRollAsync(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} + ml := int64(len(msg)) + dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + for i := 0; i < 11; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(1), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(0), dq.Depth()) + } + + Equal(t, int64(1), dq.(*diskQueue).writeFileNum) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + + filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error { + if strings.HasSuffix(path, ".bad") { + t.FailNow() + } + + return err + }) +} + func TestDiskQueuePeek(t *testing.T) { l := NewTestLogger(t) dqName := "test_disk_queue_peek" + strconv.Itoa(int(time.Now().Unix()))