diff --git a/diskqueue.go b/diskqueue.go index 29d15ac..a9be6b7 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -70,6 +70,7 @@ type diskQueue struct { name string dataPath string maxBytesPerFile int64 // currently this cannot change once created + curReadFileSize int64 // currently read file total filesize minMsgSize int32 maxMsgSize int32 syncEvery int64 // number of writes per fsync @@ -293,6 +294,19 @@ func (d *diskQueue) readOne() ([]byte, error) { } } + if d.readFileNum < d.writeFileNum { + stat, err := d.readFile.Stat() + if err != nil { + // avoid get fileInfo error, use maxBytesPerFile do backup + d.curReadFileSize = d.maxBytesPerFile + } else { + d.curReadFileSize = stat.Size() + } + } else { + // when readFileNum equal writeFileNum before write file sync and close, use maxBytesPerFile instead of exact filesize + d.curReadFileSize = d.maxBytesPerFile + } + d.reader = bufio.NewReader(d.readFile) } @@ -326,10 +340,10 @@ func (d *diskQueue) readOne() ([]byte, error) { d.nextReadPos = d.readPos + totalBytes d.nextReadFileNum = d.readFileNum - // TODO: each data file should embed the maxBytesPerFile - // as the first 8 bytes (at creation time) ensuring that + // use the read file exact size, ensuring that // the value can change without affecting runtime - if d.nextReadPos > d.maxBytesPerFile { + // only readFileNum less than writeFileNum need move next file + if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.curReadFileSize { if d.readFile != nil { d.readFile.Close() d.readFile = nil @@ -396,6 +410,11 @@ func (d *diskQueue) writeOne(data []byte) error { d.depth += 1 if d.writePos >= d.maxBytesPerFile { + // when readFileNum equal writeFileNum, before sync and close writefile should update curReadFileSize + if d.readFileNum == d.writeFileNum { + d.curReadFileSize = d.writePos + } + d.writeFileNum++ d.writePos = 0