From f3b34f8baa14e501b1579b0aca6f671f29b27ce6 Mon Sep 17 00:00:00 2001 From: CatherineF-dev <78218824+CatherineF-dev@users.noreply.github.com> Date: Mon, 9 Aug 2021 13:51:16 -0400 Subject: [PATCH] Add retention time to keep data during the last X hours. Add retention time to keep data during the last X hours. --- diskqueue.go | 33 ++++++++++++-- diskqueue_test.go | 110 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 136 insertions(+), 7 deletions(-) diff --git a/diskqueue.go b/diskqueue.go index 8a401a3..c54e3e5 100644 --- a/diskqueue.go +++ b/diskqueue.go @@ -13,6 +13,7 @@ import ( "path" "regexp" "sync" + "syscall" "time" ) @@ -58,6 +59,8 @@ type Interface interface { Depth() int64 Empty() error TotalBytesFolderSize() int64 + setRetentionTime(time.Duration) // this is used for testing only + getCreateTime(int64) (time.Time, error) // this is used for testing only } // diskQueue implements a filesystem backed FIFO queue @@ -80,6 +83,7 @@ type diskQueue struct { name string dataPath string maxBytesDiskSpace int64 + retentionTime time.Duration maxBytesPerFile int64 // cannot change once created maxBytesPerFileRead int64 minMsgSize int32 @@ -124,7 +128,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { return NewWithDiskSpace(name, dataPath, - 0, maxBytesPerFile, + 0, time.Second*0, maxBytesPerFile, minMsgSize, maxMsgSize, syncEvery, syncTimeout, logf) } @@ -133,7 +137,7 @@ func New(name string, dataPath string, maxBytesPerFile int64, // If user is not using Disk Space Limit feature, maxBytesDiskSpace will // be 0 func NewWithDiskSpace(name string, dataPath string, - maxBytesDiskSpace int64, maxBytesPerFile int64, + maxBytesDiskSpace int64, retentionTime time.Duration, maxBytesPerFile int64, minMsgSize int32, maxMsgSize int32, syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface { enableDiskLimitation := true @@ -144,6 +148,7 @@ func NewWithDiskSpace(name string, dataPath string, name: name, dataPath: dataPath, maxBytesDiskSpace: maxBytesDiskSpace, + retentionTime: retentionTime, maxBytesPerFile: maxBytesPerFile, minMsgSize: minMsgSize, maxMsgSize: maxMsgSize, @@ -349,6 +354,16 @@ func (d *diskQueue) skipToNextRWFile() error { return err } +func (d *diskQueue) getCreateTime(readFileNum int64) (time.Time, error) { + info, err := os.Stat(d.fileName(readFileNum)) + if err != nil { + return time.Now(), err + } + stat := info.Sys().(*syscall.Stat_t) + createTime := time.Unix(int64(stat.Ctim.Sec), int64(stat.Ctim.Nsec)) + return createTime, nil +} + // readOne performs a low level filesystem read for a single []byte // while advancing read positions and rolling files, if necessary func (d *diskQueue) readOne() ([]byte, error) { @@ -586,7 +601,14 @@ func (d *diskQueue) freeDiskSpace(expectedBytesIncrease int64) error { d.removeBadFile(badFileInfo) } for d.readFileNum <= d.writeFileNum { - if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace { + outdated := false + createTime, err := d.getCreateTime(d.readFileNum) + if err == nil { + if time.Since(createTime) > d.retentionTime { + outdated = true + } + } + if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace && !outdated { return nil } // delete the read file (make space) @@ -975,6 +997,11 @@ func (d *diskQueue) handleReadError() { d.checkTailCorruption(d.depth) } +// This is only used for testing. +func (d *diskQueue) setRetentionTime(time time.Duration) { + d.retentionTime = time +} + // ioLoop provides the backend for exposing a go channel (via ReadChan()) // in support of multiple concurrent queue consumers // diff --git a/diskqueue_test.go b/diskqueue_test.go index 4cd5239..d0ea0b6 100644 --- a/diskqueue_test.go +++ b/diskqueue_test.go @@ -18,6 +18,10 @@ import ( "time" ) +const ( + largeTimePeriod = time.Hour * 1000 +) + func Equal(t *testing.T, expected, actual interface{}) { if !reflect.DeepEqual(expected, actual) { _, file, line, _ := runtime.Caller(1) @@ -363,7 +367,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 6040, largeTimePeriod, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 @@ -557,7 +561,7 @@ func TestDiskSizeImplementationDiskSizeLimit(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 6040, largeTimePeriod, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 @@ -633,6 +637,104 @@ surpassDiskSizeLimit: done: } +func TestDiskSizeRetentionTimeZero(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_retention_time_zero" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + msgSize := 100 + numMsg := 3 + maxBytesPerFile := int64(msgSize + 4 + numFileMsgBytes) + maxBytesDiskSpace := int64(maxBytesPerFile*int64(numMsg) + maxMetaDataFileSize) + dq := NewWithDiskSpace(dqName, tmpDir, maxBytesDiskSpace, largeTimePeriod, maxBytesPerFile, 0, int32(msgSize), 1, 50*time.Millisecond, l) + defer dq.Close() + + msg := make([]byte, msgSize) + + for i := 0; i < numMsg; i++ { + dq.Put(msg) + } + if dq.Depth() != int64(numMsg) { + panic("fail.") + } + dq.setRetentionTime(time.Second * 0) + dq.Put(msg) + if dq.Depth() != 0 { + panic("fail.") + } +} + +func TestDiskSizeRetentionTimeWithMultipleFiles(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_retention_time_with_multiple_files" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + msgSize := 100 + numMsg := 3 + maxBytesPerFile := int64(msgSize + 4 + numFileMsgBytes) + maxBytesDiskSpace := int64(maxBytesPerFile*int64(numMsg) + maxMetaDataFileSize) + dq := NewWithDiskSpace(dqName, tmpDir, maxBytesDiskSpace, largeTimePeriod, maxBytesPerFile, 0, int32(msgSize), 1, 50*time.Millisecond, l) + defer dq.Close() + + msg := make([]byte, msgSize) + for i := 0; i < numMsg; i++ { + dq.Put(msg) + time.Sleep(time.Millisecond * 50) + } + + t1, err := dq.getCreateTime(1) + if err != nil { + panic("failed") + } + retentionTime := time.Since(t1.Add(time.Microsecond * 1)) + dq.setRetentionTime(retentionTime) + + dq.Put(msg) + if dq.Depth() != 2 { + panic("fail.") + } +} + +func TestDiskSizeRetentionTimeWithSingleFile(t *testing.T) { + l := NewTestLogger(t) + dqName := "test_disk_queue_retention_time_with_single_file" + strconv.Itoa(int(time.Now().Unix())) + tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano())) + if err != nil { + panic(err) + } + defer os.RemoveAll(tmpDir) + + msgSize := 100 + numMsg := 3 + maxBytesPerFile := int64((msgSize+4)*numMsg + numFileMsgBytes) + maxBytesDiskSpace := int64(maxBytesPerFile+maxMetaDataFileSize) + 1 + dq := NewWithDiskSpace(dqName, tmpDir, maxBytesDiskSpace, largeTimePeriod, maxBytesPerFile, 0, int32(msgSize), 1, 50*time.Millisecond, l) + defer dq.Close() + + msg := make([]byte, msgSize) + for i := 0; i < numMsg; i++ { + dq.Put(msg) + time.Sleep(time.Millisecond * 50) + } + if err != nil { + panic("failed") + } + dq.setRetentionTime(largeTimePeriod) + + dq.Put(msg) + if dq.Depth() != 1 { + panic("fail.") + } +} + func TestDiskSizeImplementationMsgSizeGreaterThanFileSize(t *testing.T) { // write three files @@ -643,7 +745,7 @@ func TestDiskSizeImplementationMsgSizeGreaterThanFileSize(t *testing.T) { panic(err) } defer os.RemoveAll(tmpDir) - dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 0, 1<<12, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, largeTimePeriod, 1<<10, 0, 1<<12, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000 @@ -767,7 +869,7 @@ func TestDiskSizeImplementationWithBadFiles(t *testing.T) { panic("fail") } - dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 10, 1600, 2500, 50*time.Millisecond, l) + dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, largeTimePeriod, 1<<10, 10, 1600, 2500, 50*time.Millisecond, l) defer dq.Close() msgSize := 1000