diff --git a/diskqueue.go b/diskqueue.go index f01ca71..2b54bd4 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -349,7 +349,7 @@ func (d *diskQueue) readOne() ([]byte, error) { // we only consider rotating if we're reading a "complete" file // and since we cannot know the size at which it was rotated, we // rely on maxBytesPerFileRead rather than maxBytesPerFile - if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead { + if d.readFileNum <= d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead { if d.readFile != nil { d.readFile.Close() d.readFile = nil @@ -394,6 +394,7 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFile = nil } } + if d.writeFile == nil { curFileName := d.fileName(d.writeFileNum) d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600) @@ -675,7 +676,7 @@ func (d *diskQueue) ioLoop() { count = 0 } - if (d.readFileNum < d.writeFileNum) || (d.readPos < d.writePos) { + if d.readFileNum < d.writeFileNum || (d.readFileNum == d.writeFileNum && d.readPos < d.writePos) { if d.nextReadPos == d.readPos { dataRead, err = d.readOne() if err != nil { diff --git a/diskqueue_test.go b/diskqueue_test.go index 9d9ae0b..d10387b 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io/fs" "io/ioutil" "os" "path" @@ -11,6 +12,7 @@ import ( "reflect" "runtime" "strconv" + "strings" "sync" "sync/atomic" "testing" @@ -308,17 +310,22 @@ func TestDiskQueueCorruption(t *testing.T) { dqFn := dq.(*diskQueue).fileName(1) os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted - for i := 0; i < 19; i++ { // 1 message leftover in 4th file - Equal(t, msg, <-dq.ReadChan()) - } - // corrupt the 4th (current) file dqFn = dq.(*diskQueue).fileName(3) os.Truncate(dqFn, 100) + for i := 0; i < 18; i++ { + Equal(t, msg, <-dq.ReadChan()) + } + Equal(t, int64(7), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(0), dq.Depth()) + dq.Put(msg) // in 5th file Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(0), dq.Depth()) // write a corrupt (len 0) message at the 5th (current) file dq.(*diskQueue).writeFile.Write([]byte{0, 0, 0, 0}) @@ -331,6 +338,7 @@ func TestDiskQueueCorruption(t *testing.T) { dq.Put(msg) dq.Put(msg) + // corrupt the last file dqFn = dq.(*diskQueue).fileName(5) os.Truncate(dqFn, 100) @@ -338,7 +346,7 @@ func TestDiskQueueCorruption(t *testing.T) { Equal(t, int64(2), dq.Depth()) // return one message and try reading again from corrupted file - <-dq.ReadChan() + Equal(t, msg, <-dq.ReadChan()) // give diskqueue time to handle read error time.Sleep(50 * time.Millisecond) @@ -773,3 +781,39 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) { <-dq.ReadChan() } } + +func TestDiskQueueRollAsync(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0} + ml := int64(len(msg)) + dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l) + defer dq.Close() + NotNil(t, dq) + Equal(t, int64(0), dq.Depth()) + + for i := 0; i < 11; i++ { + err := dq.Put(msg) + Nil(t, err) + Equal(t, int64(1), dq.Depth()) + + Equal(t, msg, <-dq.ReadChan()) + Equal(t, int64(0), dq.Depth()) + } + + Equal(t, int64(1), dq.(*diskQueue).writeFileNum) + Equal(t, int64(ml+4), dq.(*diskQueue).writePos) + + filepath.Walk(tmpDir, func(path string, info fs.FileInfo, err error) error { + if strings.HasSuffix(path, ".bad") { + t.FailNow() + } + + return err + }) +}