Skip to content

maxBytesPerFile read check off-by-one #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type diskQueue struct {
name string
dataPath string
maxBytesPerFile int64 // currently this cannot change once created
curReadFileSize int64 // currently read file total filesize
minMsgSize int32
maxMsgSize int32
syncEvery int64 // number of writes per fsync
Expand Down Expand Up @@ -293,6 +294,19 @@ func (d *diskQueue) readOne() ([]byte, error) {
}
}

if d.readFileNum < d.writeFileNum {
stat, err := d.readFile.Stat()
if err != nil {
// avoid get fileInfo error, use maxBytesPerFile do backup
d.curReadFileSize = d.maxBytesPerFile
} else {
d.curReadFileSize = stat.Size()
}
} else {
// when readFileNum equal writeFileNum before write file sync and close, use maxBytesPerFile instead of exact filesize
d.curReadFileSize = d.maxBytesPerFile
}

d.reader = bufio.NewReader(d.readFile)
}

Expand Down Expand Up @@ -326,10 +340,10 @@ func (d *diskQueue) readOne() ([]byte, error) {
d.nextReadPos = d.readPos + totalBytes
d.nextReadFileNum = d.readFileNum

// TODO: each data file should embed the maxBytesPerFile
// as the first 8 bytes (at creation time) ensuring that
// use the read file exact size, ensuring that
// the value can change without affecting runtime
if d.nextReadPos > d.maxBytesPerFile {
// only readFileNum less than writeFileNum need move next file
if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.curReadFileSize {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if d.readFileNum < d.writeFileNum needed to be added here, I'll think on it a bit...

Adding a test or two would help :) In particular, one that fails with current master, and passes with this branch. Note that Travis-CI currently fails to post "commit status" for the "legacy travis-ci.org integration". I've updated a couple of other repos to use GitHub Actions instead, I'll get around to this one eventually. In the meantime, you can view test results at https://travis-ci.org/github/nsqio/go-diskqueue/pull_requests or run them locally.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when d.readFileNum == d.writeFileNum means reader and writer is same file, so when d.nextReadPos >= d.curReadFileSize the readFile can't be closed, the program should be waiting writeFile update d.curReadFileSize

And I think just pass now unit test will be ok.

if d.readFile != nil {
d.readFile.Close()
d.readFile = nil
Expand Down Expand Up @@ -396,6 +410,11 @@ func (d *diskQueue) writeOne(data []byte) error {
d.depth += 1

if d.writePos >= d.maxBytesPerFile {
// when readFileNum equal writeFileNum, before sync and close writefile should update curReadFileSize
if d.readFileNum == d.writeFileNum {
d.curReadFileSize = d.writePos
}

d.writeFileNum++
d.writePos = 0

Expand Down