Skip to content

Commit 9e3b572

Browse files
author
Mikhail Faraponov
committed
Fixed issue with peekChan
1 parent 441bc51 commit 9e3b572

File tree

2 files changed

+23
-20
lines changed

2 files changed

+23
-20
lines changed

diskqueue.go

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,7 @@ func NewWithDiskSpace(name string, dataPath string,
152152
minMsgSize: minMsgSize,
153153
maxMsgSize: maxMsgSize,
154154
readChan: make(chan []byte),
155+
peekChan: make(chan []byte),
155156
depthChan: make(chan int64),
156157
writeChan: make(chan []byte),
157158
writeResponseChan: make(chan error),
@@ -648,23 +649,6 @@ func (d *diskQueue) writeOne(data []byte) error {
648649
return fmt.Errorf("invalid message write size (%d) minMsgSize=%d maxMsgSize=%d", dataLen, d.minMsgSize, d.maxMsgSize)
649650
}
650651

651-
if d.enableDiskLimitation {
652-
expectedBytesIncrease := totalBytes
653-
// check if we will reach or surpass file size limit
654-
if d.writePos+totalBytes+numFileMsgBytes >= d.maxBytesPerFile {
655-
reachedFileSizeLimit = true
656-
expectedBytesIncrease += numFileMsgBytes
657-
}
658-
659-
// free disk space if needed
660-
err = d.checkDiskSpace(expectedBytesIncrease)
661-
if err != nil {
662-
return err
663-
}
664-
} else if d.writePos+totalBytes >= d.maxBytesPerFile {
665-
reachedFileSizeLimit = true
666-
}
667-
668652
// will not wrap-around if maxBytesPerFile + maxMsgSize < Int64Max
669653
if d.writePos > 0 && d.writePos+totalBytes > d.maxBytesPerFile {
670654
if d.readFileNum == d.writeFileNum {
@@ -685,6 +669,7 @@ func (d *diskQueue) writeOne(data []byte) error {
685669
d.writeFile = nil
686670
}
687671
}
672+
688673
if d.writeFile == nil {
689674
curFileName := d.fileName(d.writeFileNum)
690675
d.writeFile, err = os.OpenFile(curFileName, os.O_RDWR|os.O_CREATE, 0600)
@@ -704,6 +689,23 @@ func (d *diskQueue) writeOne(data []byte) error {
704689
}
705690
}
706691

692+
if d.enableDiskLimitation {
693+
expectedBytesIncrease := totalBytes
694+
// check if we will reach or surpass file size limit
695+
if d.writePos+totalBytes+numFileMsgBytes >= d.maxBytesPerFile {
696+
reachedFileSizeLimit = true
697+
expectedBytesIncrease += numFileMsgBytes
698+
}
699+
700+
// free disk space if needed
701+
err = d.checkDiskSpace(expectedBytesIncrease)
702+
if err != nil {
703+
return err
704+
}
705+
} else if d.writePos+totalBytes >= d.maxBytesPerFile {
706+
reachedFileSizeLimit = true
707+
}
708+
707709
d.writeBuf.Reset()
708710
err = binary.Write(&d.writeBuf, binary.BigEndian, dataLen)
709711
if err != nil {

diskqueue_test.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ func TestDiskQueuePeek(t *testing.T) {
149149
t.Run("roll", func(t *testing.T) {
150150
for i := 0; i < 10; i++ {
151151
err := dq.Put(msg)
152+
152153
Nil(t, err)
153154
Equal(t, int64(i+1), dq.Depth())
154155
}
@@ -327,7 +328,7 @@ func TestDiskQueueCorruption(t *testing.T) {
327328
Equal(t, msg, <-dq.ReadChan())
328329
badFilesCount = numberOfBadFiles(dqName, tmpDir)
329330
if badFilesCount != 2 {
330-
panic("fail")
331+
panic(badFilesCount)
331332
}
332333

333334
// write a corrupt (len 0) message at the 5th (current) file
@@ -339,12 +340,12 @@ func TestDiskQueueCorruption(t *testing.T) {
339340

340341
Equal(t, msg, <-dq.ReadChan())
341342

342-
// conflict was here
343+
// conflict was here
343344
badFilesCount = numberOfBadFiles(dqName, tmpDir)
344345
if badFilesCount != 3 {
345346
panic("fail")
346347
}
347-
//
348+
//
348349

349350
dq.Put(msg)
350351
dq.Put(msg)

0 commit comments

Comments
 (0)