diff --git a/diskqueue.go b/diskqueue.go index 5821548..c5e6e3d 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -65,6 +65,7 @@ type diskQueue struct { writeFileNum int64 readMessages int64 writeMessages int64 + writeBytes int64 depth int64 sync.RWMutex @@ -87,6 +88,10 @@ type diskQueue struct { nextReadPos int64 nextReadFileNum int64 + // keep track of the msg size we have read + // (but not yet sent over readChan) + readMsgSize int32 + readFile *os.File writeFile *os.File reader *bufio.Reader @@ -299,8 +304,12 @@ func (d *diskQueue) skipToNextRWFile() error { d.nextReadFileNum = d.writeFileNum d.nextReadPos = 0 d.depth = 0 - d.readMessages = 0 - d.writeMessages = 0 + + if d.enableDiskLimitation { + d.writeBytes = 0 + d.readMessages = 0 + d.writeMessages = 0 + } return err } @@ -361,6 +370,8 @@ func (d *diskQueue) readOne() ([]byte, error) { return nil, fmt.Errorf("invalid message read size (%d)", msgSize) } + d.readMsgSize = msgSize + readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { @@ -462,6 +473,7 @@ func (d *diskQueue) writeOne(data []byte) error { if d.enableDiskLimitation { // save space for the number of messages in this file fileSize += numFileMsgsBytes + d.writeBytes += totalBytes d.writeMessages += 1 } @@ -472,7 +484,12 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFileNum++ d.writePos = 0 - d.writeMessages = 0 + + if d.enableDiskLimitation { + // add bytes for the number of messages in the file + d.writeBytes += numFileMsgsBytes + d.writeMessages = 0 + } // sync every time we start writing to a new file err = d.sync() @@ -523,10 +540,10 @@ func (d *diskQueue) retrieveMetaData() error { // if user is using disk size limit feature if d.enableDiskLimitation { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, - &d.writeFileNum, &d.writeMessages, &d.writePos) + &d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &d.depth, @@ -560,10 +577,10 @@ func (d *diskQueue) persistMetaData() error { // if user is using disk size limit feature if d.enableDiskLimitation { - _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, - d.writeFileNum, d.writeMessages, d.writePos) + d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos) } else { _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", d.depth, @@ -630,15 +647,20 @@ func (d *diskQueue) checkTailCorruption(depth int64) { } func (d *diskQueue) moveForward() { + // add bytes for the number of messages and the size of the message + readFileSize := int64(d.readMsgSize) + d.readPos + numFileMsgsBytes + 4 + oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos d.depth -= 1 - d.readMessages += 1 + + if d.enableDiskLimitation { + d.readMessages += 1 + } // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { - d.readMessages = 0 // sync every time we start reading from a new file d.needSync = true @@ -648,6 +670,11 @@ func (d *diskQueue) moveForward() { if err != nil { d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) } + + if d.enableDiskLimitation { + d.readMessages = 0 + d.writeBytes -= readFileSize + } } d.checkTailCorruption(d.depth) @@ -681,6 +708,24 @@ func (d *diskQueue) handleReadError() { d.name, badFn, badRenameFn) } + if d.enableDiskLimitation { + var badFileSize int64 + if d.readFileNum == d.writeFileNum { + badFileSize = d.writeBytes + } else { + var stat os.FileInfo + stat, err = os.Stat(badRenameFn) + if err == nil { + badFileSize = stat.Size() + } else { + // max file size + badFileSize = int64(d.maxMsgSize) + d.maxBytesPerFile + 4 + numFileMsgsBytes + } + } + + d.writeBytes -= badFileSize + } + d.readFileNum++ d.readPos = 0 d.nextReadFileNum = d.readFileNum diff --git a/diskqueue_test.go b/diskqueue_test.go index b4a712a..9e98eaa 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -251,6 +251,7 @@ func TestDiskQueueCorruption(t *testing.T) { type md struct { depth int64 + writeBytes int64 readFileNum int64 writeFileNum int64 readMessages int64 @@ -275,10 +276,10 @@ func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) m var ret md if enableDiskLimitation { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, - &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + &ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &ret.depth, @@ -358,12 +359,13 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 1004 && d.readFileNum == 0 && d.writeFileNum == 0 && - d.readPos == 0 && - d.writePos == 1004 && d.readMessages == 0 && - d.writeMessages == 1 { + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 1004 { // success goto next } @@ -378,12 +380,13 @@ next: for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 2008 && d.readFileNum == 0 && d.writeFileNum == 0 && - d.readPos == 1004 && - d.writePos == 2008 && d.readMessages == 1 && - d.writeMessages == 2 { + d.writeMessages == 2 && + d.readPos == 1004 && + d.writePos == 2008 { // success goto completeWriteFile } @@ -405,12 +408,13 @@ completeWriteFile: // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 3 && + d.writeBytes == 2048 && d.readFileNum == 0 && d.writeFileNum == 1 && - d.readPos == 1004 && - d.writePos == 0 && d.readMessages == 1 && - d.writeMessages == 0 { + d.writeMessages == 0 && + d.readPos == 1004 && + d.writePos == 0 { // success goto completeReadFile } @@ -430,12 +434,13 @@ completeReadFile: // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 1004 && d.readFileNum == 1 && d.writeFileNum == 1 && - d.readPos == 0 && - d.writePos == 1004 && d.readMessages == 0 && - d.writeMessages == 1 { + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 1004 { // success goto completeWriteFileAgain } @@ -460,12 +465,13 @@ completeWriteFileAgain: // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 7 && + d.writeBytes == 5068 && d.readFileNum == 1 && d.writeFileNum == 3 && - d.readPos == 0 && - d.writePos == 0 && d.readMessages == 0 && - d.writeMessages == 0 { + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 { // success goto completeReadFileAgain } @@ -488,12 +494,13 @@ completeReadFileAgain: // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 0 && + d.writeBytes == 0 && d.readFileNum == 3 && d.writeFileNum == 3 && - d.readPos == 0 && - d.writePos == 0 && d.readMessages == 0 && - d.writeMessages == 0 { + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 { // success goto done }