Skip to content

Commit 0d45478

Browse files
authored
Merge pull request #2 from kev1n80/DepthImpl
Depth impl
2 parents 4c956aa + 76a0ddd commit 0d45478

File tree

2 files changed

+78
-7
lines changed

2 files changed

+78
-7
lines changed

diskqueue.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,8 @@ func (d *diskQueue) writeOne(data []byte) error {
425425
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
426426
}
427427

428+
// add all data to writeBuf before writing to file
429+
// this causes everything to be written to file or nothing
428430
d.writeBuf.Reset()
429431
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
430432
if err != nil {
@@ -441,7 +443,7 @@ func (d *diskQueue) writeOne(data []byte) error {
441443
// check if we reached the file size limit with this message
442444
if d.diskLimitFeatIsOn && d.writePos+totalBytes+8 >= d.maxBytesPerFile {
443445
// write number of messages in binary to file
444-
err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages)
446+
err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1)
445447
if err != nil {
446448
return err
447449
}

diskqueue_test.go

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,22 +260,23 @@ type md struct {
260260
writePos int64
261261
}
262262

263-
func readMetaDataFile(fileName string, retried int, withDiskSpaceFeat bool) md {
263+
func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md {
264264
f, err := os.OpenFile(fileName, os.O_RDONLY, 0600)
265265
if err != nil {
266266
// provide a simple retry that results in up to
267267
// another 500ms for the file to be written.
268268
if retried < 9 {
269269
retried++
270270
time.Sleep(50 * time.Millisecond)
271-
return readMetaDataFile(fileName, retried, withDiskSpaceFeat)
271+
return readMetaDataFile(fileName, retried, diskLimitFeatIsOn)
272272
}
273273
panic(err)
274274
}
275275
defer f.Close()
276276

277277
var ret md
278-
if withDiskSpaceFeat {
278+
279+
if diskLimitFeatIsOn {
279280
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
280281
&ret.depth,
281282
&ret.readFileNum, &ret.readMessages, &ret.readPos,
@@ -352,7 +353,8 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) {
352353
dq := NewWithDiskSpace(dqName, tmpDir, 1<<11, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
353354
defer dq.Close()
354355

355-
msg := make([]byte, 1000)
356+
msgSize := 1000
357+
msg := make([]byte, msgSize)
356358
dq.Put(msg)
357359

358360
for i := 0; i < 10; i++ {
@@ -394,12 +396,19 @@ next:
394396
panic("fail")
395397

396398
completeWriteFile:
397-
dq.Put(msg)
399+
// meet the file size limit exactly (2048 bytes) when writeFileNum
400+
// equals readFileNum
401+
totalBytes := 2 * (msgSize + 4)
402+
bytesRemaining := 2048 - (totalBytes + 8)
403+
oneByteMsgSizeIncrease := 5
404+
dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease))
405+
dq.Put(make([]byte, 1))
398406

399407
for i := 0; i < 10; i++ {
400408
// test that write position and messages reset when a new file is created
409+
// test the writeFileNum correctly increments
401410
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
402-
if d.depth == 2 &&
411+
if d.depth == 3 &&
403412
d.writeBytes == 3020 &&
404413
d.readFileNum == 0 &&
405414
d.writeFileNum == 1 &&
@@ -417,11 +426,13 @@ completeWriteFile:
417426
completeReadFile:
418427
dq.Put(msg)
419428

429+
<-dq.ReadChan()
420430
<-dq.ReadChan()
421431
<-dq.ReadChan()
422432

423433
for i := 0; i < 10; i++ {
424434
// test that read position and messages reset when a file is completely read
435+
// test the readFileNum correctly increments
425436
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
426437
t.Logf("Write bytes: %d", d.writeBytes)
427438
if d.depth == 1 &&
@@ -433,6 +444,64 @@ completeReadFile:
433444
d.readMessages == 0 &&
434445
d.writeMessages == 1 {
435446
// success
447+
goto completeWriteFileAgain
448+
}
449+
time.Sleep(100 * time.Millisecond)
450+
}
451+
panic("fail")
452+
453+
completeWriteFileAgain:
454+
// make writeFileNum ahead of readFileNum
455+
dq.Put(msg)
456+
dq.Put(msg)
457+
458+
// meet the file size limit exactly (2048 bytes) when writeFileNum
459+
// is ahead of readFileNum
460+
dq.Put(msg)
461+
dq.Put(msg)
462+
dq.Put(make([]byte, bytesRemaining-4-oneByteMsgSizeIncrease))
463+
dq.Put(make([]byte, 1))
464+
465+
for i := 0; i < 10; i++ {
466+
// test that write position and messages reset when a file is completely read
467+
// test the writeFileNum correctly increments
468+
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
469+
if d.depth == 7 &&
470+
d.readFileNum == 1 &&
471+
d.writeFileNum == 3 &&
472+
d.readPos == 0 &&
473+
d.writePos == 0 &&
474+
d.readMessages == 0 &&
475+
d.writeMessages == 0 {
476+
// success
477+
goto completeReadFileAgain
478+
}
479+
time.Sleep(100 * time.Millisecond)
480+
}
481+
panic("fail")
482+
483+
completeReadFileAgain:
484+
<-dq.ReadChan()
485+
<-dq.ReadChan()
486+
<-dq.ReadChan()
487+
488+
<-dq.ReadChan()
489+
<-dq.ReadChan()
490+
<-dq.ReadChan()
491+
<-dq.ReadChan()
492+
493+
for i := 0; i < 10; i++ {
494+
// test that read position and messages reset when a file is completely read
495+
// test the readFileNum correctly increments
496+
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
497+
if d.depth == 0 &&
498+
d.readFileNum == 3 &&
499+
d.writeFileNum == 3 &&
500+
d.readPos == 0 &&
501+
d.writePos == 0 &&
502+
d.readMessages == 0 &&
503+
d.writeMessages == 0 {
504+
// success
436505
goto done
437506
}
438507
time.Sleep(100 * time.Millisecond)

0 commit comments

Comments
 (0)