Skip to content

Add retention time to keep data during the last X hours. #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: RetentionTime
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions diskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"path"
"regexp"
"sync"
"syscall"
"time"
)

Expand Down Expand Up @@ -58,6 +59,8 @@ type Interface interface {
Depth() int64
Empty() error
TotalBytesFolderSize() int64
setRetentionTime(time.Duration) // this is used for testing only
getCreateTime(int64) (time.Time, error) // this is used for testing only
}

// diskQueue implements a filesystem backed FIFO queue
Expand All @@ -80,6 +83,7 @@ type diskQueue struct {
name string
dataPath string
maxBytesDiskSpace int64
retentionTime time.Duration
maxBytesPerFile int64 // cannot change once created
maxBytesPerFileRead int64
minMsgSize int32
Expand Down Expand Up @@ -124,7 +128,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {

return NewWithDiskSpace(name, dataPath,
0, maxBytesPerFile,
0, time.Second*0, maxBytesPerFile,
minMsgSize, maxMsgSize,
syncEvery, syncTimeout, logf)
}
Expand All @@ -133,7 +137,7 @@ func New(name string, dataPath string, maxBytesPerFile int64,
// If user is not using Disk Space Limit feature, maxBytesDiskSpace will
// be 0
func NewWithDiskSpace(name string, dataPath string,
maxBytesDiskSpace int64, maxBytesPerFile int64,
maxBytesDiskSpace int64, retentionTime time.Duration, maxBytesPerFile int64,
minMsgSize int32, maxMsgSize int32,
syncEvery int64, syncTimeout time.Duration, logf AppLogFunc) Interface {
enableDiskLimitation := true
Expand All @@ -144,6 +148,7 @@ func NewWithDiskSpace(name string, dataPath string,
name: name,
dataPath: dataPath,
maxBytesDiskSpace: maxBytesDiskSpace,
retentionTime: retentionTime,
maxBytesPerFile: maxBytesPerFile,
minMsgSize: minMsgSize,
maxMsgSize: maxMsgSize,
Expand Down Expand Up @@ -349,6 +354,16 @@ func (d *diskQueue) skipToNextRWFile() error {
return err
}

// getCreateTime reports when the backend file for readFileNum was created.
// This is used for testing only.
//
// NOTE(review): the previous implementation read syscall.Stat_t.Ctim, which
// only compiles on Linux (the field is Ctimespec on Darwin and Stat_t does
// not exist on Windows) and is the inode *change* time, not a creation time.
// os.FileInfo.ModTime is portable and, for queue segment files that are
// written once and then only read, a close proxy for creation time.
func (d *diskQueue) getCreateTime(readFileNum int64) (time.Time, error) {
	info, err := os.Stat(d.fileName(readFileNum))
	if err != nil {
		// Zero time on failure; callers must check err before using the value.
		return time.Time{}, err
	}
	return info.ModTime(), nil
}

// readOne performs a low level filesystem read for a single []byte
// while advancing read positions and rolling files, if necessary
func (d *diskQueue) readOne() ([]byte, error) {
Expand Down Expand Up @@ -586,7 +601,14 @@ func (d *diskQueue) freeDiskSpace(expectedBytesIncrease int64) error {
d.removeBadFile(badFileInfo)
}
for d.readFileNum <= d.writeFileNum {
if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace {
outdated := false
createTime, err := d.getCreateTime(d.readFileNum)
if err == nil {
if time.Since(createTime) > d.retentionTime {
outdated = true
}
}
if d.totalDiskSpaceUsed+expectedBytesIncrease <= d.maxBytesDiskSpace && !outdated {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The outdated files are deleted when diskqueue reaches its size limit. In other words, they stay and will still be read as long as the total size is within the limit. In this case there seems to be not much improvement over setting the total size limit alone.

I think it makes more sense (if a retention time is set) that expired files would not even be read and be deleted as soon as they expire. so that total size limit and the time limit both serves to reduce space usage and which ever comes first would take effect.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 I was wondering if you could check retention size:

  • within the ioloop inside the select using a time.NewTicker() similar to syncTicker. The ticker will start if it is null and a log file exists, for the duration of retentionTime - (currentTime - oldestFileCreateTime). When the ticker goes off it will iterate starting from the oldest file and delete outdated files.
  • before reading a log from a file in readOne()

This way, as Leon mentioned above, the retention size and retention time both serve to reduce space usage, and whichever occurs first would take effect.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to use this now and go with the current approach I would suggest either:

  • use a name other than retentionTime, because it's only enforcing retention time in the size limit exceeding case;
  • or, keep the retentionTime name but add more comments/descriptions and a TODO as a reminder to implement the other half (enforcing retention time under the size limit).

return nil
}
// delete the read file (make space)
Expand Down Expand Up @@ -975,6 +997,11 @@ func (d *diskQueue) handleReadError() {
d.checkTailCorruption(d.depth)
}

// setRetentionTime overrides the queue's retention window.
// This is only used for testing.
func (d *diskQueue) setRetentionTime(retentionTime time.Duration) {
	// The parameter was previously named "time", shadowing the time package.
	d.retentionTime = retentionTime
}

// ioLoop provides the backend for exposing a go channel (via ReadChan())
// in support of multiple concurrent queue consumers
//
Expand Down
110 changes: 106 additions & 4 deletions diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"time"
)

const (
largeTimePeriod = time.Hour * 1000
)

func Equal(t *testing.T, expected, actual interface{}) {
if !reflect.DeepEqual(expected, actual) {
_, file, line, _ := runtime.Caller(1)
Expand Down Expand Up @@ -363,7 +367,7 @@ func TestDiskQueueSyncAfterReadWithDiskSizeImplementation(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
dq := NewWithDiskSpace(dqName, tmpDir, 6040, largeTimePeriod, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
defer dq.Close()

msgSize := 1000
Expand Down Expand Up @@ -557,7 +561,7 @@ func TestDiskSizeImplementationDiskSizeLimit(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := NewWithDiskSpace(dqName, tmpDir, 6040, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
dq := NewWithDiskSpace(dqName, tmpDir, 6040, largeTimePeriod, 1<<11, 0, 1<<10, 2500, 50*time.Millisecond, l)
defer dq.Close()

msgSize := 1000
Expand Down Expand Up @@ -633,6 +637,104 @@ surpassDiskSizeLimit:
done:
}

// TestDiskSizeRetentionTimeZero verifies that with a zero retention time every
// existing file is considered outdated, so the next Put that triggers the disk
// space check drains the queue entirely.
func TestDiskSizeRetentionTimeZero(t *testing.T) {
	l := NewTestLogger(t)
	dqName := "test_disk_queue_retention_time_zero" + strconv.Itoa(int(time.Now().Unix()))
	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)

	msgSize := 100
	numMsg := 3
	// One message per file, plus per-file bookkeeping overhead.
	maxBytesPerFile := int64(msgSize + 4 + numFileMsgBytes)
	maxBytesDiskSpace := int64(maxBytesPerFile*int64(numMsg) + maxMetaDataFileSize)
	dq := NewWithDiskSpace(dqName, tmpDir, maxBytesDiskSpace, largeTimePeriod, maxBytesPerFile, 0, int32(msgSize), 1, 50*time.Millisecond, l)
	defer dq.Close()

	msg := make([]byte, msgSize)

	for i := 0; i < numMsg; i++ {
		if err := dq.Put(msg); err != nil {
			t.Fatal(err)
		}
	}
	if dq.Depth() != int64(numMsg) {
		t.Fatalf("expected depth %d, got %d", numMsg, dq.Depth())
	}

	// With a zero retention window every file is immediately outdated; the
	// next Put frees all of them.
	dq.setRetentionTime(time.Second * 0)
	dq.Put(msg)
	if dq.Depth() != 0 {
		t.Fatalf("expected depth 0, got %d", dq.Depth())
	}
}

// TestDiskSizeRetentionTimeWithMultipleFiles verifies that only files older
// than the retention window are reclaimed when the disk space limit is hit:
// with one message per file, expiring files 0 and 1 (but not 2) leaves two
// messages after the triggering Put.
func TestDiskSizeRetentionTimeWithMultipleFiles(t *testing.T) {
	l := NewTestLogger(t)
	dqName := "test_disk_queue_retention_time_with_multiple_files" + strconv.Itoa(int(time.Now().Unix()))
	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)

	msgSize := 100
	numMsg := 3
	// One message per file, plus per-file bookkeeping overhead.
	maxBytesPerFile := int64(msgSize + 4 + numFileMsgBytes)
	maxBytesDiskSpace := int64(maxBytesPerFile*int64(numMsg) + maxMetaDataFileSize)
	dq := NewWithDiskSpace(dqName, tmpDir, maxBytesDiskSpace, largeTimePeriod, maxBytesPerFile, 0, int32(msgSize), 1, 50*time.Millisecond, l)
	defer dq.Close()

	msg := make([]byte, msgSize)
	for i := 0; i < numMsg; i++ {
		if err := dq.Put(msg); err != nil {
			t.Fatal(err)
		}
		// Space out the file creation times so a retention cutoff can fall
		// between them.
		time.Sleep(time.Millisecond * 50)
	}

	// Pick a retention time slightly older than file 1, so files 0 and 1
	// are expired while file 2 survives.
	t1, err := dq.getCreateTime(1)
	if err != nil {
		t.Fatal(err)
	}
	retentionTime := time.Since(t1.Add(time.Microsecond * 1))
	dq.setRetentionTime(retentionTime)

	dq.Put(msg)
	if dq.Depth() != 2 {
		t.Fatalf("expected depth 2, got %d", dq.Depth())
	}
}

// TestDiskSizeRetentionTimeWithSingleFile verifies behavior when all messages
// share a single file: a Put that trips the disk space limit frees the full
// read file, leaving only the newly written message.
func TestDiskSizeRetentionTimeWithSingleFile(t *testing.T) {
	l := NewTestLogger(t)
	dqName := "test_disk_queue_retention_time_with_single_file" + strconv.Itoa(int(time.Now().Unix()))
	tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(tmpDir)

	msgSize := 100
	numMsg := 3
	// All messages fit into a single file; the disk limit only allows one file.
	maxBytesPerFile := int64((msgSize+4)*numMsg + numFileMsgBytes)
	maxBytesDiskSpace := int64(maxBytesPerFile+maxMetaDataFileSize) + 1
	dq := NewWithDiskSpace(dqName, tmpDir, maxBytesDiskSpace, largeTimePeriod, maxBytesPerFile, 0, int32(msgSize), 1, 50*time.Millisecond, l)
	defer dq.Close()

	msg := make([]byte, msgSize)
	for i := 0; i < numMsg; i++ {
		if err := dq.Put(msg); err != nil {
			t.Fatal(err)
		}
		time.Sleep(time.Millisecond * 50)
	}
	// NOTE(review): the original re-checked the stale err from TempDir here;
	// that dead check has been removed.
	dq.setRetentionTime(largeTimePeriod)

	dq.Put(msg)
	if dq.Depth() != 1 {
		t.Fatalf("expected depth 1, got %d", dq.Depth())
	}
}

func TestDiskSizeImplementationMsgSizeGreaterThanFileSize(t *testing.T) {
// write three files

Expand All @@ -643,7 +745,7 @@ func TestDiskSizeImplementationMsgSizeGreaterThanFileSize(t *testing.T) {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 0, 1<<12, 2500, 50*time.Millisecond, l)
dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, largeTimePeriod, 1<<10, 0, 1<<12, 2500, 50*time.Millisecond, l)
defer dq.Close()

msgSize := 1000
Expand Down Expand Up @@ -767,7 +869,7 @@ func TestDiskSizeImplementationWithBadFiles(t *testing.T) {
panic("fail")
}

dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, 1<<10, 10, 1600, 2500, 50*time.Millisecond, l)
dq := NewWithDiskSpace(dqName, tmpDir, 1<<12, largeTimePeriod, 1<<10, 10, 1600, 2500, 50*time.Millisecond, l)
defer dq.Close()

msgSize := 1000
Expand Down