
Commit 10ad04d

Test old block compaction. Bunch of sorting, infinite-loop, and other fixes.
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent 852f415 commit 10ad04d

3 files changed, +82 −51 lines changed

pkg/ingester/ingester_v2.go (0 additions, 1 deletion)

```diff
@@ -132,7 +132,6 @@ func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
 	if err := json.Unmarshal(b, &shipperMeta); err != nil {
 		return nil, errors.Wrap(err, "unmarshal shipper meta file to json")
 	}
-
 	return shipperMeta.Uploaded, nil
 }
 
```

pkg/ingester/ingester_v2_backfill.go (43 additions, 34 deletions)

```diff
@@ -176,7 +176,7 @@ func (i *Ingester) getOrCreateBackfillTSDB(userBuckets *tsdbBuckets, userID stri
 	}
 	userBuckets.buckets = append(userBuckets.buckets, bucket)
 	sort.Slice(userBuckets.buckets, func(i, j int) bool {
-		return userBuckets.buckets[i].bucketStart < userBuckets.buckets[i].bucketEnd
+		return userBuckets.buckets[i].bucketStart < userBuckets.buckets[j].bucketStart
 	})
 
 	return bucket, nil
```
```diff
@@ -286,7 +286,7 @@ func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
 	for _, buckets := range i.TSDBState.backfillDBs.tsdbs {
 		buckets.Lock()
 		sort.Slice(buckets.buckets, func(i, j int) bool {
-			return buckets.buckets[i].bucketStart < buckets.buckets[i].bucketEnd
+			return buckets.buckets[i].bucketStart < buckets.buckets[j].bucketStart
 		})
 		buckets.Unlock()
 	}
```
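Both hunks above fix the same copy-pasted comparator bug: the `less` function compared a bucket's own `bucketStart` against its own `bucketEnd`, which is always true for a well-formed bucket, so `sort.Slice` received a constant comparator and left the slice in arbitrary order instead of sorted by start time. A minimal standalone sketch (with a simplified, hypothetical version of the bucket type) of the wrong and fixed comparators:

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-in for the ingester's tsdbBucket type.
type tsdbBucket struct {
	bucketStart, bucketEnd int64
}

func main() {
	buckets := []*tsdbBucket{{30, 40}, {10, 20}, {20, 30}}

	// Buggy comparator: index i is used on both sides, so this reads
	// "start < end" for a single bucket and is always true. sort.Slice
	// gets no usable ordering and the result is arbitrary.
	sort.Slice(buckets, func(i, j int) bool {
		return buckets[i].bucketStart < buckets[i].bucketEnd
	})

	// Fixed comparator: order buckets by their start timestamp.
	sort.Slice(buckets, func(i, j int) bool {
		return buckets[i].bucketStart < buckets[j].bucketStart
	})

	for _, b := range buckets {
		fmt.Println(b.bucketStart, b.bucketEnd) // 10 20, 20 30, 30 40
	}
}
```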
```diff
@@ -494,63 +494,72 @@ func (i *Ingester) closeOldBackfillTSDBsAndDelete(gracePeriod int64) error {
 }
 
 func (i *Ingester) runOnBucketsBefore(deleteBucket bool, gracePeriodFunc func(t int64) int64, f func(db *userTSDB) error) error {
-	i.TSDBState.backfillDBs.closeAndShipMtx.Lock()
-	defer i.TSDBState.backfillDBs.closeAndShipMtx.Unlock()
+	i.TSDBState.backfillDBs.compactShipDeleteMtx.Lock()
+	defer i.TSDBState.backfillDBs.compactShipDeleteMtx.Unlock()
 
 	type tempType struct {
-		userID  string
-		buckets *tsdbBuckets
+		userID     string
+		cutoffTime int64
+		buckets    *tsdbBuckets
 	}
 
 	var usersHavingOldTSDBs []tempType
 
 	// Collecting users who have old TSDBs.
 	i.TSDBState.backfillDBs.tsdbsMtx.RLock()
 	for userID, userBuckets := range i.TSDBState.backfillDBs.tsdbs {
+		cutoffTime := int64(0)
+		i.userStatesMtx.RLock()
+		mainDB := i.TSDBState.dbs[userID]
+		i.userStatesMtx.RUnlock()
 		userBuckets.RLock()
+		if mainDB != nil {
+			cutoffTime = gracePeriodFunc(mainDB.Head().MaxTime())
+		} else {
+			// There is no main TSDB. So use the maxt of the last bucket.
+			cutoffTime = gracePeriodFunc(userBuckets.buckets[len(userBuckets.buckets)-1].db.Head().MaxTime())
+		}
 		for _, bucket := range userBuckets.buckets {
-			if bucket.bucketEnd < gracePeriodFunc(bucket.db.Head().MaxTime()) {
-				usersHavingOldTSDBs = append(usersHavingOldTSDBs, tempType{
-					userID:  userID,
-					buckets: userBuckets,
-				})
+			if bucket.bucketEnd >= cutoffTime {
 				break
 			}
+			usersHavingOldTSDBs = append(usersHavingOldTSDBs, tempType{
+				userID:     userID,
+				cutoffTime: cutoffTime,
+				buckets:    userBuckets,
+			})
 		}
 		userBuckets.RUnlock()
 	}
 	i.TSDBState.backfillDBs.tsdbsMtx.RUnlock()
 
 	var merr tsdb_errors.MultiError
 	for _, user := range usersHavingOldTSDBs {
+		idx := 0
 		for {
-			user.buckets.Lock()
-			if len(user.buckets.buckets) == 0 {
-				user.buckets.Unlock()
+			user.buckets.RLock()
+			if len(user.buckets.buckets) == 0 || idx == len(user.buckets.buckets) {
+				user.buckets.RUnlock()
 				break
 			}
-			bucket := user.buckets.buckets[0]
-			if bucket.bucketEnd >= gracePeriodFunc(bucket.db.Head().MaxTime()) {
-				user.buckets.Unlock()
+			bucket := user.buckets.buckets[idx]
+			if bucket.bucketEnd >= user.cutoffTime {
+				user.buckets.RUnlock()
 				break
 			}
-			if deleteBucket {
-				user.buckets.buckets = user.buckets.buckets[1:]
-			}
-			user.buckets.Unlock()
+			user.buckets.RUnlock()
 
 			if err := f(bucket.db); err != nil {
 				merr.Add(err)
-				if deleteBucket {
-					// Restore back the old slice to attempt running later.
-					// As the slice would be small and the failure would be rare,
-					// we are not caring about efficient slice management.
-					user.buckets.Lock()
-					user.buckets.buckets = append([]*tsdbBucket{bucket}, user.buckets.buckets...)
-					user.buckets.Unlock()
-				}
 				break
 			}
+			idx++
+			if deleteBucket {
+				user.buckets.Lock()
+				user.buckets.buckets = user.buckets.buckets[1:]
+				user.buckets.Unlock()
+				idx--
+			}
 		}
 
 		if deleteBucket {
```
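This hunk carries the infinite-loop fix named in the commit message: the old loop always inspected `buckets[0]` and only shrank the slice when `deleteBucket` was true, so in the keep-the-bucket path a successfully processed bucket would be handed to `f` again on every iteration. The rewrite computes a per-user `cutoffTime` once (from the main TSDB's head max time, or the newest bucket's head when no main TSDB exists) and walks the slice with an explicit cursor that always makes progress. A minimal standalone sketch of the cursor pattern, with hypothetical simplified types in place of the ingester's:

```go
package main

import "fmt"

type bucket struct{ start, end int64 }

// processOldBuckets sketches the fixed iteration in runOnBucketsBefore
// (hypothetical names, no locking): the cursor advances past buckets that
// are processed but kept, and stays put when the head bucket is deleted,
// so the loop terminates instead of re-processing buckets[0] forever.
func processOldBuckets(buckets []*bucket, cutoff int64, deleteBucket bool, f func(*bucket) error) []*bucket {
	idx := 0
	for idx < len(buckets) {
		b := buckets[idx]
		if b.end >= cutoff {
			break // this bucket and all later ones are too new
		}
		if err := f(b); err != nil {
			break // leave the bucket in place; retry on the next run
		}
		if deleteBucket {
			buckets = buckets[1:] // drop the head; idx already points at the next bucket
		} else {
			idx++ // keep the bucket but move past it
		}
	}
	return buckets
}

func main() {
	bs := []*bucket{{0, 10}, {10, 20}, {20, 30}}
	bs = processOldBuckets(bs, 15, false, func(b *bucket) error {
		fmt.Println("processed", b.start, b.end) // runs once for {0, 10}, then stops
		return nil
	})
	fmt.Println("remaining:", len(bs)) // 3: nothing deleted in keep mode
}
```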
```diff
@@ -570,9 +579,9 @@ func (i *Ingester) runOnBucketsBefore(deleteBucket bool, gracePeriodFunc func(t
 
 // Assumes 1h bucket range for . TODO(codesome): protect stuff with locks.
 type backfillTSDBs struct {
-	tsdbsMtx        sync.RWMutex
-	closeAndShipMtx sync.Mutex
-	tsdbs           map[string]*tsdbBuckets
+	tsdbsMtx             sync.RWMutex
+	compactShipDeleteMtx sync.Mutex
+	tsdbs                map[string]*tsdbBuckets
 }
 
 func newBackfillTSDBs() *backfillTSDBs {
```
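The rename from `closeAndShipMtx` to `compactShipDeleteMtx` presumably reflects the lock's widened role: the same mutex now serializes compaction, shipping, and deletion of backfill TSDBs, where the old name only advertised the close-and-ship path.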
```diff
@@ -620,8 +629,8 @@ func overlapsOpenInterval(mint1, maxt1, mint2, maxt2 int64) bool {
 // getBucketName returns the string representation of the bucket.
 // YYYY_MM_DD_HH_YYYY_MM_DD_HH
 func getBucketName(start, end int64) string {
-	startTime := model.Time(start).Time()
-	endTime := model.Time(end).Time()
+	startTime := model.Time(start).Time().UTC()
+	endTime := model.Time(end).Time().UTC()
 
 	return fmt.Sprintf(
 		"%04d_%02d_%02d_%02d_%04d_%02d_%02d_%02d",
```

pkg/ingester/ingester_v2_test.go (39 additions, 16 deletions)

```diff
@@ -1821,6 +1821,17 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
 		}
 	}
 
+	testQuery := func() {
+		res, err := i.v2Query(ctx, &client.QueryRequest{
+			StartTimestampMs: math.MinInt64,
+			EndTimestampMs:   math.MaxInt64,
+			Matchers:         []*client.LabelMatcher{{Type: client.REGEX_MATCH, Name: labels.MetricName, Value: ".*"}},
+		})
+		require.NoError(t, err)
+		require.NotNil(t, res)
+		assert.Equal(t, expectedIngested, res.Timeseries)
+	}
+
 	// Samples for 100h. The main tsdb will be able to handle samples only
 	// upto 99h after this.
 	ts := 100 * time.Hour.Milliseconds()
```
```diff
@@ -1863,14 +1874,7 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
 	ingestSample(ts, 4, true)
 
 	// Query back all the samples.
-	res, err := i.v2Query(ctx, &client.QueryRequest{
-		StartTimestampMs: math.MinInt64,
-		EndTimestampMs:   math.MaxInt64,
-		Matchers:         []*client.LabelMatcher{{Type: client.REGEX_MATCH, Name: labels.MetricName, Value: ".*"}},
-	})
-	require.NoError(t, err)
-	require.NotNil(t, res)
-	assert.Equal(t, expectedIngested, res.Timeseries)
+	testQuery()
 
 	// Restart to check if we can still query backfill TSDBs.
 
```
```diff
@@ -1890,12 +1894,31 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
 	})
 
 	// Query back all the samples.
-	res, err = i.v2Query(ctx, &client.QueryRequest{
-		StartTimestampMs: math.MinInt64,
-		EndTimestampMs:   math.MaxInt64,
-		Matchers:         []*client.LabelMatcher{{Type: client.REGEX_MATCH, Name: labels.MetricName, Value: ".*"}},
-	})
-	require.NoError(t, err)
-	require.NotNil(t, res)
-	assert.Equal(t, expectedIngested, res.Timeseries)
+	testQuery()
+
+	// Compact old blocks partially to check
+	// * Compaction is happening properly.
+	// * We can still query it.
+
+	// Check there are no blocks yet.
+	userBuckets := i.TSDBState.backfillDBs.getBucketsForUser(userID)
+	for _, bucket := range userBuckets.buckets {
+		require.Equal(t, 0, len(bucket.db.Blocks()))
+		m := &shipperMock{}
+		m.On("Sync", mock.Anything).Return(0, nil)
+		bucket.db.shipper = m
+	}
+
+	// Compacting the oldest 2 buckets. They are <=97h, so compacting buckets upto 97.5h (current 100h minus 2.5h).
+	require.NoError(t, i.compactOldBackfillTSDBsAndShip(2*time.Hour.Milliseconds()+30*time.Minute.Milliseconds()))
+	for idx, bucket := range userBuckets.buckets {
+		if idx < 2 {
+			require.Equal(t, 1, len(bucket.db.Blocks()), "bucket index %d", idx)
+		} else {
+			require.Equal(t, 0, len(bucket.db.Blocks()), "bucket index %d", idx)
+		}
+	}
+
+	// We can still query compacted blocks.
+	testQuery()
 }
```
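The grace-period arithmetic behind the new assertions, spelled out: the head sits at 100h, the call passes a 2.5h grace period, so the cutoff lands at 97.5h, and only buckets ending strictly below it (per the test's comment, the two oldest, ending at or before 97h) are compacted and shipped. A throwaway check of that arithmetic, assuming `gracePeriodFunc` subtracts the grace period from the head's max time:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	head := 100 * time.Hour.Milliseconds()                              // head max time in the test
	grace := 2*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds() // grace period passed to compactOldBackfillTSDBsAndShip
	cutoff := head - grace

	// Buckets with bucketEnd < cutoff are compacted; a bucket ending at
	// 97h qualifies (97h < 97.5h), one ending at 98h does not.
	fmt.Println(cutoff == 97*time.Hour.Milliseconds()+30*time.Minute.Milliseconds()) // true: cutoff is 97.5h
}
```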
