Skip to content

Commit d53c545

Browse files
committed
Close and ship old backfill TSDBs
Signed-off-by: Ganesh Vernekar <[email protected]>
1 parent a1c3352 commit d53c545

File tree

3 files changed

+97
-4
lines changed

3 files changed

+97
-4
lines changed

pkg/ingester/ingester_v2.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1251,9 +1251,13 @@ func (i *Ingester) compactionLoop(ctx context.Context) error {
12511251
select {
12521252
case <-ticker.C:
12531253
i.compactBlocks(ctx, false)
1254+
if err := i.closeOldBackfillTSDBsAndShip(i.cfg.TSDBConfig.BackfillLimit.Milliseconds()); err != nil {
1255+
level.Warn(util.Logger).Log("msg", "failed to close old backfill TSDBs", "err", err)
1256+
}
12541257

12551258
case ch := <-i.TSDBState.forceCompactTrigger:
12561259
i.compactBlocks(ctx, true)
1260+
i.compactAllBackfillTSDBs(ctx)
12571261

12581262
// Notify back.
12591263
select {

pkg/ingester/ingester_v2_backfill.go

Lines changed: 92 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,11 +432,100 @@ sendLoop:
432432
wg.Wait()
433433
}
434434

435+
func (i *Ingester) closeOldBackfillTSDBsAndShip(gracePeriod int64) error {
436+
i.TSDBState.backfillDBs.closeAndShipMtx.Lock()
437+
defer i.TSDBState.backfillDBs.closeAndShipMtx.Unlock()
438+
439+
type tempType struct {
440+
userID string
441+
buckets *tsdbBuckets
442+
}
443+
444+
var usersHavingOldTSDBs []tempType
445+
446+
// Collecting users who have old TSDBs.
447+
i.TSDBState.backfillDBs.tsdbsMtx.RLock()
448+
for userID, userBuckets := range i.TSDBState.backfillDBs.tsdbs {
449+
userBuckets.RLock()
450+
for _, bucket := range userBuckets.buckets {
451+
if bucket.bucketEnd < bucket.db.Head().MaxTime()-gracePeriod {
452+
userBuckets.RUnlock()
453+
usersHavingOldTSDBs = append(usersHavingOldTSDBs, tempType{
454+
userID: userID,
455+
buckets: userBuckets,
456+
})
457+
break
458+
}
459+
}
460+
userBuckets.RUnlock()
461+
}
462+
i.TSDBState.backfillDBs.tsdbsMtx.RUnlock()
463+
464+
var merr tsdb_errors.MultiError
465+
for _, user := range usersHavingOldTSDBs {
466+
for {
467+
user.buckets.Lock()
468+
if len(user.buckets.buckets) == 0 {
469+
user.buckets.Unlock()
470+
break
471+
}
472+
bucket := user.buckets.buckets[0]
473+
if bucket.bucketEnd >= bucket.db.Head().MaxTime()-gracePeriod {
474+
user.buckets.Unlock()
475+
break
476+
}
477+
user.buckets.buckets = user.buckets.buckets[1:]
478+
user.buckets.Unlock()
479+
480+
// Compact the head first.
481+
db := bucket.db
482+
h := db.Head()
483+
if err := db.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime())); err != nil {
484+
merr.Add(errors.Wrap(err, "compact head"))
485+
// Compaction failed. Restore back the old slice to attempt shipping later.
486+
// As the slice would be small and the compaction failure would be rare,
487+
// we are not caring about efficient slice management.
488+
user.buckets.Lock()
489+
user.buckets.buckets = append([]*tsdbBucket{bucket}, user.buckets.buckets...)
490+
user.buckets.Unlock()
491+
break
492+
}
493+
494+
if db.shipper == nil {
495+
continue
496+
}
497+
// Ship the block.
498+
if uploaded, err := db.shipper.Sync(context.Background()); err != nil {
499+
merr.Add(errors.Wrap(err, "ship block"))
500+
// Shipping failed. Restore back the old slice to attempt shipping later.
501+
// As the slice would be small and the compaction failure would be rare,
502+
// we are not caring about efficient slice management.
503+
user.buckets.Lock()
504+
user.buckets.buckets = append([]*tsdbBucket{bucket}, user.buckets.buckets...)
505+
user.buckets.Unlock()
506+
} else {
507+
level.Debug(util.Logger).Log("msg", "shipper successfully synchronized backfill TSDB blocks with storage", "user", db.userID, "uploaded", uploaded, "bucket_dir", db.Dir())
508+
}
509+
}
510+
511+
user.buckets.Lock()
512+
i.TSDBState.backfillDBs.tsdbsMtx.Lock()
513+
if len(user.buckets.buckets) == 0 {
514+
// No backfill TSDBs left for the user.
515+
delete(i.TSDBState.backfillDBs.tsdbs, user.userID)
516+
}
517+
i.TSDBState.backfillDBs.tsdbsMtx.Unlock()
518+
user.buckets.Unlock()
519+
}
520+
521+
return merr.Err()
522+
}
523+
435524
// backfillTSDBs holds the open backfill TSDBs, keyed by user ID.
// Assumes 1h bucket range per bucket — TODO confirm; the original comment was
// truncated. TODO(codesome): protect stuff with locks.
type backfillTSDBs struct {
	// tsdbsMtx guards the tsdbs map itself; the bucket slices inside each
	// *tsdbBuckets carry their own locking.
	tsdbsMtx sync.RWMutex
	// closeAndShipMtx serializes close-and-ship passes over these TSDBs
	// (see closeOldBackfillTSDBsAndShip).
	closeAndShipMtx sync.Mutex
	tsdbs           map[string]*tsdbBuckets
}
441530

442531
func newBackfillTSDBs() *backfillTSDBs {

pkg/ingester/ingester_v2_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1803,7 +1803,7 @@ func TestIngesterV2BackfillPushAndQuery(t *testing.T) {
18031803
nil, client.API),
18041804
)
18051805

1806-
require.Equal(t, numBackfillTSDBs, len(i.TSDBState.backfillDBs.tsdbs[userID]))
1806+
require.Equal(t, numBackfillTSDBs, len(i.TSDBState.backfillDBs.tsdbs[userID].buckets))
18071807
if !errExpected {
18081808
require.NoError(t, err)
18091809
expectedIngested = append(expectedIngested, client.TimeSeries{

0 commit comments

Comments
 (0)