From 4f3dbc20b01f7c0164ca3c9256f10205011cf3c4 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 19:55:35 +0000 Subject: [PATCH 01/10] Remove readBytes and update testing. --- diskqueue.go | 9 ++++++--- diskqueue_test.go | 40 +++++++++++++++++++++------------------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 3aab8e5..6b8149c 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -302,9 +302,12 @@ func (d *diskQueue) skipToNextRWFile() error { d.nextReadFileNum = d.writeFileNum d.nextReadPos = 0 d.depth = 0 - d.readMessages = 0 - d.writeMessages = 0 - d.writeBytes = 0 + + if d.diskLimitFeatIsOn { + d.writeBytes = 0 + d.readMessages = 0 + d.writeMessages = 0 + } return err } diff --git a/diskqueue_test.go b/diskqueue_test.go index 8d4f78f..26af104 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -363,10 +363,10 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { d.writeBytes == 1004 && d.readFileNum == 0 && d.writeFileNum == 0 && - d.readPos == 0 && - d.writePos == 1004 && d.readMessages == 0 && - d.writeMessages == 1 { + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 1004 { // success goto next } @@ -384,10 +384,10 @@ next: d.writeBytes == 2008 && d.readFileNum == 0 && d.writeFileNum == 0 && - d.readPos == 1004 && - d.writePos == 2008 && d.readMessages == 1 && - d.writeMessages == 2 { + d.writeMessages == 2 && + d.readPos == 1004 && + d.writePos == 2008 { // success goto completeWriteFile } @@ -409,13 +409,13 @@ completeWriteFile: // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 3 && - d.writeBytes == 3020 && + d.writeBytes == 2048 && d.readFileNum == 0 && d.writeFileNum == 1 && - d.readPos == 1004 && - d.writePos == 0 && d.readMessages == 1 && - d.writeMessages == 0 { + d.writeMessages == 0 && + d.readPos == 1004 && + d.writePos == 0 { // success goto completeReadFile } @@ -439,10 +439,10 @@ completeReadFile: d.writeBytes == 1004 && d.readFileNum == 1 && d.writeFileNum == 1 && - d.readPos == 0 && - d.writePos == 1004 && d.readMessages == 0 && - d.writeMessages == 1 { + d.writeMessages == 1 && + d.readPos == 0 && + d.writePos == 1004 { // success goto completeWriteFileAgain } @@ -467,12 +467,13 @@ completeWriteFileAgain: // test the writeFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 7 && + d.writeBytes == 5068 && d.readFileNum == 1 && d.writeFileNum == 3 && - d.readPos == 0 && - d.writePos == 0 && d.readMessages == 0 && - d.writeMessages == 0 { + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 { // success goto completeReadFileAgain } @@ -495,12 +496,13 @@ completeReadFileAgain: // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 0 && + d.writeBytes == 0 && d.readFileNum == 3 && d.writeFileNum == 3 && - d.readPos == 0 && - d.writePos == 0 && d.readMessages == 0 && - d.writeMessages == 0 { + d.writeMessages == 0 && + d.readPos == 0 && + d.writePos == 0 { // success goto done } From 0ab12757ff20da364438b3d3b577cb87e5135bd2 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 2 Jun 2021 20:47:12 +0000 Subject: [PATCH 02/10] Add comment for readMsgSize. --- diskqueue.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 6b8149c..2cb4069 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -87,6 +87,10 @@ type diskQueue struct { nextReadPos int64 nextReadFileNum int64 + // keep track of the msg size we have read + // (but not yet sent over readChan) + readMsgSize int32 + readFile *os.File writeFile *os.File reader *bufio.Reader @@ -108,9 +112,6 @@ type diskQueue struct { // disk limit implementation flag diskLimitFeatIsOn bool - - // the size of the - readMsgSize int32 } // New instantiates an instance of diskQueue, retrieving metadata From 844e43bf80fb84bffb876423bcaeb8879f207b60 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Thu, 3 Jun 2021 15:00:43 +0000 Subject: [PATCH 03/10] Modify disk size limit features only if disk size limit feature is being used . --- diskqueue.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 2cb4069..8212306 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -649,11 +649,13 @@ func (d *diskQueue) moveForward() { d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos d.depth -= 1 - d.readMessages += 1 + + if d.diskLimitFeatIsOn { + d.readMessages += 1 + } // see if we need to clean up the old file if oldReadFileNum != d.nextReadFileNum { - d.readMessages = 0 // sync every time we start reading from a new file d.needSync = true @@ -664,7 +666,10 @@ func (d *diskQueue) moveForward() { d.logf(ERROR, "DISKQUEUE(%s) failed to Remove(%s) - %s", d.name, fn, err) } - d.writeBytes -= readFileLen + if d.diskLimitFeatIsOn { + d.readMessages = 0 + d.writeBytes -= readFileLen + } } d.checkTailCorruption(d.depth) From 8a5683962c6645a1f5c617258ca1097ed20073d8 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 9 Jun 2021 16:52:06 +0000 Subject: [PATCH 04/10] Reset write bytes during read error if read file is write file. --- diskqueue.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/diskqueue.go b/diskqueue.go index 8212306..b08da94 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -686,6 +686,11 @@ func (d *diskQueue) handleReadError() { } d.writeFileNum++ d.writePos = 0 + + if d.diskLimitFeatIsOn { + d.writeMessages = 0 + d.writeBytes = 0 + } } badFn := d.fileName(d.readFileNum) @@ -706,6 +711,9 @@ func (d *diskQueue) handleReadError() { d.readPos = 0 d.nextReadFileNum = d.readFileNum d.nextReadPos = 0 + if d.diskLimitFeatIsOn { + d.readMessages = 0 + } // significant state change, schedule a sync on the next iteration d.needSync = true From 8848880d6854bb55e8346cb89a6849347bec6179 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 9 Jun 2021 20:54:04 +0000 Subject: [PATCH 05/10] Track writeBytes and readMsgSize. --- diskqueue.go | 59 ++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index e981d3a..fbdbeab 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -65,6 +65,7 @@ type diskQueue struct { writeFileNum int64 readMessages int64 writeMessages int64 + writeBytes int64 depth int64 sync.RWMutex @@ -317,7 +318,6 @@ func (d *diskQueue) skipToNextRWFile() error { // while advancing read positions and rolling files, if necessary func (d *diskQueue) readOne() ([]byte, error) { var err error - var msgSize int32 if d.readFile == nil { curFileName := d.fileName(d.readFileNum) @@ -354,22 +354,22 @@ func (d *diskQueue) readOne() ([]byte, error) { d.reader = bufio.NewReader(d.readFile) } - err = binary.Read(d.reader, binary.BigEndian, &msgSize) + err = binary.Read(d.reader, binary.BigEndian, &d.readMsgSize) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } - if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { + if d.readMsgSize < d.minMsgSize || d.readMsgSize > d.maxMsgSize { // this file is corrupt and we have no reasonable guarantee on // where a new message should begin d.readFile.Close() d.readFile = nil - return nil, fmt.Errorf("invalid message read size (%d)", msgSize) + return nil, fmt.Errorf("invalid message read size (%d)", d.readMsgSize) } - readBuf := make([]byte, msgSize) + readBuf := make([]byte, d.readMsgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { d.readFile.Close() @@ -377,7 +377,7 @@ func (d *diskQueue) readOne() ([]byte, error) { return nil, err } - totalBytes := int64(4 + msgSize) + totalBytes := int64(4 + d.readMsgSize) // we only advance next* because we have not yet sent this to consumers // (where readFileNum, readPos will actually be advanced) @@ -470,6 +470,7 @@ func (d *diskQueue) writeOne(data []byte) error { if d.enableDiskLimitation { // save space for the number of messages in this file fileSize += numFileMsgsBytes + d.writeBytes += totalBytes d.writeMessages += 1 } @@ -480,7 +481,12 @@ func (d *diskQueue) writeOne(data []byte) error { d.writeFileNum++ d.writePos = 0 - d.writeMessages = 0 + + if d.enableDiskLimitation { + // add bytes for the number of messages in the file + d.writeBytes += numFileMsgsBytes + d.writeMessages = 0 + } // sync every time we start writing to a new file err = d.sync() @@ -531,10 +537,10 @@ func (d *diskQueue) retrieveMetaData() error { // if user is using disk size limit feature if d.enableDiskLimitation { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", &d.depth, &d.readFileNum, &d.readMessages, &d.readPos, - &d.writeFileNum, &d.writeMessages, &d.writePos) + &d.writeBytes, &d.writeFileNum, &d.writeMessages, &d.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &d.depth, @@ -568,10 +574,10 @@ func (d *diskQueue) persistMetaData() error { // if user is using disk size limit feature if d.enableDiskLimitation { - _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", d.depth, d.readFileNum, d.readMessages, d.readPos, - d.writeFileNum, d.writeMessages, d.writePos) + d.writeBytes, d.writeFileNum, d.writeMessages, d.writePos) } else { _, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n", d.depth, @@ -638,12 +644,15 @@ func (d *diskQueue) checkTailCorruption(depth int64) { } func (d *diskQueue) moveForward() { + // add bytes for the number of messages and the size of the message + readFileSize := int64(d.readMsgSize) + d.readPos + 12 + oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum d.readPos = d.nextReadPos d.depth -= 1 - if d.diskLimitFeatIsOn { + if d.enableDiskLimitation { d.readMessages += 1 } @@ -661,7 +670,7 @@ func (d *diskQueue) moveForward() { if d.enableDiskLimitation { d.readMessages = 0 - d.writeBytes -= readFileLen + d.writeBytes -= readFileSize } } @@ -679,11 +688,7 @@ func (d *diskQueue) handleReadError() { } d.writeFileNum++ d.writePos = 0 - - if d.enableDiskLimitation { - d.writeMessages = 0 - d.writeBytes = 0 - } + d.writeMessages = 0 } badFn := d.fileName(d.readFileNum) @@ -700,6 +705,24 @@ func (d *diskQueue) handleReadError() { d.name, badFn, badRenameFn) } + if d.enableDiskLimitation { + var badFileSize int64 + if d.readFileNum == d.writeFileNum { + badFileSize = d.writeBytes + } else { + var stat os.FileInfo + stat, err = os.Stat(badRenameFn) + if err == nil { + badFileSize = stat.Size() + } else { + // max file size + badFileSize = int64(d.maxMsgSize) + d.maxBytesPerFile + 4 + } + } + + d.writeBytes -= badFileSize + } + d.readFileNum++ d.readPos = 0 d.nextReadFileNum = d.readFileNum From b53b2639454ab5227ae4e8799134fd1b30754206 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 9 Jun 2021 20:54:45 +0000 Subject: [PATCH 06/10] Update test. --- diskqueue_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/diskqueue_test.go b/diskqueue_test.go index 325da2c..d08be9c 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -251,6 +251,7 @@ func TestDiskQueueCorruption(t *testing.T) { type md struct { depth int64 + writeBytes int64 readFileNum int64 writeFileNum int64 readMessages int64 @@ -275,10 +276,10 @@ func readMetaDataFile(fileName string, retried int, enableDiskLimitation bool) m var ret md if enableDiskLimitation { - _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n", + _, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d,%d\n", &ret.depth, &ret.readFileNum, &ret.readMessages, &ret.readPos, - &ret.writeFileNum, &ret.writeMessages, &ret.writePos) + &ret.writeBytes, &ret.writeFileNum, &ret.writeMessages, &ret.writePos) } else { _, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n", &ret.depth, From 7c5234e6c4833447dfa7e627e89813a1d84d7213 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Wed, 9 Jun 2021 20:59:18 +0000 Subject: [PATCH 07/10] Test writeBytes is accurate. --- diskqueue_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/diskqueue_test.go b/diskqueue_test.go index d08be9c..9e98eaa 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -359,6 +359,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 1004 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readMessages == 0 && @@ -379,6 +380,7 @@ next: for i := 0; i < 10; i++ { d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 2008 && d.readFileNum == 0 && d.writeFileNum == 0 && d.readMessages == 1 && @@ -432,6 +434,7 @@ completeReadFile: // test the readFileNum correctly increments d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true) if d.depth == 1 && + d.writeBytes == 1004 && d.readFileNum == 1 && d.writeFileNum == 1 && d.readMessages == 0 && From a042536d15da2fc9a7a047b64826b0d530529920 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Fri, 11 Jun 2021 20:43:23 +0000 Subject: [PATCH 08/10] Make code more readable. --- diskqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diskqueue.go b/diskqueue.go index fbdbeab..760f35b 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -645,7 +645,7 @@ func (d *diskQueue) checkTailCorruption(depth int64) { func (d *diskQueue) moveForward() { // add bytes for the number of messages and the size of the message - readFileSize := int64(d.readMsgSize) + d.readPos + 12 + readFileSize := int64(d.readMsgSize) + d.readPos + numFileMsgsBytes + 4 oldReadFileNum := d.readFileNum d.readFileNum = d.nextReadFileNum From 1d51b40079689bfcb90dd533ac18f735addf08c2 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Mon, 14 Jun 2021 13:23:08 +0000 Subject: [PATCH 09/10] Revert back to using msgSize instead of readMsgSize in order to make minimal changes. --- diskqueue.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 760f35b..9d259dd 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -318,6 +318,7 @@ func (d *diskQueue) skipToNextRWFile() error { // while advancing read positions and rolling files, if necessary func (d *diskQueue) readOne() ([]byte, error) { var err error + var msgSize int32 if d.readFile == nil { curFileName := d.fileName(d.readFileNum) @@ -354,22 +355,24 @@ func (d *diskQueue) readOne() ([]byte, error) { d.reader = bufio.NewReader(d.readFile) } - err = binary.Read(d.reader, binary.BigEndian, &d.readMsgSize) + err = binary.Read(d.reader, binary.BigEndian, &msgSize) if err != nil { d.readFile.Close() d.readFile = nil return nil, err } - if d.readMsgSize < d.minMsgSize || d.readMsgSize > d.maxMsgSize { + if msgSize < d.minMsgSize || msgSize > d.maxMsgSize { // this file is corrupt and we have no reasonable guarantee on // where a new message should begin d.readFile.Close() d.readFile = nil - return nil, fmt.Errorf("invalid message read size (%d)", d.readMsgSize) + return nil, fmt.Errorf("invalid message read size (%d)", msgSize) } - readBuf := make([]byte, d.readMsgSize) + d.readMsgSize = msgSize + + readBuf := make([]byte, msgSize) _, err = io.ReadFull(d.reader, readBuf) if err != nil { d.readFile.Close() @@ -377,7 +380,7 @@ func (d *diskQueue) readOne() ([]byte, error) { return nil, err } - totalBytes := int64(4 + d.readMsgSize) + totalBytes := int64(4 + msgSize) // we only advance next* because we have not yet sent this to consumers // (where readFileNum, readPos will actually be advanced) From 886b9ab7040b1eb5b46367b9dccf7a47ff0e34a6 Mon Sep 17 00:00:00 2001 From: Kevin Cam Date: Mon, 14 Jun 2021 13:27:40 +0000 Subject: [PATCH 10/10] Update max bad file size to include numFileMsgsBytes. --- diskqueue.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/diskqueue.go b/diskqueue.go index 9d259dd..c5e6e3d 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -719,7 +719,7 @@ func (d *diskQueue) handleReadError() { badFileSize = stat.Size() } else { // max file size - badFileSize = int64(d.maxMsgSize) + d.maxBytesPerFile + 4 + badFileSize = int64(d.maxMsgSize) + d.maxBytesPerFile + 4 + numFileMsgsBytes } }