Skip to content

Commit 0c456b4

Browse files
authored
Merge branch 'master' into DiskSizeLimit
2 parents a821855 + 5c1f588 commit 0c456b4

File tree

2 files changed

+30
-6
lines changed

2 files changed

+30
-6
lines changed

diskqueue.go

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type diskQueue struct {
7979
// instantiation time metadata
8080
name string
8181
dataPath string
82-
maxBytesDiskSpace int64
82+
maxBytesDiskSize int64
8383
maxBytesPerFile int64 // cannot change once created
8484
maxBytesPerFileRead int64
8585
minMsgSize int32
@@ -94,6 +94,10 @@ type diskQueue struct {
9494
nextReadPos int64
9595
nextReadFileNum int64
9696

97+
// keep track of the msg size we have read
98+
// (but not yet sent over readChan)
99+
readMsgSize int32
100+
97101
readFile *os.File
98102
writeFile *os.File
99103
reader *bufio.Reader
@@ -123,17 +127,17 @@ func New(name string, dataPath string, maxBytesPerFile int64,
123127
minMsgSize int32, maxMsgSize int32,
124128
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
125129

126-
return NewWithDiskSpace(name, dataPath,
130+
return NewWithDiskSize(name, dataPath,
127131
0, maxBytesPerFile,
128132
minMsgSize, maxMsgSize,
129133
syncEvery, syncTimeout, logf)
130134
}
131135

132-
// Another constructor that allows users to use Disk Space Limit feature
133-
// If user is not using Disk Space Limit feature, maxBytesDiskSpace will
136+
// Another constructor that allows users to use Disk Size Limit feature
137+
// If user is not using Disk Size Limit feature, maxBytesDiskSize will
134138
// be 0
135-
func NewWithDiskSpace(name string, dataPath string,
136-
maxBytesDiskSpace int64, maxBytesPerFile int64,
139+
func NewWithDiskSize(name string, dataPath string,
140+
maxBytesDiskSize int64, maxBytesPerFile int64,
137141
minMsgSize int32, maxMsgSize int32,
138142
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
139143
enableDiskLimitation := true
@@ -967,6 +971,24 @@ func (d *diskQueue) handleReadError() {
967971
d.name, badFn, badRenameFn)
968972
}
969973

974+
if d.enableDiskLimitation {
975+
var badFileSize int64
976+
if d.readFileNum == d.writeFileNum {
977+
badFileSize = d.writeBytes
978+
} else {
979+
var stat os.FileInfo
980+
stat, err = os.Stat(badRenameFn)
981+
if err == nil {
982+
badFileSize = stat.Size()
983+
} else {
984+
// max file size
985+
badFileSize = int64(d.maxMsgSize) + d.maxBytesPerFile + 4 + numFileMsgsBytes
986+
}
987+
}
988+
989+
d.writeBytes -= badFileSize
990+
}
991+
970992
d.readFileNum++
971993
d.readPos = 0
972994
d.nextReadFileNum = d.readFileNum

diskqueue_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ completeWriteFileAgain:
499499
// test the writeFileNum correctly increments
500500
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
501501
if d.depth == 7 &&
502+
d.writeBytes == 5068 &&
502503
d.readFileNum == 1 &&
503504
d.writeFileNum == 3 &&
504505
d.readMessages == 0 &&
@@ -532,6 +533,7 @@ completeReadFileAgain:
532533
// test the readFileNum correctly increments
533534
d := readMetaDataFile(dq.(*diskQueue).metaDataFileName(), 0, true)
534535
if d.depth == 0 &&
536+
d.writeBytes == 0 &&
535537
d.readFileNum == 3 &&
536538
d.writeFileNum == 3 &&
537539
d.readMessages == 0 &&

0 commit comments

Comments
 (0)