Ingest samples older than 1h for block store #2819


Closed
wants to merge 20 commits

Conversation

codesome
Contributor

@codesome codesome commented Jul 1, 2020

What this PR does:

This PR is currently a super early draft for ingesting samples older than 1h. Lots of TODOs.

The failing out-of-bounds test shows that we are currently appending out-of-bounds samples.

Which issue(s) this PR fixes:
Fixes #2366

Checklist

  • Ingest samples older than 1h
  • Reload TSDBs on restart
  • Query samples from older TSDBs
  • Support query stream over old TSDBs
  • Automatically cleanup old TSDBs
  • Tests updated
  • Documentation added
  • CHANGELOG.md updated - the order of entries should be [CHANGE], [FEATURE], [ENHANCEMENT], [BUGFIX]

TODO in future PRs

  • Metrics from new TSDBs
  • Limits on the additional TSDB
  • Automatically cleanup stale TSDBs

@bboreham
Contributor

bboreham commented Jul 1, 2020

Should probably clarify in the title that this is for the block store - the chunks storage has ingested old samples forever.

@codesome codesome changed the title Ingest samples older than 1h Ingest samples older than 1h for block store Jul 1, 2020
@codesome codesome marked this pull request as draft July 1, 2020 10:27
Contributor

@pracucci pracucci left a comment


Thanks @codesome for working on this! I did a very quick first pass and left some design feedback. My suggestion is: try to keep it as simple and clean as possible. It's fine if you need to submit preliminary refactorings to existing code to simplify this PR, but let's try to come up with a clean design.

Please also remember:

  • Add a CHANGELOG entry
  • I would allow disabling backfilling by setting a 0 value for the "max age" (I left a dedicated comment)

Limits on the additional TSDB

I would leave this outside of this PR. It's fine adding them separately to keep this PR a bit smaller.

@pull-request-size pull-request-size bot added size/XL and removed size/L labels Jul 6, 2020
@codesome codesome force-pushed the ingest-ooold branch 3 times, most recently from 225a602 to fd1fa7f Compare July 29, 2020 06:17
@codesome
Contributor Author

The code looks more complex than I would like it to be. I am currently writing more unit tests, and once I am satisfied with that, I will spend some time simplifying the code wherever possible before moving on to manual tests.

Contributor

@pracucci pracucci left a comment


Another partial review, sorry. I think this logic is too complicated. I stopped reviewing because I'm wondering whether all this complexity is worth it for the issue we're trying to fix. Let's talk offline.

Things I would like to discuss (don't jump on coding it immediately):

  • The transfer doesn't support backfill TSDBs. I'm fine with that (I believe we shouldn't support it), but it made me wonder whether we could simplify the shutdown procedure and actually always snapshot and ship backfill blocks to the storage at shutdown.

@@ -146,6 +147,8 @@ type TSDBConfig struct {
	StripeSize            int           `yaml:"stripe_size"`
	WALCompressionEnabled bool          `yaml:"wal_compression_enabled"`
	FlushBlocksOnShutdown bool          `yaml:"flush_blocks_on_shutdown"`
	BackfillDir           string        `yaml:"backfill_dir"`
	BackfillLimit         time.Duration `yaml:"backfill_limit"`

The CLI flag is called backfill-max-age while the YAML config option is backfill_limit. We should keep them consistent. Between the two, I believe backfill_max_age is clearer to understand. I would rename the BackfillLimit variable accordingly.
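
For illustration, a minimal sketch of what the consistent naming could look like (the flag prefix, default value and help text below are assumptions, not the PR's actual code):

// In TSDBConfig: rename BackfillLimit -> BackfillMaxAge and the YAML key to match.
BackfillMaxAge time.Duration `yaml:"backfill_max_age"`

// In RegisterFlags: keep the CLI flag aligned with the field/YAML name.
f.DurationVar(&cfg.BackfillMaxAge, "blocks-storage.tsdb.backfill-max-age", 0,
	"Max age of a sample that can still be ingested into the backfill TSDB. 0 disables backfilling.")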

@@ -119,10 +123,42 @@ func (u *userTSDB) setLastUpdate(t time.Time) {
	u.lastUpdate.Store(t.Unix())
}

func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
	b, err := ioutil.ReadFile(filepath.Join(u.Dir(), shipper.MetaFilename))

Two things:

  1. I would use shipper.ReadMetaFile() to simplify this function
  2. If the file does not exist, it should return an empty list of block IDs and no error (it's fine if no block has been shipped yet; see the sketch below)
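
Something along these lines, perhaps (a sketch only; the handling of a missing meta file is an assumption about how shipper.ReadMetaFile reports it, and the usual os/errors/ulid/shipper imports are assumed):

func (u *userTSDB) getShippedBlocksULID() ([]ulid.ULID, error) {
	meta, err := shipper.ReadMetaFile(u.Dir())
	if err != nil {
		// No meta file yet simply means no block has been shipped: not an error.
		if os.IsNotExist(errors.Cause(err)) {
			return nil, nil
		}
		return nil, errors.Wrap(err, "read shipper meta file")
	}
	return meta.Uploaded, nil
}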

@@ -274,6 +311,11 @@ func (i *Ingester) startingV2(ctx context.Context) error {
	return errors.Wrap(err, "opening existing TSDBs")
}

// Scan and open backfill TSDB's that already exist on disk.
if err := i.openExistingBackfillTSDB(context.Background()); err != nil {

You should pass ctx, not context.Background().

}

func (i *Ingester) openExistingBackfillTSDB(ctx context.Context) error {
	level.Info(util.Logger).Log("msg", "opening existing TSDBs")

Suggested change
level.Info(util.Logger).Log("msg", "opening existing TSDBs")
level.Info(util.Logger).Log("msg", "opening existing backfill TSDBs")

}

userID := u.Name()
userPath := filepath.Join(i.cfg.BlocksStorageConfig.TSDB.BackfillDir, userID)

I guess you've added the function BackfillBlocksDir() on the config specifically for this 😉

buckets.Unlock()
}
i.TSDBState.backfillDBs.tsdbsMtx.Unlock()


At this point, we should log the success/error similarly to what we do in openExistingTSDB()

Comment on lines +1126 to +1131
wg.Add(1)
go func() {
	defer wg.Done()
	i.closeAllBackfillTSDBs()
}()


I think we should do it after i.userStatesMtx.Unlock(). Also, to further simplify the code, I would run wg.Wait() to wait until all user TSDBs are closed, and then call closeAllBackfillTSDBs() outside of any goroutine.
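
Roughly this shape, where the surrounding code is assumed rather than copied from the PR:

// ... per-user TSDBs are closed concurrently above, each goroutine calling wg.Done() ...

i.userStatesMtx.Unlock()

// Wait until every per-user TSDB has been closed.
wg.Wait()

// Backfill TSDBs are expected to be few, so close them serially,
// outside of any goroutine.
i.closeAllBackfillTSDBs()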

})
}

func (i *Ingester) runConcurrentBackfillWorkers(ctx context.Context, concurrency int, userFunc func(*userTSDB)) {

I think runConcurrentBackfillWorkers() and runConcurrentUserWorkers() could be generalised into a single function like this:

func runConcurrentUserWorkers(ctx context.Context, userIDs []string, concurrency int, userFunc func(userID string))

Then:

  • You need two functions to generate the list of user IDs. We already have i.getTSDBUsers() then you could do the same for the backfill TSDBs.
  • The callback passed to runConcurrentBackfillWorkers() currently receives a *userTSDB, but it looks easy to pass the userID to the callback instead and have the callback fetch the backfill TSDB itself. I understand the actual parallelisation is not exactly the same, but we should strive for simplicity first (without compromising correctness) and then we can always optimise it. As already mentioned a bunch of times, I expect backfilling to be an uncommon use case (I don't want to keep multiple TSDBs open for every tenant at all times).

This is a refactoring that could be done in a preliminary PR.
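
For reference, a sketch of what such a generalised helper could look like (the body is an assumption based on the usual worker-pool pattern, with the standard context and sync imports; it is not code from the PR):

func runConcurrentUserWorkers(ctx context.Context, userIDs []string, concurrency int, userFunc func(userID string)) {
	wg := sync.WaitGroup{}
	ch := make(chan string)

	// Start the workers; each one processes user IDs until the channel is closed.
	for ix := 0; ix < concurrency; ix++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for userID := range ch {
				userFunc(userID)
			}
		}()
	}

	// Feed the workers, stopping early if the context is cancelled.
sendLoop:
	for _, userID := range userIDs {
		select {
		case ch <- userID:
		case <-ctx.Done():
			break sendLoop
		}
	}

	close(ch)
	wg.Wait()
}

The caller would then pass i.getTSDBUsers() (or the backfill equivalent) together with a callback that looks up the right TSDB by user ID.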

cause := errors.Cause(err)
if cause == storage.ErrOutOfBounds &&
	i.cfg.BlocksStorageConfig.TSDB.BackfillLimit > 0 &&
	s.TimestampMs > db.Head().MaxTime()-i.cfg.BlocksStorageConfig.TSDB.BackfillLimit.Milliseconds() {

  1. Could there be a case where db.Head().MaxTime() is math.MinInt64 (e.g. an empty head)?
  2. Should this be based on the head max time or on time.Now()? The "max age" could also be seen as relative to "now", and all other time-based limits we have are based on "now" (a sketch of that variant follows below).
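
For the sake of discussion, a minimal sketch of the "now"-based variant (BackfillMaxAge is the hypothetical renamed config field; the surrounding variables follow the snippet above, and this is an alternative, not the PR's code):

maxAge := i.cfg.BlocksStorageConfig.TSDB.BackfillMaxAge
nowMs := time.Now().UnixNano() / int64(time.Millisecond)

if errors.Cause(err) == storage.ErrOutOfBounds &&
	maxAge > 0 &&
	s.TimestampMs > nowMs-maxAge.Milliseconds() {
	// The sample is out of bounds for the main head but still within the
	// backfill window relative to "now": route it to the backfill TSDB.
	// A zero max age disables backfilling entirely.
}

A "now"-based cutoff also sidesteps the empty-head case, since it does not depend on db.Head().MaxTime() at all.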

@@ -495,8 +552,13 @@ func (i *Ingester) v2Push(ctx context.Context, req *client.WriteRequest) (*clien
}

// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
var merr tsdb_errors.MultiError

Renaming merr back to rollbackErr, as it was previously, would help clarify what this error is about.
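
For illustration, a rough sketch of how the rollback block could read with the clearer name (backfillApp is a hypothetical stand-in for whatever else the PR rolls back alongside the head appender; MultiError.Add ignores nil errors):

var rollbackErr tsdb_errors.MultiError
rollbackErr.Add(app.Rollback())         // head appender
rollbackErr.Add(backfillApp.Rollback()) // hypothetical backfill appender
if err := rollbackErr.Err(); err != nil {
	level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", err)
}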

@codesome
Contributor Author

Superseded by #3025

@codesome codesome closed this Aug 25, 2020

Successfully merging this pull request may close these issues.

Blocks storage unable to ingest samples older than 1h after an outage
3 participants