Commit 4610fe5
Fix compaction/shipping/deletion. Add test case for shipping and deleting blocks.
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent: 10ad04d

File tree

2 files changed: +63 -12 lines changed

pkg/ingester/ingester_v2_backfill.go
pkg/ingester/ingester_v2_test.go


pkg/ingester/ingester_v2_backfill.go

Lines changed: 9 additions & 6 deletions
@@ -467,16 +467,22 @@ func (i *Ingester) closeOldBackfillTSDBsAndDelete(gracePeriod int64) error {
             return nowTimeMs - gracePeriod
         },
         func(db *userTSDB) error {
+            // Compact the head if anything is left. Empty head will create no blocks.
+            h := db.Head()
+            if err := db.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime())); err != nil {
+                return errors.Wrap(err, "compact head")
+            }
+
             // TODO(codesome): check for double closing.
             if err := db.Close(); err != nil {
                 return errors.Wrap(err, "close backfill TSDB")
             }
 
             unshippedBlocks, err := db.getUnshippedBlocksULID()
-            if err != nil {
+            if err != nil && errors.Cause(err) == os.ErrNotExist {
                 return errors.Wrap(err, "get unshipped blocks")
             }
-            if len(unshippedBlocks) > 0 {
+            if err != nil || len(unshippedBlocks) > 0 {
                 // Ship the unshipped blocks.
                 uploaded, err := db.shipper.Sync(context.Background())
                 if err != nil {
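Note on the error handling above: getUnshippedBlocksULID itself is not part of this diff, but the special-casing of os.ErrNotExist fits a helper that reads the Thanos shipper meta file (shipper.MetaFilename, which the new test's mock writes) and diffs its Uploaded list against the blocks on disk; that file only exists after the first successful Sync. A minimal sketch of that idea follows; the standalone function name, its inputs, and its error handling are assumptions, not the actual helper.

package backfillsketch

import (
	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/tsdb"
	"github.com/thanos-io/thanos/pkg/shipper"
)

// unshippedBlockULIDs is a hypothetical stand-in for getUnshippedBlocksULID:
// it returns the ULIDs of blocks that exist on disk but are not yet recorded
// in the shipper meta file under dir. ReadMetaFile fails when the meta file
// has not been written yet, which is the situation the caller special-cases
// via os.ErrNotExist.
func unshippedBlockULIDs(dir string, blocks []*tsdb.Block) ([]ulid.ULID, error) {
	meta, err := shipper.ReadMetaFile(dir)
	if err != nil {
		return nil, err
	}

	// Index the ULIDs the shipper has already uploaded.
	shipped := make(map[ulid.ULID]struct{}, len(meta.Uploaded))
	for _, id := range meta.Uploaded {
		shipped[id] = struct{}{}
	}

	// Anything on disk that is not in the meta file still needs shipping.
	var unshipped []ulid.ULID
	for _, b := range blocks {
		if _, ok := shipped[b.Meta().ULID]; !ok {
			unshipped = append(unshipped, b.Meta().ULID)
		}
	}
	return unshipped, nil
}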
@@ -519,10 +525,7 @@ func (i *Ingester) runOnBucketsBefore(deleteBucket bool, gracePeriodFunc func(t
             // There is no main TSDB. So use the maxt of the last bucket.
             cutoffTime = gracePeriodFunc(userBuckets.buckets[len(userBuckets.buckets)-1].db.Head().MaxTime())
         }
-        for _, bucket := range userBuckets.buckets {
-            if bucket.bucketEnd >= cutoffTime {
-                break
-            }
+        if userBuckets.buckets[0].bucketEnd < cutoffTime {
             usersHavingOldTSDBs = append(usersHavingOldTSDBs, tempType{
                 userID:     userID,
                 cutoffTime: cutoffTime,
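The runOnBucketsBefore change relies on the buckets slice being kept oldest-first by bucketEnd: the first bucket alone decides whether a user has any backfill TSDB older than the cutoff, and the user is now queued once instead of once per old bucket. A toy, self-contained illustration of that difference; the bucket type and values here are invented for the example.

package main

import "fmt"

// bucket is a stand-in for the ingester's tsdbBucket, keeping only the field
// the cutoff check cares about.
type bucket struct{ bucketEnd int64 }

func main() {
	// Buckets are assumed to be ordered oldest-first by bucketEnd (hours here).
	buckets := []bucket{{bucketEnd: 97}, {bucketEnd: 98}, {bucketEnd: 99}}
	cutoff := int64(99)

	// Removed loop: walked the slice and would have appended once per old bucket.
	oldStyle := 0
	for _, b := range buckets {
		if b.bucketEnd >= cutoff {
			break
		}
		oldStyle++
	}

	// New check: the oldest bucket alone decides, and the user is appended once.
	newStyle := 0
	if buckets[0].bucketEnd < cutoff {
		newStyle = 1
	}

	fmt.Println(oldStyle, newStyle) // 2 1
}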

pkg/ingester/ingester_v2_test.go

Lines changed: 54 additions & 6 deletions
@@ -2,6 +2,7 @@ package ingester
 
 import (
     "context"
+    "encoding/json"
     "fmt"
     "io"
     "io/ioutil"
@@ -26,6 +27,7 @@ import (
     "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/mock"
     "github.com/stretchr/testify/require"
+    "github.com/thanos-io/thanos/pkg/shipper"
     "github.com/weaveworks/common/httpgrpc"
     "github.com/weaveworks/common/middleware"
     "github.com/weaveworks/common/user"
@@ -1294,6 +1296,8 @@ func TestIngester_shipBlocks(t *testing.T) {
 }
 
 type shipperMock struct {
+    db       *userTSDB
+    uploaded int
     mock.Mock
 }
 
@@ -1769,7 +1773,7 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) {
     require.Nil(t, db)
 }
 
-func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
+func TestIngesterV2BackfillCycle(t *testing.T) {
     cfg := defaultIngesterTestConfig()
     cfg.LifecyclerConfig.JoinAfter = 0
     backfillLimit := 12 * time.Hour
@@ -1898,18 +1902,36 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
 
     // Compact old blocks partially to check
     // * Compaction is happening properly.
-    // * We can still query it.
+    // * We can still query it while we have a mix of compacted and uncompacted TSDBs.
 
-    // Check there are no blocks yet.
+    // Check there are no blocks yet and attach and mock shipper.
     userBuckets := i.TSDBState.backfillDBs.getBucketsForUser(userID)
     for _, bucket := range userBuckets.buckets {
         require.Equal(t, 0, len(bucket.db.Blocks()))
-        m := &shipperMock{}
-        m.On("Sync", mock.Anything).Return(0, nil)
+
+        m := &shipperMock{
+            db: bucket.db,
+        }
+        m.On("Sync", mock.Anything).Run(func(args mock.Arguments) {
+            var shipperMeta shipper.Meta
+            shipperMeta.Version = shipper.MetaVersion1
+            for _, block := range m.db.Blocks() {
+                shipperMeta.Uploaded = append(shipperMeta.Uploaded, block.Meta().ULID)
+            }
+
+            b, err := json.Marshal(&shipperMeta)
+            if err != nil {
+                return
+            }
+
+            path := filepath.Join(m.db.Dir(), shipper.MetaFilename)
+            _ = ioutil.WriteFile(path, b, os.ModePerm)
+            m.uploaded = len(shipperMeta.Uploaded)
+        }).Return(1, nil)
         bucket.db.shipper = m
     }
 
-    // Compacting the oldest 2 buckets. They are <=97h, so compacting buckets upto 97.5h (current 100h minus 2.5h).
+    // Compacting the oldest 2 buckets. They are <=97h, so compacting buckets upto 97.5h (current 100h minus 97.5h is the grace period).
     require.NoError(t, i.compactOldBackfillTSDBsAndShip(2*time.Hour.Milliseconds()+30*time.Minute.Milliseconds()))
     for idx, bucket := range userBuckets.buckets {
         if idx < 2 {
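shipperMock's Sync method is defined elsewhere in this test file and is unchanged by this commit; for the On/Run/Return chain above to take effect, it presumably forwards to the testify mock roughly like this (a sketch, not the file's actual code):

// Sketch of how shipperMock.Sync likely forwards to the testify mock: the Run
// hook registered in the test fires first (writing the shipper meta file and
// recording m.uploaded), then the recorded Return values (1, nil) are handed
// back to the caller.
func (m *shipperMock) Sync(ctx context.Context) (int, error) {
	args := m.Called(ctx)
	return args.Int(0), args.Error(1)
}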
@@ -1921,4 +1943,30 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
 
     // We can still query compacted blocks.
     testQuery()
+
+    copiedBuckets := append([]*tsdbBucket{}, userBuckets.buckets...)
+
+    // Closing the old TSDBs and deleting them. Starting with the shipped blocks.
+    // Closing is based on current time. Hence grace period is w.r.t. current time.
+    nowTimeMs := time.Now().Unix() * 1000
+    gracePeriod := nowTimeMs - (97*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds())
+    require.NoError(t, i.closeOldBackfillTSDBsAndDelete(gracePeriod))
+    require.Equal(t, 2, len(userBuckets.buckets))
+    for idx, bucket := range userBuckets.buckets {
+        require.Equal(t, 0, len(bucket.db.Blocks()), "bucket index %d", idx)
+    }
+
+    // Delete the pending TSDBs.
+    gracePeriod = nowTimeMs - (100 * time.Hour.Milliseconds())
+    require.NoError(t, i.closeOldBackfillTSDBsAndDelete(gracePeriod))
+    require.Equal(t, 0, len(userBuckets.buckets))
+
+    // Check from the copied buckets if all of them had at 1 shipped block.
+    // The last 2 buckets were not compacted explicitly before.
+    for idx, buk := range copiedBuckets {
+        s, ok := buk.db.shipper.(*shipperMock)
+        require.True(t, ok)
+        require.Equal(t, 1, s.uploaded, "uploaded - bucket index %d", idx)
+        require.Equal(t, 1, len(buk.db.Blocks()), "blocks - bucket index %d", idx)
+    }
 }
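The grace-period arguments at the end of the test mirror the cutoff computation from ingester_v2_backfill.go (cutoffTime = nowTimeMs - gracePeriod): passing nowTimeMs minus 97.5h puts the cutoff at the 97.5h mark of the test's timeline, and nowTimeMs minus 100h at the 100h mark. A quick standalone check of that arithmetic, using only the same duration helpers as the test:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the test: the argument handed to closeOldBackfillTSDBsAndDelete
	// is "now minus the desired cutoff", because the callback in the first
	// file of this commit turns it back into a cutoff via nowTimeMs - gracePeriod.
	nowTimeMs := time.Now().Unix() * 1000
	gracePeriod := nowTimeMs - (97*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds())
	cutoffTime := nowTimeMs - gracePeriod

	fmt.Println(time.Duration(cutoffTime) * time.Millisecond) // 97h30m0s
}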
