@@ -553,7 +553,7 @@ func (d *diskQueue) getAllBadFileInfo() ([]os.FileInfo, error) {
553
553
}
554
554
555
555
// get the accurate total non-"bad" file size
556
- func (d * diskQueue ) updateTotalDiskSpaceUsed () error {
556
+ func (d * diskQueue ) updateTotalDiskSpaceUsed () {
557
557
d .totalDiskSpaceUsed = maxMetaDataFileSize
558
558
559
559
updateTotalDiskSpaceUsed := func (fileInfo os.FileInfo ) error {
@@ -565,7 +565,10 @@ func (d *diskQueue) updateTotalDiskSpaceUsed() error {
565
565
return nil
566
566
}
567
567
568
- return d .walkDiskQueueDir (updateTotalDiskSpaceUsed )
568
+ err := d .walkDiskQueueDir (updateTotalDiskSpaceUsed )
569
+ if err != nil {
570
+ d .logf (ERROR , "DISKQUEUE(%s) failed to update write bytes - %s" , d .name , err )
571
+ }
569
572
}
570
573
571
574
func (d * diskQueue ) freeDiskSpace (expectedBytesIncrease int64 ) error {
@@ -936,6 +939,11 @@ func (d *diskQueue) handleReadError() {
936
939
}
937
940
d .writeFileNum ++
938
941
d .writePos = 0
942
+
943
+ if d .enableDiskLimitation {
944
+ d .totalDiskSpaceUsed = 0
945
+ d .writeMessages = 0
946
+ }
939
947
}
940
948
941
949
badFn := d .fileName (d .readFileNum )
@@ -952,27 +960,18 @@ func (d *diskQueue) handleReadError() {
952
960
d .name , badFn , badRenameFn )
953
961
}
954
962
955
- if d .enableDiskLimitation {
956
- if d .readFileNum == d .writeFileNum {
957
- // we moved on to the next writeFile
958
- d .totalDiskSpaceUsed = 0
959
- d .writeMessages = 0
960
- } else {
961
- if err != nil {
962
- d .logf (ERROR , "DISKQUEUE(%s) failed to update write bytes - %s" , d .name , err )
963
- }
964
- }
965
-
966
- d .readMessages = 0
967
- }
968
-
969
963
d .readFileNum ++
970
964
d .readPos = 0
971
965
d .nextReadFileNum = d .readFileNum
972
966
d .nextReadPos = 0
967
+ if d .enableDiskLimitation {
968
+ d .readMessages = 0
969
+ }
973
970
974
971
// significant state change, schedule a sync on the next iteration
975
972
d .needSync = true
973
+
974
+ d .checkTailCorruption (d .depth )
976
975
}
977
976
978
977
// ioLoop provides the backend for exposing a go channel (via ReadChan())
0 commit comments