
proper handling of maxBytesPerFile for reads #23

Merged: 2 commits, Dec 30, 2020
diskqueue.go: 41 changes (28 additions, 13 deletions)
@@ -67,15 +67,16 @@ type diskQueue struct {
sync.RWMutex

// instantiation time metadata
-name            string
-dataPath        string
-maxBytesPerFile int64 // currently this cannot change once created
-minMsgSize      int32
-maxMsgSize      int32
-syncEvery       int64         // number of writes per fsync
-syncTimeout     time.Duration // duration of time per fsync
-exitFlag        int32
-needSync        bool
+name                string
+dataPath            string
+maxBytesPerFile     int64 // cannot change once created
+maxBytesPerFileRead int64
+minMsgSize          int32
+maxMsgSize          int32
+syncEvery           int64         // number of writes per fsync
+syncTimeout         time.Duration // duration of time per fsync
+exitFlag            int32
+needSync            bool

// keeps track of the position where we have read
// (but not yet sent over readChan)
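Why the new field is needed (context, not part of the diff): maxBytesPerFile can differ between runs, so the reader cannot assume an old, complete file was rotated at the currently configured limit. A toy calculation with hypothetical sizes shows what the old read logic would lose after shrinking the limit:

package main

import "fmt"

func main() {
	// hypothetical sizes for illustration only
	writtenAt := int64(1 << 20)  // maxBytesPerFile when the old files were created
	reopenedAt := int64(1 << 19) // smaller maxBytesPerFile after a restart

	// rotating reads at the new, smaller limit would silently skip the tail
	// of every already-complete file; rotating at each file's actual size,
	// tracked in maxBytesPerFileRead, loses nothing
	fmt.Printf("bytes skipped per old file: %d\n", writtenAt-reopenedAt)
}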
@@ -293,6 +294,16 @@ func (d *diskQueue) readOne() ([]byte, error) {
}
}

+// for "complete" files (i.e. not the "current" file), maxBytesPerFileRead
+// should be initialized to the file's size, or default to maxBytesPerFile
+d.maxBytesPerFileRead = d.maxBytesPerFile
+if d.readFileNum < d.writeFileNum {
+	stat, err := d.readFile.Stat()
+	if err == nil {
+		d.maxBytesPerFileRead = stat.Size()
+	}
+}
+
d.reader = bufio.NewReader(d.readFile)
}
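A standalone sketch of the initialization above (hypothetical helper, not the library's API): for a file the writer has already moved past, the on-disk size is the authoritative read bound, and the configured limit is only a fallback for when Stat fails:

package main

import (
	"fmt"
	"os"
)

// readBoundFor is a hypothetical helper mirroring the patch: a "complete"
// file, one the writer has moved past, is bounded by its actual on-disk
// size; if Stat fails, or the file is still being written, fall back to
// the configured maxBytesPerFile.
func readBoundFor(f *os.File, isComplete bool, maxBytesPerFile int64) int64 {
	bound := maxBytesPerFile
	if isComplete {
		if stat, err := f.Stat(); err == nil {
			bound = stat.Size()
		}
	}
	return bound
}

func main() {
	f, err := os.CreateTemp("", "diskqueue-sketch")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.WriteString("some queued data")

	fmt.Println(readBoundFor(f, true, 1<<20))  // 16: the file's real size
	fmt.Println(readBoundFor(f, false, 1<<20)) // 1048576: the configured limit
}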

@@ -326,10 +337,10 @@ func (d *diskQueue) readOne() ([]byte, error) {
d.nextReadPos = d.readPos + totalBytes
d.nextReadFileNum = d.readFileNum

-// TODO: each data file should embed the maxBytesPerFile
-// as the first 8 bytes (at creation time) ensuring that
-// the value can change without affecting runtime
-if d.nextReadPos > d.maxBytesPerFile {
+// we only consider rotating if we're reading a "complete" file
+// and since we cannot know the size at which it was rotated, we
+// rely on maxBytesPerFileRead rather than maxBytesPerFile
+if d.readFileNum < d.writeFileNum && d.nextReadPos >= d.maxBytesPerFileRead {
	if d.readFile != nil {
		d.readFile.Close()
		d.readFile = nil
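The new condition, distilled into a standalone sketch (assumed names, not the diskqueue API): rotation on a read is only legal when the reader is behind the writer, because only a complete file has a known end:

package main

import "fmt"

// shouldRotate is a hypothetical distillation of the new condition: only a
// "complete" file (readFileNum < writeFileNum) may be rotated away from on
// a read, and the bound is where that file actually ended, not the
// configured maxBytesPerFile.
func shouldRotate(readFileNum, writeFileNum, nextReadPos, maxBytesPerFileRead int64) bool {
	return readFileNum < writeFileNum && nextReadPos >= maxBytesPerFileRead
}

func main() {
	// reading an old 112-byte file and about to pass its end: rotate
	fmt.Println(shouldRotate(0, 2, 112, 112)) // true
	// caught up to the file the writer is still appending to: never rotate
	fmt.Println(shouldRotate(2, 2, 112, 112)) // false
}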
@@ -396,6 +407,10 @@ func (d *diskQueue) writeOne(data []byte) error {
d.depth += 1

if d.writePos >= d.maxBytesPerFile {
+	if d.readFileNum == d.writeFileNum {
+		d.maxBytesPerFileRead = d.writePos
+	}
+
	d.writeFileNum++
	d.writePos = 0

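The writer's half of the handshake, again as a sketch (hypothetical function, illustration only): when the file being rotated is also the file being read, its true final size is recorded so the reader's bound is exact rather than assumed:

package main

import "fmt"

// rotate is a hypothetical mirror of the writeOne change above: on
// rotation, if the reader is consuming the very file being rotated,
// remember where that file really ended.
func rotate(readFileNum, writeFileNum, writePos, maxBytesPerFileRead int64) (int64, int64, int64) {
	if readFileNum == writeFileNum {
		maxBytesPerFileRead = writePos // the file's true final size
	}
	writeFileNum++
	writePos = 0
	return writeFileNum, writePos, maxBytesPerFileRead
}

func main() {
	// reader and writer both on file 3, which rotates at 112 bytes
	fileNum, pos, bound := rotate(3, 3, 112, 1<<20)
	fmt.Println(fileNum, pos, bound) // 4 0 112
}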
diskqueue_test.go: 55 changes (55 additions, 0 deletions)
@@ -420,6 +420,61 @@ func TestDiskQueueTorture(t *testing.T) {
	Equal(t, depth, read)
}

+func TestDiskQueueResize(t *testing.T) {
+	l := NewTestLogger(t)
+	dqName := "test_disk_queue_resize" + strconv.Itoa(int(time.Now().Unix()))
+	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
+	if err != nil {
+		panic(err)
+	}
+	defer os.RemoveAll(tmpDir)
+	msg := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+	ml := int64(len(msg))
+	dq := New(dqName, tmpDir, 8*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)
+	NotNil(t, dq)
+	Equal(t, int64(0), dq.Depth())
+
+	for i := 0; i < 8; i++ {
+		msg[0] = byte(i)
+		err := dq.Put(msg)
+		Nil(t, err)
+	}
+	Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
+	Equal(t, int64(0), dq.(*diskQueue).writePos)
+	Equal(t, int64(8), dq.Depth())
+
+	dq.Close()
+	dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)
+
+	for i := 0; i < 10; i++ {
+		msg[0] = byte(20 + i)
+		err := dq.Put(msg)
+		Nil(t, err)
+	}
+	Equal(t, int64(2), dq.(*diskQueue).writeFileNum)
+	Equal(t, int64(0), dq.(*diskQueue).writePos)
+	Equal(t, int64(18), dq.Depth())
+
+	for i := 0; i < 8; i++ {
+		msg[0] = byte(i)
+		Equal(t, msg, <-dq.ReadChan())
+	}
+	for i := 0; i < 10; i++ {
+		msg[0] = byte(20 + i)
+		Equal(t, msg, <-dq.ReadChan())
+	}
+	Equal(t, int64(0), dq.Depth())
+	dq.Close()
+
+	// make sure there aren't "bad" files due to read logic errors
+	files, err := filepath.Glob(filepath.Join(tmpDir, dqName+"*.bad"))
+	Nil(t, err)
+	// empty files slice is actually nil, length check is less confusing
+	if len(files) > 0 {
+		Equal(t, []string{}, files)
+	}
+}
+
func BenchmarkDiskQueuePut16(b *testing.B) {
	benchmarkDiskQueuePut(16, b)
}
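A note on the test's arithmetic: diskqueue stores each message as a 4-byte length prefix followed by the body, so with 10-byte messages a limit of 8*(ml+4) bytes fits exactly eight messages before writeFileNum increments, which is what the assertions above check. A quick sanity calculation:

package main

import "fmt"

func main() {
	// each queued message occupies len(msg) + 4 bytes on disk
	// (a 4-byte length prefix, then the body)
	const ml = 10
	perMsg := ml + 4
	fmt.Println(8 * perMsg)  // 112: rotation point of the first run's files
	fmt.Println(10 * perMsg) // 140: rotation point after the "resize"
}

The trailing glob for "*.bad" files matters because diskqueue quarantines a data file it fails to parse by renaming it with a .bad extension; any such file here would mean the new read logic mis-detected a file boundary.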