Skip to content

Commit 02dd623

Browse files
authored
Merge pull request #34 from ploxiln/max_bytes_max
switch to next file before maxBytesPerFile is reached
2 parents bc05aaf + eefc786 commit 02dd623

File tree

2 files changed

+43
-39
lines changed

2 files changed

+43
-39
lines changed

diskqueue.go

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,33 @@ func (d *diskQueue) readOne() ([]byte, error) {
358358
func (d *diskQueue) writeOne(data []byte) error {
359359
var err error
360360

361+
dataLen := int32(len(data))
362+
totalBytes := int64(4 + dataLen)
363+
364+
if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
365+
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
366+
}
367+
368+
// will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max
369+
if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
370+
if d.readFileNum == d.writeFileNum {
371+
d.maxBytesPerFileRead = d.writePos
372+
}
373+
374+
d.writeFileNum++
375+
d.writePos = 0
376+
377+
// sync every time we start writing to a new file
378+
err = d.sync()
379+
if err != nil {
380+
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
381+
}
382+
383+
if d.writeFile != nil {
384+
d.writeFile.Close()
385+
d.writeFile = nil
386+
}
387+
}
361388
if d.writeFile == nil {
362389
curFileName := d.fileName(d.writeFileNum)
363390
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
@@ -377,12 +404,6 @@ func (d *diskQueue) writeOne(data []byte) error {
377404
}
378405
}
379406

380-
dataLen := int32(len(data))
381-
382-
if dataLen < d.minMsgSize || dataLen > d.maxMsgSize {
383-
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
384-
}
385-
386407
d.writeBuf.Reset()
387408
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
388409
if err != nil {
@@ -402,30 +423,9 @@ func (d *diskQueue) writeOne(data []byte) error {
402423
return err
403424
}
404425

405-
totalBytes := int64(4 + dataLen)
406426
d.writePos += totalBytes
407427
d.depth += 1
408428

409-
if d.writePos >= d.maxBytesPerFile {
410-
if d.readFileNum == d.writeFileNum {
411-
d.maxBytesPerFileRead = d.writePos
412-
}
413-
414-
d.writeFileNum++
415-
d.writePos = 0
416-
417-
// sync every time we start writing to a new file
418-
err = d.sync()
419-
if err != nil {
420-
d.logf(ERROR, "DISKQUEUE(%s) failed to sync - %s", d.name, err)
421-
}
422-
423-
if d.writeFile != nil {
424-
d.writeFile.Close()
425-
d.writeFile = nil
426-
}
427-
}
428-
429429
return err
430430
}
431431

diskqueue_test.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,23 +108,23 @@ func TestDiskQueueRoll(t *testing.T) {
108108
panic(err)
109109
}
110110
defer os.RemoveAll(tmpDir)
111-
msg := bytes.Repeat([]byte{0}, 10)
111+
msg := []byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0}
112112
ml := int64(len(msg))
113113
dq := New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, 2*time.Second, l)
114114
defer dq.Close()
115115
NotNil(t, dq)
116116
Equal(t, int64(0), dq.Depth())
117117

118-
for i := 0; i < 10; i++ {
118+
for i := 0; i < 11; i++ {
119119
err := dq.Put(msg)
120120
Nil(t, err)
121121
Equal(t, int64(i+1), dq.Depth())
122122
}
123123

124124
Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
125-
Equal(t, int64(0), dq.(*diskQueue).writePos)
125+
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)
126126

127-
for i := 10; i > 0; i-- {
127+
for i := 11; i > 0; i-- {
128128
Equal(t, msg, <-dq.ReadChan())
129129
Equal(t, int64(i-1), dq.Depth())
130130
}
@@ -216,7 +216,11 @@ func TestDiskQueueCorruption(t *testing.T) {
216216
dq := New(dqName, tmpDir, 1000, 10, 1<<10, 5, 2*time.Second, l)
217217
defer dq.Close()
218218

219-
msg := make([]byte, 123) // 127 bytes per message, 8 (1016 bytes) messages per file
219+
msg := make([]byte, 120) // 124 bytes per message, 8 messages (992 bytes) per file
220+
msg[0] = 91
221+
msg[62] = 4
222+
msg[119] = 211
223+
220224
for i := 0; i < 25; i++ {
221225
dq.Put(msg)
222226
}
@@ -225,7 +229,7 @@ func TestDiskQueueCorruption(t *testing.T) {
225229

226230
// corrupt the 2nd file
227231
dqFn := dq.(*diskQueue).fileName(1)
228-
os.Truncate(dqFn, 500) // 3 valid messages, 5 corrupted
232+
os.Truncate(dqFn, 400) // 3 valid messages, 5 corrupted
229233

230234
for i := 0; i < 19; i++ { // 1 message leftover in 4th file
231235
Equal(t, msg, <-dq.ReadChan())
@@ -451,14 +455,14 @@ func TestDiskQueueResize(t *testing.T) {
451455
NotNil(t, dq)
452456
Equal(t, int64(0), dq.Depth())
453457

454-
for i := 0; i < 8; i++ {
458+
for i := 0; i < 9; i++ {
455459
msg[0] = byte(i)
456460
err := dq.Put(msg)
457461
Nil(t, err)
458462
}
459463
Equal(t, int64(1), dq.(*diskQueue).writeFileNum)
460-
Equal(t, int64(0), dq.(*diskQueue).writePos)
461-
Equal(t, int64(8), dq.Depth())
464+
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)
465+
Equal(t, int64(9), dq.Depth())
462466

463467
dq.Close()
464468
dq = New(dqName, tmpDir, 10*(ml+4), int32(ml), 1<<10, 2500, time.Second, l)
@@ -469,10 +473,10 @@ func TestDiskQueueResize(t *testing.T) {
469473
Nil(t, err)
470474
}
471475
Equal(t, int64(2), dq.(*diskQueue).writeFileNum)
472-
Equal(t, int64(0), dq.(*diskQueue).writePos)
473-
Equal(t, int64(18), dq.Depth())
476+
Equal(t, int64(ml+4), dq.(*diskQueue).writePos)
477+
Equal(t, int64(19), dq.Depth())
474478

475-
for i := 0; i < 8; i++ {
479+
for i := 0; i < 9; i++ {
476480
msg[0] = byte(i)
477481
Equal(t, msg, <-dq.ReadChan())
478482
}

0 commit comments

Comments
 (0)