@@ -4,13 +4,15 @@ import (
4
4
"bufio"
5
5
"bytes"
6
6
"fmt"
7
+ "io/fs"
7
8
"io/ioutil"
8
9
"os"
9
10
"path"
10
11
"path/filepath"
11
12
"reflect"
12
13
"runtime"
13
14
"strconv"
15
+ "strings"
14
16
"sync"
15
17
"sync/atomic"
16
18
"testing"
@@ -773,3 +775,39 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
773
775
<- dq .ReadChan ()
774
776
}
775
777
}
778
+
779
+ func TestDiskQueueRollAsync (t * testing.T ) {
780
+ l := NewTestLogger (t )
781
+ dqName := "test_disk_queue_roll" + strconv .Itoa (int (time .Now ().Unix ()))
782
+ tmpDir , err := ioutil .TempDir ("" , fmt .Sprintf ("nsq-test-%d" , time .Now ().UnixNano ()))
783
+ if err != nil {
784
+ panic (err )
785
+ }
786
+ defer os .RemoveAll (tmpDir )
787
+ msg := []byte {1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 0 }
788
+ ml := int64 (len (msg ))
789
+ dq := New (dqName , tmpDir , 10 * (ml + 4 ), int32 (ml ), 1 << 10 , 2500 , 2 * time .Second , l )
790
+ defer dq .Close ()
791
+ NotNil (t , dq )
792
+ Equal (t , int64 (0 ), dq .Depth ())
793
+
794
+ for i := 0 ; i < 11 ; i ++ {
795
+ err := dq .Put (msg )
796
+ Nil (t , err )
797
+ Equal (t , int64 (1 ), dq .Depth ())
798
+
799
+ Equal (t , msg , <- dq .ReadChan ())
800
+ Equal (t , int64 (0 ), dq .Depth ())
801
+ }
802
+
803
+ Equal (t , int64 (1 ), dq .(* diskQueue ).writeFileNum )
804
+ Equal (t , int64 (ml + 4 ), dq .(* diskQueue ).writePos )
805
+
806
+ filepath .Walk (tmpDir , func (path string , info fs.FileInfo , err error ) error {
807
+ if strings .HasSuffix (path , ".bad" ) {
808
+ t .FailNow ()
809
+ }
810
+
811
+ return err
812
+ })
813
+ }
0 commit comments