Skip to content

Commit 76a0ddd

Browse files
authored
Merge branch 'TrackDiskSize' into DepthImpl
2 parents 30c6ef6 + 4c956aa commit 76a0ddd

File tree

2 files changed

+34
-13
lines changed

2 files changed

+34
-13
lines changed

diskqueue.go

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type diskQueue struct {
6464
writeFileNum int64
6565
readMessages int64
6666
writeMessages int64
67+
writeBytes int64
6768
depth int64
6869

6970
sync.RWMutex
@@ -107,6 +108,9 @@ type diskQueue struct {
107108

108109
// disk limit implementation flag
109110
diskLimitFeatIsOn bool
111+
112+
// the size of the
113+
readMsgSize int32
110114
}
111115

112116
// New instantiates an instance of diskQueue, retrieving metadata
@@ -300,6 +304,7 @@ func (d *diskQueue) skipToNextRWFile() error {
300304
d.depth = 0
301305
d.readMessages = 0
302306
d.writeMessages = 0
307+
d.writeBytes = 0
303308

304309
return err
305310
}
@@ -308,7 +313,6 @@ func (d *diskQueue) skipToNextRWFile() error {
308313
// while advancing read positions and rolling files, if necessary
309314
func (d *diskQueue) readOne() ([]byte, error) {
310315
var err error
311-
var msgSize int32
312316

313317
if d.readFile == nil {
314318
curFileName := d.fileName(d.readFileNum)
@@ -345,30 +349,30 @@ func (d *diskQueue) readOne() ([]byte, error) {
345349
d.reader = bufio.NewReader(d.readFile)
346350
}
347351

348-
err = binary.Read(d.reader, binary.BigEndian, &msgSize)
352+
err = binary.Read(d.reader, binary.BigEndian, &d.readMsgSize)
349353
if err != nil {
350354
d.readFile.Close()
351355
d.readFile = nil
352356
return nil, err
353357
}
354358

355-
if msgSize < d.minMsgSize || msgSize > d.maxMsgSize {
359+
if d.readMsgSize < d.minMsgSize || d.readMsgSize > d.maxMsgSize {
356360
// this file is corrupt and we have no reasonable guarantee on
357361
// where a new message should begin
358362
d.readFile.Close()
359363
d.readFile = nil
360-
return nil, fmt.Errorf("invalid message read size (%d)", msgSize)
364+
return nil, fmt.Errorf("invalid message read size (%d)", d.readMsgSize)
361365
}
362366

363-
readBuf := make([]byte, msgSize)
367+
readBuf := make([]byte, d.readMsgSize)
364368
_, err = io.ReadFull(d.reader, readBuf)
365369
if err != nil {
366370
d.readFile.Close()
367371
d.readFile = nil
368372
return nil, err
369373
}
370374

371-
totalBytes := int64(4 + msgSize)
375+
totalBytes := int64(4 + d.readMsgSize)
372376

373377
// we only advance next* because we have not yet sent this to consumers
374378
// (where readFileNum, readPos will actually be advanced)
@@ -461,6 +465,7 @@ func (d *diskQueue) writeOne(data []byte) error {
461465
if d.diskLimitFeatIsOn {
462466
// save space for the number of messages in this file
463467
fileSize += 8
468+
d.writeBytes += totalBytes
464469
d.writeMessages += 1
465470
}
466471

@@ -471,7 +476,12 @@ func (d *diskQueue) writeOne(data []byte) error {
471476

472477
d.writeFileNum++
473478
d.writePos = 0
474-
d.writeMessages = 0
479+
480+
if d.diskLimitFeatIsOn {
481+
// add bytes for the number of messages in the file
482+
d.writeBytes += 8
483+
d.writeMessages = 0
484+
}
475485

476486
// sync every time we start writing to a new file
477487
err = d.sync()
@@ -522,10 +532,10 @@ func (d *diskQueue) retrieveMetaData() error {
522532

523533
// if user is using disk space limit feature
524534
if d.diskLimitFeatIsOn {
525-
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
535+
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
526536
&d.depth,
527537
&d.readFileNum, &d.readMessages, &d.readPos,
528-
&d.writeFileNum, &d.writeMessages, &d.writePos)
538+
&d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos)
529539
} else {
530540
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
531541
&d.depth,
@@ -559,10 +569,10 @@ func (d *diskQueue) persistMetaData() error {
559569

560570
// if user is using disk space limit feature
561571
if d.diskLimitFeatIsOn {
562-
_, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
572+
_, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
563573
d.depth,
564574
d.readFileNum, d.readMessages, d.readPos,
565-
d.writeFileNum, d.writeMessages, d.writePos)
575+
d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos)
566576
} else {
567577
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
568578
d.depth,
@@ -629,6 +639,8 @@ func (d *diskQueue) checkTailCorruption(depth int64) {
629639
}
630640

631641
func (d *diskQueue) moveForward() {
642+
// add bytes for the number of messages and the size of the message
643+
readFileLen := int64(d.readMsgSize) + d.readPos + 12
632644
oldReadFileNum := d.readFileNum
633645
d.readFileNum = d.nextReadFileNum
634646
d.readPos = d.nextReadPos
@@ -647,6 +659,8 @@ func (d *diskQueue) moveForward() {
647659
if err != nil {
648660
d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err)
649661
}
662+
663+
d.writeBytes -= readFileLen
650664
}
651665

652666
d.checkTailCorruption(d.depth)

diskqueue_test.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ func TestDiskQueueCorruption(t *testing.T) {
251251

252252
type md struct {
253253
depth int64
254+
writeBytes int64
254255
readFileNum int64
255256
writeFileNum int64
256257
readMessages int64
@@ -274,11 +275,12 @@ func readMetaDataFile(fileName string, retried int, diskLimitFeatIsOn bool) md {
274275
defer f.Close()
275276

276277
var ret md
278+
277279
if diskLimitFeatIsOn {
278-
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
280+
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n",
279281
&ret.depth,
280282
&ret.readFileNum, &ret.readMessages, &ret.readPos,
281-
&ret.writeFileNum, &ret.writeMessages, &ret.writePos)
283+
&ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos)
282284
} else {
283285
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
284286
&ret.depth,
@@ -358,6 +360,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) {
358360
for i := 0; i < 10; i++ {
359361
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
360362
if d.depth == 1 &&
363+
d.writeBytes == 1004 &&
361364
d.readFileNum == 0 &&
362365
d.writeFileNum == 0 &&
363366
d.readPos == 0 &&
@@ -378,6 +381,7 @@ next:
378381
for i := 0; i < 10; i++ {
379382
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
380383
if d.depth == 1 &&
384+
d.writeBytes == 2008 &&
381385
d.readFileNum == 0 &&
382386
d.writeFileNum == 0 &&
383387
d.readPos == 1004 &&
@@ -405,6 +409,7 @@ completeWriteFile:
405409
// test the writeFileNum correctly increments
406410
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
407411
if d.depth == 3 &&
412+
d.writeBytes == 3020 &&
408413
d.readFileNum == 0 &&
409414
d.writeFileNum == 1 &&
410415
d.readPos == 1004 &&
@@ -429,7 +434,9 @@ completeReadFile:
429434
// test that read position and messages reset when a file is completely read
430435
// test the readFileNum correctly increments
431436
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
437+
t.Logf("Write bytes: %d", d.writeBytes)
432438
if d.depth == 1 &&
439+
d.writeBytes == 1004 &&
433440
d.readFileNum == 1 &&
434441
d.writeFileNum == 1 &&
435442
d.readPos == 0 &&

0 commit comments

Comments
 (0)