
Track the Number of Messages #1

Merged
merged 30 commits into from Jun 9, 2021

Commits (30)
5a4a043
Keep track of the number of messages in the writeFile and the number …
kev1n80 May 24, 2021
15339eb
Update tests to reflect changes made to the metadata file. Metadata f…
kev1n80 May 24, 2021
ddadcb9
Test that write messages resets when a new file is created.
kev1n80 May 24, 2021
609f5c9
Add a test that checks if meta data file is correct after writing a c…
kev1n80 May 24, 2021
cd45da2
Allocate 8 bytes instead of 4 since writeMessages is int64, and reset …
kev1n80 May 27, 2021
eeaa65a
Update comment about number of bytes reserved for writeMessages.
kev1n80 May 27, 2021
2496eff
Allow user to choose whether to use new feature or not.
kev1n80 May 27, 2021
b35c029
Separate testing from the original with the implementation of depth f…
kev1n80 May 27, 2021
5cdd0ae
Revert to original code style.
kev1n80 May 27, 2021
fcb31cc
Add comment for functions.
kev1n80 May 27, 2021
c90f17f
Explain that maxBytesDiskSpace is 0 when user is not using disk space…
kev1n80 May 27, 2021
a28e6d5
Add explicit disk size limit feature flag.
kev1n80 Jun 1, 2021
293a774
Add a new line after if block.
kev1n80 Jun 1, 2021
4c956aa
Track the disk space the files tracked by DiskQueue takes up, and tes…
kev1n80 Jun 1, 2021
48c7a85
Update totalBytes and writePos when writing number of messages, and r…
kev1n80 Jun 1, 2021
652b548
Remove the additional 8 bytes to totalBytes.
kev1n80 Jun 1, 2021
b07cec4
Revert "Remove the additional 8 bytes to totalBytes."
kev1n80 Jun 2, 2021
5968b13
Revert "Update totalBytes and writePos when writing number of message…
kev1n80 Jun 2, 2021
53c0e47
Revert changes that removed the additional 8 bytes.
kev1n80 Jun 2, 2021
a3ce86a
Add comment to make code more readable.
kev1n80 Jun 2, 2021
3ed7338
Add test that completes the write file by meeting the file size limit…
kev1n80 Jun 2, 2021
30c6ef6
Add extra testing to validate the increment/decrement of bytes in cor…
kev1n80 Jun 2, 2021
76a0ddd
Merge branch 'TrackDiskSize' into DepthImpl
kev1n80 Jun 2, 2021
e9b23ac
Update variable names.
kev1n80 Jun 2, 2021
7acba5f
Revert "Update variable names."
kev1n80 Jun 2, 2021
7032086
Merge branch 'DepthImpl' of https://github.com/kev1n80/go-diskqueue i…
kev1n80 Jun 2, 2021
92819e1
Revert "Merge branch 'TrackDiskSize' into DepthImpl"
kev1n80 Jun 2, 2021
2616c7e
Update variable names.
kev1n80 Jun 2, 2021
5cd6288
Replace 8 with a constant to improve readability.
kev1n80 Jun 3, 2021
24fe56a
Reset Diskqueue data during readError.
kev1n80 Jun 9, 2021
161 changes: 121 additions & 40 deletions diskqueue.go
@@ -19,11 +19,12 @@ import (
type LogLevel int

const (
DEBUG = LogLevel(1)
INFO = LogLevel(2)
WARN = LogLevel(3)
ERROR = LogLevel(4)
FATAL = LogLevel(5)
DEBUG = LogLevel(1)
INFO = LogLevel(2)
WARN = LogLevel(3)
ERROR = LogLevel(4)
FATAL = LogLevel(5)
numFileMsgsBytes = 8
)

type AppLogFunc func(lvl LogLevel, f string, args ...interface{})
@@ -58,17 +59,20 @@ type diskQueue struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms

// run-time state (also persisted to disk)
readPos int64
writePos int64
readFileNum int64
writeFileNum int64
depth int64
readPos int64
writePos int64
readFileNum int64
writeFileNum int64
readMessages int64
writeMessages int64
depth int64

sync.RWMutex

// instantiation time metadata
name string
dataPath string
maxBytesDiskSize int64
maxBytesPerFile int64 // cannot change once created
maxBytesPerFileRead int64
minMsgSize int32
@@ -101,40 +105,69 @@ type diskQueue struct {
exitSyncChan chan int

logf AppLogFunc

// disk limit implementation flag
enableDiskLimitation bool
}

// New instantiates an instance of diskQueue, retrieving metadata
// from the filesystem and starting the read ahead goroutine
func New(name string, dataPath string, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
d := diskQueue{
name: name,
dataPath: dataPath,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
emptyChan: make(chan int),
emptyResponseChan: make(chan error),
exitChan: make(chan int),
exitSyncChan: make(chan int),
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,

return NewWithDiskSize(name, dataPath,
0, maxBytesPerFile,
minMsgSize, maxMsgSize,
syncEvery, syncTimeout, logf)
}

// NewWithDiskSize is an alternative constructor that enables the disk size
// limit feature; when the feature is not in use, maxBytesDiskSize will be 0
func NewWithDiskSize(name string, dataPath string,
maxBytesDiskSize int64, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
enableDiskLimitation := true
if maxBytesDiskSize <= 0 {
maxBytesDiskSize = 0
enableDiskLimitation = false
}
d := diskQueue{
name: name,
dataPath: dataPath,
maxBytesDiskSize: maxBytesDiskSize,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
readChan: make(chan []byte),
depthChan: make(chan int64),
writeChan: make(chan []byte),
writeResponseChan: make(chan error),
emptyChan: make(chan int),
emptyResponseChan: make(chan error),
exitChan: make(chan int),
exitSyncChan: make(chan int),
syncEvery: syncEvery,
syncTimeout: syncTimeout,
logf: logf,
enableDiskLimitation: enableDiskLimitation,
}

d.start()
return &d
}

// Get the last known state of DiskQueue from metadata and start ioLoop
func (d *diskQueue) start() {
// no need to lock here, nothing else could possibly be touching this instance
err := d.retrieveMetaData()
if err != nil && !os.IsNotExist(err) {
d.logf(ERROR, "DISKQUEUE(%s) failed to retrieveMetaData - %s", d.name, err)
}

go d.ioLoop()
return &d
}

// Depth returns the depth of the queue
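
For orientation, a minimal usage sketch of the two constructors added above, assuming the signatures land as shown in this diff; the import path, queue names, paths, and limits below are illustrative, not taken from the PR:

package main

import (
	"fmt"
	"time"

	diskqueue "github.com/nsqio/go-diskqueue" // import path is illustrative
)

// logf adapts fmt.Printf to the AppLogFunc signature used by diskqueue.
func logf(lvl diskqueue.LogLevel, f string, args ...interface{}) {
	fmt.Printf(f+"\n", args...)
}

func main() {
	// original constructor: per-file size limit only, no total disk size limit
	dq := diskqueue.New("events", "/tmp/dq",
		1024*1024, 4, 1<<20,
		2500, 2*time.Second, logf)

	// new constructor: additionally cap total disk usage at roughly 10 MB
	dqLimited := diskqueue.NewWithDiskSize("events-limited", "/tmp/dq-limited",
		10*1024*1024, 1024*1024, 4, 1<<20,
		2500, 2*time.Second, logf)

	_ = dq
	_ = dqLimited
}

NewWithDiskSize behaves like New when maxBytesDiskSize is zero or negative, since the disk limitation flag is only enabled for a positive limit.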
@@ -266,6 +299,8 @@ func (d *diskQueue) skipToNextRWFile() error {
d.nextReadFileNum = d.writeFileNum
d.nextReadPos = 0
d.depth = 0
d.readMessages = 0
d.writeMessages = 0

return err
}
@@ -301,6 +336,10 @@ func (d *diskQueue) readOne() ([]byte, error) {
stat, err := d.readFile.Stat()
if err == nil {
d.maxBytesPerFileRead = stat.Size()
if d.enableDiskLimitation {
// last 8 bytes are reserved for the number of messages in this file
d.maxBytesPerFileRead -= numFileMsgsBytes
}
}
}
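
Note on the read path above: with the disk limit feature enabled, a completed data file ends with an 8-byte big-endian int64 recording how many messages the file holds, and maxBytesPerFileRead is shrunk by that amount so the reader never interprets the footer as message data. A standalone sketch of reading that footer, assuming the file was completed with the feature enabled (helper name and constant are illustrative, not part of the PR):

package diskqueueexample

import (
	"encoding/binary"
	"io"
	"os"
)

// mirrors the package's numFileMsgsBytes constant: an int64 message count
const msgCountFooterBytes = 8

// readMsgCountFooter returns the message count stored in the last 8 bytes of a
// completed diskqueue data file. Sketch only: assumes the disk limit feature was
// enabled and the file was rolled over, so the footer is present.
func readMsgCountFooter(path string) (int64, error) {
	f, err := os.Open(path)
	if err != nil {
		return 0, err
	}
	defer f.Close()

	stat, err := f.Stat()
	if err != nil {
		return 0, err
	}

	// seek to the start of the trailing 8-byte footer
	if _, err := f.Seek(stat.Size()-msgCountFooterBytes, io.SeekStart); err != nil {
		return 0, err
	}

	var msgCount int64
	// the footer is written with binary.Write(..., binary.BigEndian, ...)
	err = binary.Read(f, binary.BigEndian, &msgCount)
	return msgCount, err
}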

@@ -383,6 +422,8 @@ func (d *diskQueue) writeOne(data []byte) error {
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
}

// add all data to writeBuf before writing to the file
// so that either everything is written to the file or nothing is
d.writeBuf.Reset()
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
if err != nil {
@@ -394,6 +435,17 @@
return err
}

totalBytes := int64(4 + dataLen)

// check if we reached the file size limit with this message
if d.enableDiskLimitation && d.writePos+totalBytes+numFileMsgsBytes >= d.maxBytesPerFile {
// write number of messages in binary to file
err = binary.Write(&d.writeBuf, binary.BigEndian, d.writeMessages+1)
if err != nil {
return err
}
}

// only write to the file once
_, err = d.writeFile.Write(d.writeBuf.Bytes())
if err != nil {
@@ -402,17 +454,25 @@ func (d *diskQueue) writeOne(data []byte) error {
return err
}

totalBytes := int64(4 + dataLen)
d.writePos += totalBytes
d.depth += 1

if d.writePos >= d.maxBytesPerFile {
fileSize := d.writePos

if d.enableDiskLimitation {
// save space for the number of messages in this file
fileSize += numFileMsgsBytes
d.writeMessages += 1
}

if fileSize >= d.maxBytesPerFile {
if d.readFileNum == d.writeFileNum {
d.maxBytesPerFileRead = d.writePos
}

d.writeFileNum++
d.writePos = 0
d.writeMessages = 0

// sync every time we start writing to a new file
err = d.sync()
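
Note on the write path above: each message is written as a 4-byte big-endian length prefix followed by the payload, and when this message (plus the reserved 8-byte footer) would reach maxBytesPerFile, the running message count is appended to the same buffer so data and footer hit the file in a single Write. For example, with maxBytesPerFile = 1024, writePos = 1000, and a 20-byte payload, 1000 + 24 + 8 >= 1024, so the footer is written and the file rolls over. A simplified standalone sketch of that buffering step (helper and names are illustrative, not the PR's actual code):

package diskqueueexample

import (
	"bytes"
	"encoding/binary"
)

const msgCountFooterBytes = 8 // mirrors numFileMsgsBytes

// encodeMessage appends one length-prefixed message to buf and, if this write
// would complete the file, also appends the final message count so everything
// lands in the file with a single Write call.
func encodeMessage(buf *bytes.Buffer, data []byte,
	writePos, maxBytesPerFile, writeMessages int64) (completesFile bool, err error) {

	if err = binary.Write(buf, binary.BigEndian, int32(len(data))); err != nil {
		return false, err
	}
	if _, err = buf.Write(data); err != nil {
		return false, err
	}

	totalBytes := int64(4 + len(data))

	// reserve room for the trailing 8-byte footer when checking the file size limit
	if writePos+totalBytes+msgCountFooterBytes >= maxBytesPerFile {
		completesFile = true
		// footer: total number of messages in this file, including this one
		if err = binary.Write(buf, binary.BigEndian, writeMessages+1); err != nil {
			return false, err
		}
	}
	return completesFile, nil
}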
@@ -461,15 +521,23 @@ func (d *diskQueue) retrieveMetaData() error {
}
defer f.Close()

var depth int64
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&depth,
&d.readFileNum, &d.readPos,
&d.writeFileNum, &d.writePos)
// if user is using disk size limit feature
if d.enableDiskLimitation {
_, err = fmt.Fscanf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
&d.depth,
&d.readFileNum, &d.readMessages, &d.readPos,
&d.writeFileNum, &d.writeMessages, &d.writePos)
} else {
_, err = fmt.Fscanf(f, "%d\n%d,%d\n%d,%d\n",
&d.depth,
&d.readFileNum, &d.readPos,
&d.writeFileNum, &d.writePos)
}

if err != nil {
return err
}
d.depth = depth

d.nextReadFileNum = d.readFileNum
d.nextReadPos = d.readPos

@@ -490,10 +558,18 @@ func (d *diskQueue) persistMetaData() error {
return err
}

_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
d.depth,
d.readFileNum, d.readPos,
d.writeFileNum, d.writePos)
// if user is using disk size limit feature
if d.enableDiskLimitation {
_, err = fmt.Fprintf(f, "%d\n%d,%d,%d\n%d,%d,%d\n",
d.depth,
d.readFileNum, d.readMessages, d.readPos,
d.writeFileNum, d.writeMessages, d.writePos)
} else {
_, err = fmt.Fprintf(f, "%d\n%d,%d\n%d,%d\n",
d.depth,
d.readFileNum, d.readPos,
d.writeFileNum, d.writePos)
}
if err != nil {
f.Close()
return err
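
Note on the metadata format above: when the disk size limit feature is enabled, the metadata file carries a per-file message counter on both the read and write lines, so the layout becomes depth, then readFileNum,readMessages,readPos, then writeFileNum,writeMessages,writePos; without the feature, the original three-line layout is unchanged. A small sketch of parsing the extended layout with made-up numbers (the real code reads the same format from the metadata file with fmt.Fscanf):

package diskqueueexample

import "fmt"

// parseMetaDataWithDiskLimit parses the extended metadata layout used when the
// disk size limit feature is enabled. Sketch only; field names mirror diskQueue's.
func parseMetaDataWithDiskLimit(s string) (depth,
	readFileNum, readMessages, readPos,
	writeFileNum, writeMessages, writePos int64, err error) {

	_, err = fmt.Sscanf(s, "%d\n%d,%d,%d\n%d,%d,%d\n",
		&depth,
		&readFileNum, &readMessages, &readPos,
		&writeFileNum, &writeMessages, &writePos)
	return
}

// Example input (made-up numbers):
//
//	42
//	3,10,8192
//	5,7,4096
//
// meaning: depth 42; reading file 3 at offset 8192 with 10 messages already read
// from it; writing file 5 at offset 4096 with 7 messages written to it so far.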
@@ -558,9 +634,12 @@ func (d *diskQueue) moveForward() {
d.readFileNum = d.nextReadFileNum
d.readPos = d.nextReadPos
d.depth -= 1
d.readMessages += 1

// see if we need to clean up the old file
if oldReadFileNum != d.nextReadFileNum {
d.readMessages = 0

// sync every time we start reading from a new file
d.needSync = true

@@ -585,6 +664,7 @@ func (d *diskQueue) handleReadError() {
}
d.writeFileNum++
d.writePos = 0
d.writeMessages = 0
}

badFn := d.fileName(d.readFileNum)
@@ -605,6 +685,7 @@
d.readPos = 0
d.nextReadFileNum = d.readFileNum
d.nextReadPos = 0
d.readMessages = 0

// significant state change, schedule a sync on the next iteration
d.needSync = true