diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 53e627059f..70bc8d27a9 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -481,6 +481,14 @@ blocks_storage: # CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown [flush_blocks_on_shutdown: | default = false] + # Local directory to store backfill TSDBs in the ingesters. + # CLI flag: -experimental.blocks-storage.tsdb.backfill-dir + [backfill_dir: | default = "backfill_tsdb"] + + # Maximum accepted sample age by backfilling. 0 disables it. + # CLI flag: -experimental.blocks-storage.tsdb.backfill-max-age + [backfill_limit: | default = 0s] + # limit the number of concurrently opening TSDB's on startup # CLI flag: -experimental.blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup [max_tsdb_opening_concurrency_on_startup: | default = 10] diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index f7bb21b791..28c7045547 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -508,6 +508,14 @@ blocks_storage: # CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown [flush_blocks_on_shutdown: | default = false] + # Local directory to store backfill TSDBs in the ingesters. + # CLI flag: -experimental.blocks-storage.tsdb.backfill-dir + [backfill_dir: | default = "backfill_tsdb"] + + # Maximum accepted sample age by backfilling. 0 disables it. + # CLI flag: -experimental.blocks-storage.tsdb.backfill-max-age + [backfill_limit: | default = 0s] + # limit the number of concurrently opening TSDB's on startup # CLI flag: -experimental.blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup [max_tsdb_opening_concurrency_on_startup: | default = 10] diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 8a8f68aa58..5718375231 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3251,6 +3251,14 @@ tsdb: # CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown [flush_blocks_on_shutdown: | default = false] + # Local directory to store backfill TSDBs in the ingesters. + # CLI flag: -experimental.blocks-storage.tsdb.backfill-dir + [backfill_dir: | default = "backfill_tsdb"] + + # Maximum accepted sample age by backfilling. 0 disables it. 
+ # CLI flag: -experimental.blocks-storage.tsdb.backfill-max-age + [backfill_limit: | default = 0s] + # limit the number of concurrently opening TSDB's on startup # CLI flag: -experimental.blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup [max_tsdb_opening_concurrency_on_startup: | default = 10] diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 80a31d7ee9..0fd0391d85 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -2,8 +2,10 @@ package ingester import ( "context" + "encoding/json" "fmt" "io" + "io/ioutil" "math" "net/http" "os" @@ -12,6 +14,7 @@ import ( "time" "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -20,6 +23,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/shipper" @@ -119,10 +123,42 @@ func (u *userTSDB) setLastUpdate(t time.Time) { u.lastUpdate.Store(t.Unix()) } +func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) { + b, err := ioutil.ReadFile(filepath.Join(u.Dir(), shipper.MetaFilename)) + if err != nil { + return nil, errors.Wrap(err, "read shipper meta file") + } + var shipperMeta shipper.Meta + if err := json.Unmarshal(b, &shipperMeta); err != nil { + return nil, errors.Wrap(err, "unmarshal shipper meta file to json") + } + return shipperMeta.Uploaded, nil +} + +func (u *userTSDB) getUnshippedBlocksULID() (unshipped []ulid.ULID, err error) { + shippedBlocks, err := u.getShippedBlocksULID() + if err != nil { + return nil, errors.Wrap(err, "get shipped blocks") + } + +Outer: + for _, b := range u.Blocks() { + for _, uid := range shippedBlocks { + if uid == b.Meta().ULID { + continue Outer + } + } + unshipped = append(unshipped, b.Meta().ULID) + } + + return unshipped, nil +} + // TSDBState holds data structures used by the TSDB storage engine type TSDBState struct { - dbs map[string]*userTSDB // tsdb sharded by userID - bucket objstore.Bucket + dbs map[string]*userTSDB // tsdb sharded by userID + backfillDBs *backfillTSDBs + bucket objstore.Bucket // Value used by shipper as external label. shipperIngesterID string @@ -157,6 +193,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer tsdbMetrics: newTSDBMetrics(registerer), forceCompactTrigger: make(chan chan<- struct{}), shipTrigger: make(chan chan<- struct{}), + backfillDBs: newBackfillTSDBs(), compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{ Name: "cortex_ingester_tsdb_compactions_triggered_total", @@ -274,6 +311,11 @@ func (i *Ingester) startingV2(ctx context.Context) error { return errors.Wrap(err, "opening existing TSDBs") } + // Scan and open backfill TSDB's that already exist on disk. 
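+	// This replays their WALs before the lifecycler starts, so previously backfilled data is queryable as soon as the ingester becomes ACTIVE.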
+ if err := i.openExistingBackfillTSDB(context.Background()); err != nil { + return errors.Wrap(err, "opening existing backfill TSDBs") + } + // Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context if err := i.lifecycler.StartAsync(context.Background()); err != nil { return errors.Wrap(err, "failed to start lifecycler") @@ -420,6 +462,7 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien // Walk the samples, appending them to the users database app := db.Appender() + var backfillApp *backfillAppender for _, ts := range req.Timeseries { // Check if we already have a cached reference for this series. Be aware // that even if we have a reference it's not guaranteed to be still valid. @@ -460,13 +503,27 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien } } + cause := errors.Cause(err) + if cause == storage.ErrOutOfBounds && + i.cfg.BlocksStorageConfig.TSDB.BackfillLimit > 0 && + s.TimestampMs > db.Head().MaxTime()-i.cfg.BlocksStorageConfig.TSDB.BackfillLimit.Milliseconds() { + if backfillApp == nil { + backfillApp = i.newBackfillAppender(userID) + } + err := backfillApp.add(ts.Labels, s) + if err == nil { + succeededSamplesCount++ + continue + } + cause = errors.Cause(err) + } + failedSamplesCount++ // Check if the error is a soft error we can proceed on. If so, we keep track // of it, so that we can return it back to the distributor, which will return a // 400 error to the client. The client (Prometheus) will not retry on 400, and // we actually ingested all samples which haven't failed. - cause := errors.Cause(err) if cause == storage.ErrOutOfBounds || cause == storage.ErrOutOfOrderSample || cause == storage.ErrDuplicateSampleForTimestamp { if firstPartialErr == nil { firstPartialErr = errors.Wrapf(err, "series=%s, timestamp=%v", client.FromLabelAdaptersToLabels(ts.Labels).String(), model.Time(s.TimestampMs).Time().UTC().Format(time.RFC3339Nano)) @@ -495,8 +552,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien } // The error looks an issue on our side, so we should rollback - if rollbackErr := app.Rollback(); rollbackErr != nil { - level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr) + var merr tsdb_errors.MultiError + merr.Add(errors.Wrap(app.Rollback(), "main appender")) + if backfillApp != nil { + merr.Add(errors.Wrap(backfillApp.rollback(), "backfill appender")) + } + if merr.Err() != nil { + level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", merr.Err()) } return nil, wrapWithUser(err, userID) @@ -507,8 +569,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien i.TSDBState.appenderAddDuration.Observe(time.Since(startAppend).Seconds()) startCommit := time.Now() - if err := app.Commit(); err != nil { - return nil, wrapWithUser(err, userID) + var merr tsdb_errors.MultiError + merr.Add(errors.Wrap(app.Commit(), "main appender")) + if backfillApp != nil { + merr.Add(errors.Wrap(backfillApp.commit(), "backfill appender")) + } + if merr.Err() != nil { + return nil, wrapWithUser(merr.Err(), userID) } i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds()) @@ -571,6 +638,16 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie return nil, ss.Err() } + backfillSSs, err := i.backfillSelect(ctx, userID, int64(from), int64(through), matchers) + if err != nil { + return 
nil, err + } + if len(backfillSSs) > 0 { + // TODO(codesome): If any TSDB in backfill buckets were overlapping + // with main TSDB, then use tsdb.NewMergedVerticalSeriesSet + ss = tsdb.NewMergedSeriesSet(append(backfillSSs, ss)) + } + numSamples := 0 result := &client.QueryResponse{} @@ -802,6 +879,16 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste return ss.Err() } + backfillSSs, err := i.backfillSelect(ctx, userID, int64(from), int64(through), matchers) + if err != nil { + return err + } + if len(backfillSSs) > 0 { + // TODO(codesome): If any TSDB in backfill buckets were overlapping + // with main TSDB, then use tsdb.NewMergedVerticalSeriesSet + ss = tsdb.NewMergedSeriesSet(append(backfillSSs, ss)) + } + timeseries := make([]client.TimeSeries, 0, queryStreamBatchSize) batchSize := 0 numSamples := 0 @@ -921,11 +1008,28 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error) // createTSDB creates a TSDB for a given userID, and returns the created db. func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { tsdbPromReg := prometheus.NewRegistry() - udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID) - userLogger := util.WithUserID(userID, util.Logger) blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds() + userDB, err := i.createNewTSDB( + userID, + i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID), + blockRanges[0], + blockRanges[len(blockRanges)-1], + tsdbPromReg, + ) + if err == nil { + // We set the limiter here because we don't want to limit + // series during WAL replay. + userDB.limiter = i.limiter + i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg) + } + + return userDB, err +} +// createNewTSDB creates a TSDB for a given userID and data directory. +func (i *Ingester) createNewTSDB(userID, dbDir string, minBlockDuration, maxBlockDuration int64, reg *prometheus.Registry) (*userTSDB, error) { + userLogger := util.WithUserID(userID, util.Logger) userDB := &userTSDB{ userID: userID, refCache: cortex_tsdb.NewRefCache(), @@ -936,17 +1040,17 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { } // Create a new user database - db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{ + db, err := tsdb.Open(dbDir, userLogger, reg, &tsdb.Options{ RetentionDuration: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(), - MinBlockDuration: blockRanges[0], - MaxBlockDuration: blockRanges[len(blockRanges)-1], + MinBlockDuration: minBlockDuration, + MaxBlockDuration: maxBlockDuration, NoLockfile: true, StripeSize: i.cfg.BlocksStorageConfig.TSDB.StripeSize, WALCompression: i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled, SeriesLifecycleCallback: userDB, }) if err != nil { - return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir) + return nil, errors.Wrapf(err, "failed to open TSDB: %s", dbDir) } db.DisableCompactions() // we will compact on our own schedule @@ -956,13 +1060,10 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { level.Info(userLogger).Log("msg", "Running compaction after WAL replay") err = db.Compact() if err != nil { - return nil, errors.Wrapf(err, "failed to compact TSDB: %s", udir) + return nil, errors.Wrapf(err, "failed to compact TSDB: %s", dbDir) } userDB.DB = db - // We set the limiter here because we don't want to limit - // series during WAL replay. - userDB.limiter = i.limiter userDB.setLastUpdate(time.Now()) // After WAL replay. // Thanos shipper requires at least 1 external label to be set. 
For this reason, @@ -982,8 +1083,8 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { if i.cfg.BlocksStorageConfig.TSDB.ShipInterval > 0 { userDB.shipper = shipper.New( userLogger, - tsdbPromReg, - udir, + reg, + dbDir, cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket), func() labels.Labels { return l }, metadata.ReceiveSource, @@ -991,7 +1092,6 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { ) } - i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg) return userDB, nil } @@ -1023,8 +1123,15 @@ func (i *Ingester) closeAllTSDB() { }(userDB) } - // Wait until all Close() completed + wg.Add(1) + go func() { + defer wg.Done() + i.closeAllBackfillTSDBs() + }() + i.userStatesMtx.Unlock() + + // Wait until all Close() completed wg.Wait() } @@ -1177,9 +1284,17 @@ func (i *Ingester) compactionLoop(ctx context.Context) error { select { case <-ticker.C: i.compactBlocks(ctx, false) + if err := i.compactOldBackfillTSDBsAndShip(i.cfg.BlocksStorageConfig.TSDB.BackfillLimit.Milliseconds()); err != nil { + level.Warn(util.Logger).Log("msg", "failed to compact and ship old backfill TSDBs", "err", err) + } + + if err := i.closeOldBackfillTSDBsAndDelete(i.cfg.MaxChunkAge.Milliseconds()); err != nil { + level.Warn(util.Logger).Log("msg", "failed to close and delete old backfill TSDBs", "err", err) + } case ch := <-i.TSDBState.forceCompactTrigger: i.compactBlocks(ctx, true) + i.compactAllBackfillTSDBs(ctx) // Notify back. select { @@ -1289,8 +1404,10 @@ func (i *Ingester) v2LifecyclerFlush() { ctx := context.Background() i.compactBlocks(ctx, true) + i.compactAllBackfillTSDBs(ctx) if i.cfg.BlocksStorageConfig.TSDB.ShipInterval > 0 { i.shipBlocks(ctx) + i.shipAllBackfillTSDBs(ctx) } level.Info(util.Logger).Log("msg", "finished flushing and shipping TSDB blocks") diff --git a/pkg/ingester/ingester_v2_backfill.go b/pkg/ingester/ingester_v2_backfill.go new file mode 100644 index 0000000000..cc21882d9d --- /dev/null +++ b/pkg/ingester/ingester_v2_backfill.go @@ -0,0 +1,711 @@ +package ingester + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "sort" + "strconv" + "sync" + "time" + + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/pkg/gate" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" +) + +// backfillAppender is an appender to ingest old data. +// This _does not_ implement storage.Appender interface. +// The methods of this appender should not be called concurrently. 
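+// Each added sample is routed to the backfill TSDB bucket covering its timestamp;
+// one appender per touched bucket is held in the appenders map until commit or rollback.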
+type backfillAppender struct { + userID string + ingester *Ingester + buckets *tsdbBuckets + appenders map[int]storage.Appender +} + +func (i *Ingester) newBackfillAppender(userID string) *backfillAppender { + buckets := i.TSDBState.backfillDBs.getBucketsForUser(userID) + if buckets == nil { + buckets = i.TSDBState.backfillDBs.getOrCreateNewUser(userID) + } + return &backfillAppender{ + userID: userID, + ingester: i, + buckets: buckets, + appenders: make(map[int]storage.Appender), + } +} + +func (a *backfillAppender) add(la []client.LabelAdapter, s client.Sample) (err error) { + a.buckets.RLock() + bucket := getBucketForTimestamp(s.TimestampMs, a.buckets.buckets) + a.buckets.RUnlock() + if bucket == nil { + bucket, err = a.ingester.getOrCreateBackfillTSDB(a.buckets, a.userID, s.TimestampMs) + if err != nil { + return err + } + } + + db := bucket.db + var app storage.Appender + if ap, ok := a.appenders[bucket.id]; ok { + app = ap + } else { + app = db.Appender() + a.appenders[bucket.id] = app + } + + startAppend := time.Now() + cachedRef, cachedRefExists := db.refCache.Ref(startAppend, client.FromLabelAdaptersToLabels(la)) + // If the cached reference exists, we try to use it. + if cachedRefExists { + err = app.AddFast(cachedRef, s.TimestampMs, s.Value) + if err != nil && errors.Cause(err) == storage.ErrNotFound { + cachedRefExists = false + err = nil + } + } + + // If the cached reference doesn't exist, we (re)try without using the reference. + if !cachedRefExists { + // Copy the label set because both TSDB and the cache may retain it. + copiedLabels := client.FromLabelAdaptersToLabelsWithCopy(la) + if ref, err := app.Add(copiedLabels, s.TimestampMs, s.Value); err == nil { + db.refCache.SetRef(startAppend, copiedLabels, ref) + } + } + + return err +} + +func (a *backfillAppender) commit() error { + var merr tsdb_errors.MultiError + for _, app := range a.appenders { + merr.Add(app.Commit()) + } + return merr.Err() +} + +func (a *backfillAppender) rollback() error { + var merr tsdb_errors.MultiError + for _, app := range a.appenders { + merr.Add(app.Rollback()) + } + return merr.Err() +} + +func getBucketForTimestamp(ts int64, userBuckets []*tsdbBucket) *tsdbBucket { + // As the number of buckets will be small, we are iterating instead of binary search. + for _, b := range userBuckets { + if ts >= b.bucketStart && ts < b.bucketEnd { + return b + } + } + return nil +} + +func (i *Ingester) getOrCreateBackfillTSDB(userBuckets *tsdbBuckets, userID string, ts int64) (*tsdbBucket, error) { + start, end := getBucketRangesForTimestamp(ts, 1) + + userBuckets.RLock() + for _, b := range userBuckets.buckets { + if ts >= b.bucketStart && ts < b.bucketEnd { + userBuckets.RUnlock() + return b, nil + } + + // Existing: |-----------| + // New: |------------| + // Changed to: |------| (no overlaps) + if b.bucketStart < start && start < b.bucketEnd { + start = b.bucketEnd + } + + // Existing: |-----------| + // New: |------------| + // Changed to: |------| (no overlaps) + if end > b.bucketStart && end < b.bucketEnd { + end = b.bucketStart + break + } + + if b.bucketStart > end { + break + } + } + userBuckets.RUnlock() + + // No bucket exists for this timestamp, create one. + userBuckets.Lock() + defer userBuckets.Unlock() + // Check again if another goroutine created a bucket for this timestamp between unlocking and locking.. 
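+	// If none was created, a new TSDB is opened below for the (possibly clipped) [start, end) range.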
+ for _, b := range userBuckets.buckets { + if ts >= b.bucketStart && ts < b.bucketEnd { + return b, nil + } + } + + db, err := i.createNewTSDB( + userID, filepath.Join(i.cfg.BlocksStorageConfig.TSDB.BackfillDir, userID, getBucketName(start, end)), + (end-start)*2, (end-start)*2, prometheus.NewRegistry(), + ) + if err != nil { + return nil, err + } + bucket := &tsdbBucket{ + db: db, + bucketStart: start, + bucketEnd: end, + } + if len(userBuckets.buckets) > 0 { + bucket.id = userBuckets.buckets[len(userBuckets.buckets)-1].id + 1 + } + userBuckets.buckets = append(userBuckets.buckets, bucket) + sort.Slice(userBuckets.buckets, func(i, j int) bool { + return userBuckets.buckets[i].bucketStart < userBuckets.buckets[j].bucketStart + }) + + return bucket, nil +} + +func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error { + level.Info(util.Logger).Log("msg", "opening existing TSDBs") + wg := &sync.WaitGroup{} + openGate := gate.New(i.cfg.BlocksStorageConfig.TSDB.MaxTSDBOpeningConcurrencyOnStartup) + + users, err := ioutil.ReadDir(i.cfg.BlocksStorageConfig.TSDB.BackfillDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return err + } + + var runErr error + for _, u := range users { + if !u.IsDir() { + continue + } + + userID := u.Name() + userPath := filepath.Join(i.cfg.BlocksStorageConfig.TSDB.BackfillDir, userID) + + bucketNames, err := ioutil.ReadDir(userPath) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to open user TSDB dir for backfill", "err", err, "user", u, "path", userPath) + continue + } + + for bucketID, bucketName := range bucketNames { + if !bucketName.IsDir() { + continue + } + + dbPath := filepath.Join(userPath, bucketName.Name()) + f, err := os.Open(dbPath) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to open user backfill TSDB dir", "err", err, "user", userID, "path", dbPath) + return filepath.SkipDir + } + defer f.Close() + + // If the dir is empty skip it + if _, err := f.Readdirnames(1); err != nil { + if err != io.EOF { + level.Error(util.Logger).Log("msg", "unable to read backfill TSDB dir", "err", err, "user", userID, "path", dbPath) + } + + return filepath.SkipDir + } + + // Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled. + if err := openGate.Start(ctx); err != nil { + runErr = err + break + } + + wg.Add(1) + go func(bucketID int, userID, bucketName, dbDir string) { + defer wg.Done() + defer openGate.Done() + defer func(ts time.Time) { + i.TSDBState.walReplayTime.Observe(time.Since(ts).Seconds()) + }(time.Now()) + + start, end, err := getBucketRangesForBucketName(bucketName) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to get bucket range", "err", err, "user", userID, "bucketName", bucketName) + return + } + db, err := i.createNewTSDB(userID, dbDir, (end-start)*2, (end-start)*2, prometheus.NewRegistry()) + if err != nil { + level.Error(util.Logger).Log("msg", "unable to open user backfill TSDB", "err", err, "user", userID) + return + } + + bucket := &tsdbBucket{ + id: bucketID, + db: db, + bucketStart: start, + bucketEnd: end, + } + + buckets := i.TSDBState.backfillDBs.getOrCreateNewUser(userID) + buckets.Lock() + // We will sort it at the end. + buckets.buckets = append(buckets.buckets, bucket) + buckets.Unlock() + }(bucketID, userID, bucketName.Name(), dbPath) + } + + if runErr != nil { + break + } + + } + + // Wait for all opening routines to finish + wg.Wait() + + // Sort the buckets within the users. 
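+	// The concurrent open workers above append buckets in completion order, so restore time order here.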
+ i.TSDBState.backfillDBs.tsdbsMtx.Lock() + for _, buckets := range i.TSDBState.backfillDBs.tsdbs { + buckets.Lock() + sort.Slice(buckets.buckets, func(i, j int) bool { + return buckets.buckets[i].bucketStart < buckets.buckets[j].bucketStart + }) + buckets.Unlock() + } + i.TSDBState.backfillDBs.tsdbsMtx.Unlock() + + return runErr +} + +func (i *Ingester) backfillSelect(ctx context.Context, userID string, from, through int64, matchers []*labels.Matcher) ([]storage.SeriesSet, error) { + buckets := i.TSDBState.backfillDBs.getBucketsForUser(userID) + if buckets == nil { + return nil, nil + } + var queriers []storage.Querier + defer func() { + for _, q := range queriers { + q.Close() + } + }() + buckets.RLock() + for _, b := range buckets.buckets { + if !b.overlaps(from, through) { + mint, err := b.db.DB.StartTime() + if err != nil { + buckets.RUnlock() + return nil, err + } + maxt := b.db.Head().MaxTime() + if !overlapsOpenInterval(mint, maxt, from, through) { + continue + } + } + + q, err := b.db.Querier(ctx, from, through) + if err != nil { + buckets.RUnlock() + return nil, err + } + + queriers = append(queriers, q) + } + buckets.RUnlock() + + if len(queriers) == 0 { + return nil, nil + } + + result := make([]storage.SeriesSet, len(queriers)) + errC := make(chan error, 1) + var wg sync.WaitGroup + for i, q := range queriers { + wg.Add(1) + go func(i int, q storage.Querier) { + defer wg.Done() + + ss := q.Select(false, nil, matchers...) + if ss.Err() != nil { + select { + case errC <- ss.Err(): + default: + } + } + result[i] = ss + }(i, q) + } + + wg.Wait() + select { + case err := <-errC: + return nil, err + default: + } + + return result, nil +} + +func (i *Ingester) closeAllBackfillTSDBs() { + // Snapshotting of in-memory chunks can be considered as a small compaction, hence + // using that concurrency. + i.runConcurrentBackfillWorkers(context.Background(), i.cfg.BlocksStorageConfig.TSDB.HeadCompactionConcurrency, func(db *userTSDB) { + if err := db.Close(); err != nil { + level.Warn(util.Logger).Log("msg", "unable to close backfill TSDB", "user", db.userID, "bucket_dir", db.Dir(), "err", err) + } + }) +} + +func (i *Ingester) compactAllBackfillTSDBs(ctx context.Context) { + i.runConcurrentBackfillWorkers(ctx, i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(db *userTSDB) { + h := db.Head() + if err := db.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime())); err != nil { + level.Error(util.Logger).Log("msg", "unable to compact backfill TSDB", "user", db.userID, "bucket_dir", db.Dir(), "err", err) + } + }) +} + +func (i *Ingester) shipAllBackfillTSDBs(ctx context.Context) { + i.runConcurrentBackfillWorkers(ctx, i.cfg.BlocksStorageConfig.TSDB.ShipConcurrency, func(db *userTSDB) { + if db.shipper == nil { + return + } + if uploaded, err := db.shipper.Sync(context.Background()); err != nil { + level.Warn(util.Logger).Log("msg", "shipper failed to synchronize backfill TSDB blocks with the storage", "user", db.userID, "uploaded", uploaded, "bucket_dir", db.Dir(), "err", err) + } else { + level.Debug(util.Logger).Log("msg", "shipper successfully synchronized backfill TSDB blocks with storage", "user", db.userID, "uploaded", uploaded, "bucket_dir", db.Dir()) + } + }) +} + +func (i *Ingester) runConcurrentBackfillWorkers(ctx context.Context, concurrency int, userFunc func(*userTSDB)) { + i.TSDBState.backfillDBs.tsdbsMtx.Lock() + defer i.TSDBState.backfillDBs.tsdbsMtx.Unlock() + + // Using head compaction concurrency for both head compaction and shipping. 
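+	// A fixed pool of `concurrency` workers drains the per-bucket TSDBs sent on this channel.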
+ ch := make(chan *userTSDB, concurrency) + + wg := &sync.WaitGroup{} + wg.Add(concurrency) + for i := 0; i < concurrency; i++ { + go func() { + defer wg.Done() + for db := range ch { + userFunc(db) + } + }() + } + +sendLoop: + for _, buckets := range i.TSDBState.backfillDBs.tsdbs { + buckets.Lock() + for _, bucket := range buckets.buckets { + select { + case ch <- bucket.db: + // ok + case <-ctx.Done(): + buckets.Unlock() + // don't start new tasks. + break sendLoop + } + + } + buckets.Unlock() + } + close(ch) + + wg.Wait() +} + +func (i *Ingester) compactOldBackfillTSDBsAndShip(gracePeriod int64) error { + return i.runOnBucketsBefore(false, + func(t int64) int64 { + return t - gracePeriod + }, + func(db *userTSDB) error { + // Compact the head first. + h := db.Head() + if err := db.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime())); err != nil { + return errors.Wrap(err, "compact head") + } + + if db.shipper == nil { + return nil + } + // Ship the block. + uploaded, err := db.shipper.Sync(context.Background()) + if err != nil { + return errors.Wrap(err, "ship block") + } + level.Debug(util.Logger).Log("msg", "shipper successfully synchronized backfill TSDB blocks with storage", "user", db.userID, "uploaded", uploaded, "bucket_dir", db.Dir()) + return nil + }, + ) +} + +func (i *Ingester) closeOldBackfillTSDBsAndDelete(gracePeriod int64) error { + nowTimeMs := time.Now().Unix() * 1000 + return i.runOnBucketsBefore(true, + func(t int64) int64 { + return nowTimeMs - gracePeriod + }, + func(db *userTSDB) error { + // Compact the head if anything is left. Empty head will create no blocks. + h := db.Head() + if err := db.CompactHead(tsdb.NewRangeHead(h, h.MinTime(), h.MaxTime())); err != nil { + return errors.Wrap(err, "compact head") + } + + // TODO(codesome): check for double closing. + if err := db.Close(); err != nil { + return errors.Wrap(err, "close backfill TSDB") + } + + unshippedBlocks, err := db.getUnshippedBlocksULID() + if err != nil && errors.Cause(err) == os.ErrNotExist { + return errors.Wrap(err, "get unshipped blocks") + } + if err != nil || len(unshippedBlocks) > 0 { + // Ship the unshipped blocks. + uploaded, err := db.shipper.Sync(context.Background()) + if err != nil { + return errors.Wrap(err, "ship block") + } + level.Debug(util.Logger).Log("msg", "shipper successfully synchronized backfill TSDB blocks with storage", "user", db.userID, "uploaded", uploaded, "bucket_dir", db.Dir()) + } + + if err := os.RemoveAll(db.Dir()); err != nil { + return errors.Wrap(err, "delete backfill TSDB dir") + } + return nil + }, + ) +} + +func (i *Ingester) runOnBucketsBefore(deleteBucket bool, gracePeriodFunc func(t int64) int64, f func(db *userTSDB) error) error { + i.TSDBState.backfillDBs.compactShipDeleteMtx.Lock() + defer i.TSDBState.backfillDBs.compactShipDeleteMtx.Unlock() + + type tempType struct { + userID string + cutoffTime int64 + buckets *tsdbBuckets + } + + var usersHavingOldTSDBs []tempType + + // Collecting users who have old TSDBs. + i.TSDBState.backfillDBs.tsdbsMtx.RLock() + for userID, userBuckets := range i.TSDBState.backfillDBs.tsdbs { + cutoffTime := int64(0) + i.userStatesMtx.RLock() + mainDB := i.TSDBState.dbs[userID] + i.userStatesMtx.RUnlock() + userBuckets.RLock() + if mainDB != nil { + cutoffTime = gracePeriodFunc(mainDB.Head().MaxTime()) + } else { + // There is no main TSDB. So use the maxt of the last bucket. 
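+			// Buckets are kept sorted by start time, so the last bucket holds the newest backfilled samples.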
+ cutoffTime = gracePeriodFunc(userBuckets.buckets[len(userBuckets.buckets)-1].db.Head().MaxTime()) + } + if userBuckets.buckets[0].bucketEnd < cutoffTime { + usersHavingOldTSDBs = append(usersHavingOldTSDBs, tempType{ + userID: userID, + cutoffTime: cutoffTime, + buckets: userBuckets, + }) + } + userBuckets.RUnlock() + } + i.TSDBState.backfillDBs.tsdbsMtx.RUnlock() + + var merr tsdb_errors.MultiError + for _, user := range usersHavingOldTSDBs { + idx := 0 + for { + user.buckets.RLock() + if len(user.buckets.buckets) == 0 || idx == len(user.buckets.buckets) { + user.buckets.RUnlock() + break + } + bucket := user.buckets.buckets[idx] + if bucket.bucketEnd >= user.cutoffTime { + user.buckets.RUnlock() + break + } + user.buckets.RUnlock() + + if err := f(bucket.db); err != nil { + merr.Add(err) + break + } + idx++ + if deleteBucket { + user.buckets.Lock() + user.buckets.buckets = user.buckets.buckets[1:] + user.buckets.Unlock() + idx-- + } + } + + if deleteBucket { + // Check for empty buckets. + user.buckets.Lock() + i.TSDBState.backfillDBs.tsdbsMtx.Lock() + if len(user.buckets.buckets) == 0 { + delete(i.TSDBState.backfillDBs.tsdbs, user.userID) + } + i.TSDBState.backfillDBs.tsdbsMtx.Unlock() + user.buckets.Unlock() + } + } + + return merr.Err() +} + +// Assumes 1h bucket range for . TODO(codesome): protect stuff with locks. +type backfillTSDBs struct { + tsdbsMtx sync.RWMutex + compactShipDeleteMtx sync.Mutex + tsdbs map[string]*tsdbBuckets +} + +func newBackfillTSDBs() *backfillTSDBs { + return &backfillTSDBs{ + tsdbs: make(map[string]*tsdbBuckets), + } +} + +func (b *backfillTSDBs) getBucketsForUser(userID string) *tsdbBuckets { + b.tsdbsMtx.RLock() + defer b.tsdbsMtx.RUnlock() + return b.tsdbs[userID] +} + +func (b *backfillTSDBs) getOrCreateNewUser(userID string) *tsdbBuckets { + b.tsdbsMtx.Lock() + defer b.tsdbsMtx.Unlock() + buckets, ok := b.tsdbs[userID] + if !ok { + buckets = &tsdbBuckets{} + b.tsdbs[userID] = buckets + } + return buckets +} + +type tsdbBuckets struct { + sync.RWMutex + buckets []*tsdbBucket +} + +type tsdbBucket struct { + id int // This is any number but should be unique among all buckets of a user. + db *userTSDB + bucketStart, bucketEnd int64 +} + +func (b *tsdbBucket) overlaps(mint, maxt int64) bool { + return overlapsOpenInterval(b.bucketStart, b.bucketEnd, mint, maxt) +} + +func overlapsOpenInterval(mint1, maxt1, mint2, maxt2 int64) bool { + return mint1 < maxt2 && mint2 < maxt1 +} + +// getBucketName returns the string representation of the bucket. +// YYYY_MM_DD_HH_YYYY_MM_DD_HH +func getBucketName(start, end int64) string { + startTime := model.Time(start).Time().UTC() + endTime := model.Time(end).Time().UTC() + + return fmt.Sprintf( + "%04d_%02d_%02d_%02d_%04d_%02d_%02d_%02d", + startTime.Year(), startTime.Month(), startTime.Day(), startTime.Hour(), + endTime.Year(), endTime.Month(), endTime.Day(), endTime.Hour(), + ) +} + +func getBucketRangesForTimestamp(ts int64, bucketSize int) (int64, int64) { + // TODO(codesome): Replace this entire thing with 1-2 simple equations + // to align ts with the nearest hours which also aligns with bucketSize. 
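+	// For example, with bucketSize=1 a sample at 10:30 UTC maps to the [10:00, 11:00) bucket.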
+ t := time.Unix(ts/1000, 0).UTC() + yyyy := t.Year() + mm := t.Month() + dd := t.Day() + hh := bucketSize * (t.Hour() / bucketSize) + t = time.Date(yyyy, mm, dd, hh, 0, 0, 0, time.UTC) + + start := t.Unix() * 1000 + end := start + int64(time.Duration(bucketSize)*time.Hour/time.Millisecond) + + return start, end +} + +func getBucketRangesForBucketName(bucketName string) (int64, int64, error) { + // TODO(codesome) use time.Parse. + + // YYYY_MM_DD_HH_YYYY_MM_DD_HH + // 012345678901234567890123456 + if len(bucketName) != 27 { + return 0, 0, errors.New("Invalid bucket name") + } + + startYYYY, err := strconv.Atoi(bucketName[0:4]) + if err != nil { + return 0, 0, err + } + startMM, err := strconv.Atoi(bucketName[5:7]) + if err != nil { + return 0, 0, err + } + startDD, err := strconv.Atoi(bucketName[8:10]) + if err != nil { + return 0, 0, err + } + startHH, err := strconv.Atoi(bucketName[11:13]) + if err != nil { + return 0, 0, err + } + + endYYYY, err := strconv.Atoi(bucketName[14:18]) + if err != nil { + return 0, 0, err + } + endMM, err := strconv.Atoi(bucketName[19:21]) + if err != nil { + return 0, 0, err + } + endDD, err := strconv.Atoi(bucketName[22:24]) + if err != nil { + return 0, 0, err + } + endHH, err := strconv.Atoi(bucketName[25:27]) + if err != nil { + return 0, 0, err + } + + startTime := time.Date(startYYYY, time.Month(startMM), startDD, startHH, 0, 0, 0, time.UTC) + endTime := time.Date(endYYYY, time.Month(endMM), endDD, endHH, 0, 0, 0, time.UTC) + + start := startTime.Unix() * 1000 + end := endTime.Unix() * 1000 + + return start, end, nil +} diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 7244b85ebb..bfd54f774f 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -2,6 +2,7 @@ package ingester import ( "context" + "encoding/json" "fmt" "io" "io/ioutil" @@ -26,6 +27,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/shipper" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -1166,6 +1168,7 @@ func newIngesterMockWithTSDBStorageAndLimits(ingesterCfg Config, limits validati ingesterCfg.BlocksStorageConfig.TSDB.Dir = dir ingesterCfg.BlocksStorageConfig.Backend = "s3" ingesterCfg.BlocksStorageConfig.S3.Endpoint = "localhost" + ingesterCfg.BlocksStorageConfig.TSDB.BackfillDir = filepath.Join(dir, "backfill_tsdb") ingester, err := NewV2(ingesterCfg, clientCfg, overrides, registerer) if err != nil { @@ -1293,6 +1296,8 @@ func TestIngester_shipBlocks(t *testing.T) { } type shipperMock struct { + db *userTSDB + uploaded int mock.Mock } @@ -1767,3 +1772,201 @@ func TestIngester_CloseTSDBsOnShutdown(t *testing.T) { db = i.getTSDB(userID) require.Nil(t, db) } + +func TestIngesterV2BackfillCycle(t *testing.T) { + cfg := defaultIngesterTestConfig() + cfg.LifecyclerConfig.JoinAfter = 0 + backfillLimit := 12 * time.Hour + cfg.BlocksStorageConfig.TSDB.BackfillLimit = backfillLimit + + i, cleanup, err := newIngesterMockWithTSDBStorage(cfg, nil) + require.NoError(t, err) + t.Cleanup(cleanup) + + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + t.Cleanup(func() { + _ = services.StopAndAwaitTerminated(context.Background(), i) + }) + + // Wait until it's ACTIVE + test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + userID := "testuser" + ctx := 
user.InjectOrgID(context.Background(), userID) + + expectedIngested := make([]client.TimeSeries, 0) + + ingestSample := func(ts int64, numBackfillTSDBs int, errExpected bool) { + t.Helper() + + metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: fmt.Sprintf("test%d", len(expectedIngested))}} + metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters) + _, err = i.v2Push(ctx, client.ToWriteRequest( + []labels.Labels{metricLabels}, + []client.Sample{{TimestampMs: ts}}, + nil, client.API), + ) + + numBuckets := 0 + if userBuckets := i.TSDBState.backfillDBs.tsdbs[userID]; userBuckets != nil { + numBuckets = len(userBuckets.buckets) + } + require.Equal(t, numBackfillTSDBs, numBuckets) + if !errExpected { + require.NoError(t, err) + expectedIngested = append(expectedIngested, client.TimeSeries{ + Labels: metricLabelAdapters, + Samples: []client.Sample{{TimestampMs: ts}}, + }) + } else { + require.Equal(t, httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(errors.Wrapf(storage.ErrOutOfBounds, "series=%s, timestamp=%s", metricLabels.String(), model.Time(ts).Time().UTC().Format(time.RFC3339Nano)), userID).Error()), err) + } + } + + testQuery := func() { + res, err := i.v2Query(ctx, &client.QueryRequest{ + StartTimestampMs: math.MinInt64, + EndTimestampMs: math.MaxInt64, + Matchers: []*client.LabelMatcher{{Type: client.REGEX_MATCH, Name: labels.MetricName, Value: ".*"}}, + }) + require.NoError(t, err) + require.NotNil(t, res) + assert.Equal(t, expectedIngested, res.Timeseries) + } + + // Samples for 100h. The main tsdb will be able to handle samples only + // upto 99h after this. + ts := 100 * time.Hour.Milliseconds() + ingestSample(ts, 0, false) + + // 99.5h, to the main TSDB. + ts = 99*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds() + ingestSample(ts, 0, false) + + // 99h, to the main TSDB. + ts = 99 * time.Hour.Milliseconds() + ingestSample(ts, 0, false) + + // 99h-1s for a backfill TSDB. + ts = 99*time.Hour.Milliseconds() - 1*time.Second.Milliseconds() + ingestSample(ts, 1, false) + + // 98.5h to backfill in the same TSDB. + ts = 98*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds() + ingestSample(ts, 1, false) + + // 96.5h to jump a bucket in between. + ts = 96*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds() + ingestSample(ts, 2, false) + + // 97.5h for the gap between 2 buckets. + ts = 97*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds() + ingestSample(ts, 3, false) + + // 100h-12h+1ms, testing near the edge. + ts = (100-12)*time.Hour.Milliseconds() + 1*time.Millisecond.Milliseconds() + ingestSample(ts, 4, false) + + // 100h-backfillLimit, out of bounds even for backfill. + ts = 100*time.Hour.Milliseconds() - backfillLimit.Milliseconds() + ingestSample(ts, 4, true) + + // 99h-backfillLimit, out of bounds even for backfill. + ts = 99*time.Hour.Milliseconds() - backfillLimit.Milliseconds() + ingestSample(ts, 4, true) + + // Query back all the samples. + testQuery() + + // Restart to check if we can still query backfill TSDBs. + + // Stop ingester. 
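+	// Backfill TSDBs are closed on shutdown and must be reopened from disk by the new ingester.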
+ require.NoError(t, services.StopAndAwaitTerminated(context.Background(), i)) + + overrides, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + i, err = NewV2(i.cfg, defaultClientTestConfig(), overrides, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + + // Wait until it's ACTIVE + test.Poll(t, 10*time.Millisecond, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) + + // Query back all the samples. + testQuery() + + // Compact old blocks partially to check + // * Compaction is happening properly. + // * We can still query it while we have a mix of compacted and uncompacted TSDBs. + + // Check there are no blocks yet and attach and mock shipper. + userBuckets := i.TSDBState.backfillDBs.getBucketsForUser(userID) + for _, bucket := range userBuckets.buckets { + require.Equal(t, 0, len(bucket.db.Blocks())) + + m := &shipperMock{ + db: bucket.db, + } + m.On("Sync", mock.Anything).Run(func(args mock.Arguments) { + var shipperMeta shipper.Meta + shipperMeta.Version = shipper.MetaVersion1 + for _, block := range m.db.Blocks() { + shipperMeta.Uploaded = append(shipperMeta.Uploaded, block.Meta().ULID) + } + + b, err := json.Marshal(&shipperMeta) + if err != nil { + return + } + + path := filepath.Join(m.db.Dir(), shipper.MetaFilename) + _ = ioutil.WriteFile(path, b, os.ModePerm) + m.uploaded = len(shipperMeta.Uploaded) + }).Return(1, nil) + bucket.db.shipper = m + } + + // Compacting the oldest 2 buckets. They are <=97h, so compacting buckets upto 97.5h (current 100h minus 97.5h is the grace period). + require.NoError(t, i.compactOldBackfillTSDBsAndShip(2*time.Hour.Milliseconds()+30*time.Minute.Milliseconds())) + for idx, bucket := range userBuckets.buckets { + if idx < 2 { + require.Equal(t, 1, len(bucket.db.Blocks()), "bucket index %d", idx) + } else { + require.Equal(t, 0, len(bucket.db.Blocks()), "bucket index %d", idx) + } + } + + // We can still query compacted blocks. + testQuery() + + copiedBuckets := append([]*tsdbBucket{}, userBuckets.buckets...) + + // Closing the old TSDBs and deleting them. Starting with the shipped blocks. + // Closing is based on current time. Hence grace period is w.r.t. current time. + nowTimeMs := time.Now().Unix() * 1000 + gracePeriod := nowTimeMs - (97*time.Hour.Milliseconds() + 30*time.Minute.Milliseconds()) + require.NoError(t, i.closeOldBackfillTSDBsAndDelete(gracePeriod)) + require.Equal(t, 2, len(userBuckets.buckets)) + for idx, bucket := range userBuckets.buckets { + require.Equal(t, 0, len(bucket.db.Blocks()), "bucket index %d", idx) + } + + // Delete the pending TSDBs. + gracePeriod = nowTimeMs - (100 * time.Hour.Milliseconds()) + require.NoError(t, i.closeOldBackfillTSDBsAndDelete(gracePeriod)) + require.Equal(t, 0, len(userBuckets.buckets)) + + // Check from the copied buckets if all of them had at 1 shipped block. + // The last 2 buckets were not compacted explicitly before. 
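+	// closeOldBackfillTSDBsAndDelete compacts the head and ships any still-unshipped blocks before deleting the TSDB dir.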
+ for idx, buk := range copiedBuckets { + s, ok := buk.db.shipper.(*shipperMock) + require.True(t, ok) + require.Equal(t, 1, s.uploaded, "uploaded - bucket index %d", idx) + require.Equal(t, 1, len(buk.db.Blocks()), "blocks - bucket index %d", idx) + } +} diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 61ff7e3e33..583cca90ce 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -54,6 +54,7 @@ var ( errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency") errInvalidStripeSize = errors.New("invalid TSDB stripe size") errEmptyBlockranges = errors.New("empty block ranges for TSDB") + errEmptyBackfillDir = errors.New("empty backfill directory") ) // BlocksStorageConfig holds the config information for the blocks storage. @@ -146,6 +147,8 @@ type TSDBConfig struct { StripeSize int `yaml:"stripe_size"` WALCompressionEnabled bool `yaml:"wal_compression_enabled"` FlushBlocksOnShutdown bool `yaml:"flush_blocks_on_shutdown"` + BackfillDir string `yaml:"backfill_dir"` + BackfillLimit time.Duration `yaml:"backfill_limit"` // MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup. MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"` @@ -173,6 +176,8 @@ func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) { f.IntVar(&cfg.StripeSize, "experimental.blocks-storage.tsdb.stripe-size", 16384, "The number of shards of series to use in TSDB (must be a power of 2). Reducing this will decrease memory footprint, but can negatively impact performance.") f.BoolVar(&cfg.WALCompressionEnabled, "experimental.blocks-storage.tsdb.wal-compression-enabled", false, "True to enable TSDB WAL compression.") f.BoolVar(&cfg.FlushBlocksOnShutdown, "experimental.blocks-storage.tsdb.flush-blocks-on-shutdown", false, "If true, and transfer of blocks on shutdown fails or is disabled, incomplete blocks are flushed to storage instead. If false, incomplete blocks will be reused after restart, and uploaded when finished.") + f.StringVar(&cfg.BackfillDir, "experimental.blocks-storage.tsdb.backfill-dir", "backfill_tsdb", "Local directory to store backfill TSDBs in the ingesters.") + f.DurationVar(&cfg.BackfillLimit, "experimental.blocks-storage.tsdb.backfill-max-age", 0, "Maximum accepted sample age by backfilling. 0 disables it.") } // Validate the config. 
@@ -197,6 +202,10 @@ func (cfg *TSDBConfig) Validate() error { return errEmptyBlockranges } + if cfg.BackfillLimit > 0 && cfg.BackfillDir == "" { + return errEmptyBackfillDir + } + return nil } @@ -206,6 +215,12 @@ func (cfg *TSDBConfig) BlocksDir(userID string) string { return filepath.Join(cfg.Dir, userID) } +// BackfillBlocksDir returns the directory path where TSDB blocks and wal used for +// backfilling should be stored by the ingester +func (cfg *TSDBConfig) BackfillBlocksDir(userID string) string { + return filepath.Join(cfg.BackfillDir, userID) +} + // BucketStoreConfig holds the config information for Bucket Stores used by the querier type BucketStoreConfig struct { SyncDir string `yaml:"sync_dir"` diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go index 8f76633bbb..9ee36699e3 100644 --- a/pkg/storage/tsdb/config_test.go +++ b/pkg/storage/tsdb/config_test.go @@ -250,6 +250,43 @@ func TestConfig_Validate(t *testing.T) { }, expectedErr: errEmptyBlockranges, }, + "should pass on non empty backfill dir": { + config: BlocksStorageConfig{ + Backend: "s3", + TSDB: TSDBConfig{ + HeadCompactionInterval: 1 * time.Minute, + HeadCompactionConcurrency: 5, + StripeSize: 1 << 14, + BlockRanges: DurationList{1 * time.Minute}, + BackfillLimit: 6 * time.Hour, + BackfillDir: "somedir", + }, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, + }, + expectedErr: nil, + }, + "should fail on empty backfill dir": { + config: BlocksStorageConfig{ + Backend: "s3", + TSDB: TSDBConfig{ + HeadCompactionInterval: 1 * time.Minute, + HeadCompactionConcurrency: 5, + StripeSize: 1 << 14, + BlockRanges: DurationList{1 * time.Minute}, + BackfillLimit: 6 * time.Hour, + }, + BucketStore: BucketStoreConfig{ + IndexCache: IndexCacheConfig{ + Backend: "inmemory", + }, + }, + }, + expectedErr: errEmptyBackfillDir, + }, } for testName, testData := range tests {