Skip to content

Commit f044099

Browse files
author
zhangyi
committed
Fix unexpected EOF error (nsqio#43)
1 parent 02dd623 commit f044099

File tree

2 files changed

+118
-0
lines changed

2 files changed

+118
-0
lines changed

diskqueue.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,45 @@ func (d *diskQueue) readOne() ([]byte, error) {
276276
var err error
277277
var msgSize int32
278278

279+
// we only consider rotating if we're reading a "complete" file
280+
// and since we cannot know the size at which it was rotated, we
281+
// rely on maxBytesPerFileRead rather than maxBytesPerFile
282+
// Fix: since the d.maxBytesPerFileRead may be changed during calling d.writeOne(),
283+
// we must check the current position before next reading to avoid an unexpected EOF.
284+
if d.readFileNum < d.writeFileNum {
285+
if d.maxBytesPerFileRead <= 0 {
286+
d.maxBytesPerFileRead = d.maxBytesPerFile
287+
readFile := d.fileName(d.readFileNum)
288+
stat, err := os.Stat(readFile)
289+
if err != nil {
290+
d.logf(ERROR, "DISKQUEUE(%s) unable to stat(%s) - %s", d.name, readFile, err)
291+
} else {
292+
d.maxBytesPerFileRead = stat.Size()
293+
}
294+
}
295+
296+
if d.readPos >= d.maxBytesPerFileRead {
297+
if d.readFile != nil {
298+
if err := d.readFile.Close(); err != nil {
299+
d.logf(ERROR, "DISKQUEUE(%s) failed to close(%s) - %s", d.name, d.readFile.Name(), err)
300+
}
301+
err := os.Remove(d.readFile.Name())
302+
if err != nil {
303+
d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, d.readFile.Name(), err)
304+
}
305+
d.readFile = nil
306+
// sync every time we start reading from a new file
307+
err = d.sync()
308+
if err != nil {
309+
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
310+
}
311+
}
312+
313+
d.readFileNum++
314+
d.readPos = 0
315+
}
316+
}
317+
279318
if d.readFile == nil {
280319
curFileName := d.fileName(d.readFileNum)
281320
d.readFile, err = os.OpenFile(curFileName, os.O_RDONLY, 0600)

diskqueue_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package diskqueue
33
import (
44
"bufio"
55
"bytes"
6+
"crypto/rand"
67
"fmt"
78
"io/ioutil"
9+
"log"
810
"os"
911
"path"
1012
"path/filepath"
@@ -696,3 +698,80 @@ func benchmarkDiskQueueGet(size int64, b *testing.B) {
696698
<-dq.ReadChan()
697699
}
698700
}
701+
702+
func TestDiskQueue_ReadChan(t *testing.T) {
703+
dataDir := "./testdata/dat"
704+
err := os.MkdirAll(dataDir, 0o755)
705+
if err != nil {
706+
t.Fatal(err)
707+
}
708+
defer os.RemoveAll(dataDir)
709+
710+
var (
711+
megabyte int64 = 1 << 20
712+
datCount = 1112
713+
)
714+
715+
dq := New("nsqio_diskqueue", dataDir, 128*megabyte, 0, int32(16*megabyte),
716+
32*megabyte, time.Second*5, func(lvl LogLevel, f string, args ...interface{}) {
717+
if lvl >= WARN {
718+
t.Errorf(f, args)
719+
return
720+
}
721+
log.Println(lvl, fmt.Sprintf(f, args...))
722+
})
723+
724+
buf := make([]byte, 3231197)
725+
n, err := rand.Read(buf)
726+
if err != nil {
727+
t.Fatal(err)
728+
}
729+
if n != len(buf) {
730+
t.Fatal("buf is not full")
731+
}
732+
733+
pushExit := make(chan struct{})
734+
go func() {
735+
for i := 0; i < datCount; i++ {
736+
if err := dq.Put(buf); err != nil {
737+
t.Error(err)
738+
return
739+
}
740+
}
741+
close(pushExit)
742+
}()
743+
744+
var wg sync.WaitGroup
745+
wg.Add(5)
746+
747+
var counter atomic.Int64
748+
749+
for i := 0; i < 5; i++ {
750+
go func() {
751+
defer wg.Done()
752+
for {
753+
select {
754+
case data := <-dq.ReadChan():
755+
if bytes.Compare(buf, data) != 0 {
756+
t.Error("get corrupt msg")
757+
return
758+
}
759+
counter.Add(1)
760+
case <-pushExit:
761+
if dq.Depth() == 0 {
762+
return
763+
}
764+
}
765+
}
766+
}()
767+
}
768+
769+
wg.Wait()
770+
771+
if counter.Load() != int64(datCount) {
772+
t.Fatal("push message count not equals get message count")
773+
}
774+
if err := dq.Close(); err != nil {
775+
t.Fatal(err)
776+
}
777+
}

0 commit comments

Comments
 (0)