Ingest samples older than 1h for block store #2819

Closed
wants to merge 20 commits

8 changes: 8 additions & 0 deletions docs/blocks-storage/querier.md
@@ -481,6 +481,14 @@ blocks_storage:
# CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown
[flush_blocks_on_shutdown: <boolean> | default = false]

# Local directory to store backfill TSDBs in the ingesters.
# CLI flag: -experimental.blocks-storage.tsdb.backfill-dir
[backfill_dir: <string> | default = "backfill_tsdb"]

# Maximum accepted sample age by backfilling. 0 disables it.
# CLI flag: -experimental.blocks-storage.tsdb.backfill-max-age
[backfill_limit: <duration> | default = 0s]

# limit the number of concurrently opening TSDB's on startup
# CLI flag: -experimental.blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
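For orientation, the two new options documented above presumably map onto fields of the TSDB config block and are registered as CLI flags roughly as sketched below. Only the flag names, defaults and help strings come from the docs; BackfillLimit also appears later in this diff, while BackfillDir as a field name and the RegisterFlags wiring are assumptions.

// Sketch (not part of the PR): how the documented options could be wired up.
type TSDBConfig struct {
	BackfillDir   string        `yaml:"backfill_dir"`
	BackfillLimit time.Duration `yaml:"backfill_limit"`
	// ... other existing TSDB options elided ...
}

func (cfg *TSDBConfig) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.BackfillDir, "experimental.blocks-storage.tsdb.backfill-dir", "backfill_tsdb",
		"Local directory to store backfill TSDBs in the ingesters.")
	f.DurationVar(&cfg.BackfillLimit, "experimental.blocks-storage.tsdb.backfill-max-age", 0,
		"Maximum accepted sample age by backfilling. 0 disables it.")
}
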
8 changes: 8 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -508,6 +508,14 @@ blocks_storage:
# CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown
[flush_blocks_on_shutdown: <boolean> | default = false]

# Local directory to store backfill TSDBs in the ingesters.
# CLI flag: -experimental.blocks-storage.tsdb.backfill-dir
[backfill_dir: <string> | default = "backfill_tsdb"]

# Maximum accepted sample age by backfilling. 0 disables it.
# CLI flag: -experimental.blocks-storage.tsdb.backfill-max-age
[backfill_limit: <duration> | default = 0s]

# limit the number of concurrently opening TSDB's on startup
# CLI flag: -experimental.blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
8 changes: 8 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3251,6 +3251,14 @@ tsdb:
# CLI flag: -experimental.blocks-storage.tsdb.flush-blocks-on-shutdown
[flush_blocks_on_shutdown: <boolean> | default = false]

# Local directory to store backfill TSDBs in the ingesters.
# CLI flag: -experimental.blocks-storage.tsdb.backfill-dir
[backfill_dir: <string> | default = "backfill_tsdb"]

# Maximum accepted sample age by backfilling. 0 disables it.
# CLI flag: -experimental.blocks-storage.tsdb.backfill-max-age
[backfill_limit: <duration> | default = 0s]

# limit the number of concurrently opening TSDB's on startup
# CLI flag: -experimental.blocks-storage.tsdb.max-tsdb-opening-concurrency-on-startup
[max_tsdb_opening_concurrency_on_startup: <int> | default = 10]
159 changes: 138 additions & 21 deletions pkg/ingester/ingester_v2.go
@@ -2,8 +2,10 @@ package ingester

import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"math"
"net/http"
"os"
@@ -12,6 +14,7 @@ import (
"time"

"github.com/go-kit/kit/log/level"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -20,6 +23,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
@@ -119,10 +123,42 @@ func (u *userTSDB) setLastUpdate(t time.Time) {
u.lastUpdate.Store(t.Unix())
}

func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
b, err := ioutil.ReadFile(filepath.Join(u.Dir(), shipper.MetaFilename))
Reviewer comment (Contributor):

Two things:

  1. I would use shipper.ReadMetaFile() to simplify this function.
  2. If the file does not exist, it should return an empty list of block IDs and no error (it's an OK error if no block has been shipped yet).
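
A minimal sketch of what that simplification could look like, assuming shipper.ReadMetaFile(dir) has its usual (*shipper.Meta, error) shape and wraps the underlying read error; this is an illustration of the suggestion, not necessarily how the PR ends up implementing it:

// Sketch: use shipper.ReadMetaFile() and treat a missing meta file as
// "no blocks shipped yet" rather than as an error.
func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
	meta, err := shipper.ReadMetaFile(u.Dir())
	if err != nil {
		if os.IsNotExist(errors.Cause(err)) {
			// No shipper meta file yet: nothing has been shipped, which is fine.
			return nil, nil
		}
		return nil, errors.Wrap(err, "read shipper meta file")
	}
	return meta.Uploaded, nil
}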

if err != nil {
return nil, errors.Wrap(err, "read shipper meta file")
}
var shipperMeta shipper.Meta
if err := json.Unmarshal(b, &shipperMeta); err != nil {
return nil, errors.Wrap(err, "unmarshal shipper meta file to json")
}
return shipperMeta.Uploaded, nil
}

func (u *userTSDB) getUnshippedBlocksULID() (unshipped []ulid.ULID, err error) {
shippedBlocks, err := u.getShippedBlocksULID()
if err != nil {
return nil, errors.Wrap(err, "get shipped blocks")
}

Outer:
for _, b := range u.Blocks() {
for _, uid := range shippedBlocks {
if uid == b.Meta().ULID {
continue Outer
}
}
unshipped = append(unshipped, b.Meta().ULID)
}

return unshipped, nil
}

// TSDBState holds data structures used by the TSDB storage engine
type TSDBState struct {
dbs map[string]*userTSDB // tsdb sharded by userID
bucket objstore.Bucket
dbs map[string]*userTSDB // tsdb sharded by userID
backfillDBs *backfillTSDBs
bucket objstore.Bucket

// Value used by shipper as external label.
shipperIngesterID string
@@ -157,6 +193,7 @@ func newTSDBState(bucketClient objstore.Bucket, registerer prometheus.Registerer
tsdbMetrics: newTSDBMetrics(registerer),
forceCompactTrigger: make(chan chan<- struct{}),
shipTrigger: make(chan chan<- struct{}),
backfillDBs: newBackfillTSDBs(),

compactionsTriggered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_tsdb_compactions_triggered_total",
@@ -274,6 +311,11 @@ func (i *Ingester) startingV2(ctx context.Context) error {
return errors.Wrap(err, "opening existing TSDBs")
}

// Scan and open backfill TSDB's that already exist on disk.
if err := i.openExistingBackfillTSDB(context.Background()); err != nil {
Reviewer comment (Contributor):

You should pass ctx, not context.Background().

return errors.Wrap(err, "opening existing backfill TSDBs")
}

// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
return errors.Wrap(err, "failed to start lifecycler")
@@ -420,6 +462,7 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien

// Walk the samples, appending them to the users database
app := db.Appender()
var backfillApp *backfillAppender
for _, ts := range req.Timeseries {
// Check if we already have a cached reference for this series. Be aware
// that even if we have a reference it's not guaranteed to be still valid.
@@ -460,13 +503,27 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
}
}

cause := errors.Cause(err)
if cause == storage.ErrOutOfBounds &&
i.cfg.BlocksStorageConfig.TSDB.BackfillLimit > 0 &&
s.TimestampMs > db.Head().MaxTime()-i.cfg.BlocksStorageConfig.TSDB.BackfillLimit.Milliseconds() {
Reviewer comment (Contributor):

  1. Could there be any case where db.Head().MaxTime() is math.MinInt64?
  2. Should this be based on the head max time or on time.Now()? The "max age" could also be seen as relative to "now"; all other time-based limits we have are based on "now".
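
As an illustration of the second point, a "now"-based variant of the check could look roughly as follows. This is a sketch only; the PR as written compares against db.Head().MaxTime(), and all identifiers below come from the surrounding diff.

// Sketch: accept a sample for backfilling when it is out of bounds for the
// main TSDB but still younger than the configured limit, measured from "now".
nowMs := time.Now().UnixNano() / int64(time.Millisecond)
limitMs := i.cfg.BlocksStorageConfig.TSDB.BackfillLimit.Milliseconds()
if cause == storage.ErrOutOfBounds && limitMs > 0 && s.TimestampMs > nowMs-limitMs {
	// Route the sample to the backfill appender instead of dropping it.
}

Basing the cutoff on wall-clock time would make this limit behave like the other time-based limits the reviewer mentions and would avoid depending on the state of the head.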

if backfillApp == nil {
backfillApp = i.newBackfillAppender(userID)
}
err := backfillApp.add(ts.Labels, s)
if err == nil {
succeededSamplesCount++
continue
}
cause = errors.Cause(err)
}

failedSamplesCount++

// Check if the error is a soft error we can proceed on. If so, we keep track
// of it, so that we can return it back to the distributor, which will return a
// 400 error to the client. The client (Prometheus) will not retry on 400, and
// we actually ingested all samples which haven't failed.
cause := errors.Cause(err)
if cause == storage.ErrOutOfBounds || cause == storage.ErrOutOfOrderSample || cause == storage.ErrDuplicateSampleForTimestamp {
if firstPartialErr == nil {
firstPartialErr = errors.Wrapf(err, "series=%s, timestamp=%v", client.FromLabelAdaptersToLabels(ts.Labels).String(), model.Time(s.TimestampMs).Time().UTC().Format(time.RFC3339Nano))
Expand Down Expand Up @@ -495,8 +552,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
}

// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
var merr tsdb_errors.MultiError
Reviewer comment (Contributor):

Renaming merr to rollbackErr, as it was previously, could help clarify what this error is about.

merr.Add(errors.Wrap(app.Rollback(), "main appender"))
if backfillApp != nil {
merr.Add(errors.Wrap(backfillApp.rollback(), "backfill appender"))
}
if merr.Err() != nil {
level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", merr.Err())
}

return nil, wrapWithUser(err, userID)
@@ -507,8 +569,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
i.TSDBState.appenderAddDuration.Observe(time.Since(startAppend).Seconds())

startCommit := time.Now()
if err := app.Commit(); err != nil {
return nil, wrapWithUser(err, userID)
var merr tsdb_errors.MultiError
merr.Add(errors.Wrap(app.Commit(), "main appender"))
if backfillApp != nil {
merr.Add(errors.Wrap(backfillApp.commit(), "backfill appender"))
}
if merr.Err() != nil {
return nil, wrapWithUser(merr.Err(), userID)
}
i.TSDBState.appenderCommitDuration.Observe(time.Since(startCommit).Seconds())

@@ -571,6 +638,16 @@ func (i *Ingester) v2Query(ctx context.Context, req *client.QueryRequest) (*clie
return nil, ss.Err()
}

backfillSSs, err := i.backfillSelect(ctx, userID, int64(from), int64(through), matchers)
if err != nil {
return nil, err
}
if len(backfillSSs) > 0 {
// TODO(codesome): If any TSDB in backfill buckets were overlapping
// with main TSDB, then use tsdb.NewMergedVerticalSeriesSet
ss = tsdb.NewMergedSeriesSet(append(backfillSSs, ss))
}

numSamples := 0

result := &client.QueryResponse{}
@@ -802,6 +879,16 @@ func (i *Ingester) v2QueryStream(req *client.QueryRequest, stream client.Ingeste
return ss.Err()
}

backfillSSs, err := i.backfillSelect(ctx, userID, int64(from), int64(through), matchers)
if err != nil {
return err
}
if len(backfillSSs) > 0 {
// TODO(codesome): If any TSDB in backfill buckets were overlapping
// with main TSDB, then use tsdb.NewMergedVerticalSeriesSet
ss = tsdb.NewMergedSeriesSet(append(backfillSSs, ss))
}

timeseries := make([]client.TimeSeries, 0, queryStreamBatchSize)
batchSize := 0
numSamples := 0
@@ -921,11 +1008,28 @@ func (i *Ingester) getOrCreateTSDB(userID string, force bool) (*userTSDB, error)
// createTSDB creates a TSDB for a given userID, and returns the created db.
func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
tsdbPromReg := prometheus.NewRegistry()
udir := i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID)
userLogger := util.WithUserID(userID, util.Logger)

blockRanges := i.cfg.BlocksStorageConfig.TSDB.BlockRanges.ToMilliseconds()
userDB, err := i.createNewTSDB(
userID,
i.cfg.BlocksStorageConfig.TSDB.BlocksDir(userID),
blockRanges[0],
blockRanges[len(blockRanges)-1],
tsdbPromReg,
)
if err == nil {
// We set the limiter here because we don't want to limit
// series during WAL replay.
userDB.limiter = i.limiter
i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg)
}

return userDB, err
}

// createNewTSDB creates a TSDB for a given userID and data directory.
func (i *Ingester) createNewTSDB(userID, dbDir string, minBlockDuration, maxBlockDuration int64, reg *prometheus.Registry) (*userTSDB, error) {
userLogger := util.WithUserID(userID, util.Logger)
userDB := &userTSDB{
userID: userID,
refCache: cortex_tsdb.NewRefCache(),
@@ -936,17 +1040,17 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
}

// Create a new user database
db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{
db, err := tsdb.Open(dbDir, userLogger, reg, &tsdb.Options{
RetentionDuration: i.cfg.BlocksStorageConfig.TSDB.Retention.Milliseconds(),
MinBlockDuration: blockRanges[0],
MaxBlockDuration: blockRanges[len(blockRanges)-1],
MinBlockDuration: minBlockDuration,
MaxBlockDuration: maxBlockDuration,
NoLockfile: true,
StripeSize: i.cfg.BlocksStorageConfig.TSDB.StripeSize,
WALCompression: i.cfg.BlocksStorageConfig.TSDB.WALCompressionEnabled,
SeriesLifecycleCallback: userDB,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
return nil, errors.Wrapf(err, "failed to open TSDB: %s", dbDir)
}
db.DisableCompactions() // we will compact on our own schedule

@@ -956,13 +1060,10 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
level.Info(userLogger).Log("msg", "Running compaction after WAL replay")
err = db.Compact()
if err != nil {
return nil, errors.Wrapf(err, "failed to compact TSDB: %s", udir)
return nil, errors.Wrapf(err, "failed to compact TSDB: %s", dbDir)
}

userDB.DB = db
// We set the limiter here because we don't want to limit
// series during WAL replay.
userDB.limiter = i.limiter
userDB.setLastUpdate(time.Now()) // After WAL replay.

// Thanos shipper requires at least 1 external label to be set. For this reason,
@@ -982,16 +1083,15 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
if i.cfg.BlocksStorageConfig.TSDB.ShipInterval > 0 {
userDB.shipper = shipper.New(
userLogger,
tsdbPromReg,
udir,
reg,
dbDir,
cortex_tsdb.NewUserBucketClient(userID, i.TSDBState.bucket),
func() labels.Labels { return l },
metadata.ReceiveSource,
true, // Allow out of order uploads. It's fine in Cortex's context.
)
}

i.TSDBState.tsdbMetrics.setRegistryForUser(userID, tsdbPromReg)
return userDB, nil
}

@@ -1023,8 +1123,15 @@ func (i *Ingester) closeAllTSDB() {
}(userDB)
}

// Wait until all Close() completed
wg.Add(1)
go func() {
defer wg.Done()
i.closeAllBackfillTSDBs()
}()

Reviewer comment (Contributor) on lines +1126 to +1131:

I think we should do it after i.userStatesMtx.Unlock(). Also, to further simplify the code, I would run wg.Wait() to wait until all user TSDBs are closed, and then call closeAllBackfillTSDBs() outside of any goroutine.
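
A sketch of the ordering this comment suggests for the tail of closeAllTSDB(), using only names already present in the diff:

// Sketch: unlock first, wait for the per-user TSDB Close() goroutines, then
// close the backfill TSDBs synchronously (no extra goroutine needed).
i.userStatesMtx.Unlock()

// Wait until all Close() completed.
wg.Wait()

i.closeAllBackfillTSDBs()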

i.userStatesMtx.Unlock()

// Wait until all Close() completed
wg.Wait()
}

@@ -1177,9 +1284,17 @@ func (i *Ingester) compactionLoop(ctx context.Context) error {
select {
case <-ticker.C:
i.compactBlocks(ctx, false)
if err := i.compactOldBackfillTSDBsAndShip(i.cfg.BlocksStorageConfig.TSDB.BackfillLimit.Milliseconds()); err != nil {
level.Warn(util.Logger).Log("msg", "failed to compact and ship old backfill TSDBs", "err", err)
}

if err := i.closeOldBackfillTSDBsAndDelete(i.cfg.MaxChunkAge.Milliseconds()); err != nil {
level.Warn(util.Logger).Log("msg", "failed to close and delete old backfill TSDBs", "err", err)
}

case ch := <-i.TSDBState.forceCompactTrigger:
i.compactBlocks(ctx, true)
i.compactAllBackfillTSDBs(ctx)

// Notify back.
select {
@@ -1289,8 +1404,10 @@ func (i *Ingester) v2LifecyclerFlush() {
ctx := context.Background()

i.compactBlocks(ctx, true)
i.compactAllBackfillTSDBs(ctx)
if i.cfg.BlocksStorageConfig.TSDB.ShipInterval > 0 {
i.shipBlocks(ctx)
i.shipAllBackfillTSDBs(ctx)
}

level.Info(util.Logger).Log("msg", "finished flushing and shipping TSDB blocks")