Skip to content

Commit af05e55

Browse files
author
Thor
committed
Review fixes
Signed-off-by: Thor <[email protected]>
1 parent 2486b15 commit af05e55

File tree

3 files changed

+38
-38
lines changed

3 files changed

+38
-38
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
* [FEATURE] Added `global` ingestion rate limiter strategy. Deprecated `-distributor.limiter-reload-period` flag. #1766
1717
* [FEATURE] Added support for Microsoft Azure blob storage to be used for storing chunk data. #1913
1818
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
19+
* [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917
20+
* --experimental.tsdb.max-tsdb-opening-concurrency-on-startup
1921

2022
## 0.4.0 / 2019-12-02
2123

pkg/ingester/ingester_v2.go

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -465,59 +465,57 @@ func (i *Ingester) closeAllTSDB() {
465465
// openExistingTSDB walks the user tsdb dir, and opens a tsdb for each user. This may start a WAL replay, so we limit the number of
466466
// concurrently opening TSDB.
467467
func (i *Ingester) openExistingTSDB(ctx context.Context) error {
468+
level.Info(util.Logger).Log("msg", "opening existing TSDBs")
468469
wg := &sync.WaitGroup{}
469-
openGate := gate.New(i.cfg.TSDBConfig.MaxOpeningTSDBOnStartup)
470+
openGate := gate.New(i.cfg.TSDBConfig.MaxTSDBOpeningConcurrencyOnStartup)
470471

471472
err := filepath.Walk(i.cfg.TSDBConfig.Dir, func(path string, info os.FileInfo, err error) error {
472-
if path == i.cfg.TSDBConfig.Dir { // Nothing to do for root
473+
474+
// Skip root dir and all other files
475+
if path == i.cfg.TSDBConfig.Dir || !info.IsDir() {
473476
return nil
474477
}
475478

476479
// Top level directories are assumed to be user TSDBs
477-
if info.IsDir() {
480+
userID := info.Name()
481+
f, err := os.Open(path)
482+
if err != nil {
483+
level.Error(util.Logger).Log("msg", "unable to open user TSDB dir", "err", err, "user", userID, "path", path)
484+
return filepath.SkipDir
485+
}
486+
defer f.Close()
478487

479-
userID := info.Name()
480-
f, err := os.Open(path)
481-
if err != nil {
482-
level.Error(util.Logger).Log("msg", "unable to open user TSDB dir", "err", err, "user", userID)
488+
// If the dir is empty skip it
489+
if _, err := f.Readdirnames(1); err != nil {
490+
if err != io.EOF {
491+
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID, "path", path)
483492
return filepath.SkipDir
484493
}
485-
defer f.Close()
486494

487-
// If the dir is empty skip it
488-
if _, err := f.Readdirnames(1); err != nil {
489-
if err != io.EOF {
490-
level.Error(util.Logger).Log("msg", "unable to read TSDB dir", "err", err, "user", userID)
491-
return filepath.SkipDir
492-
}
495+
// Empty dir
496+
return filepath.SkipDir
497+
}
493498

494-
// Empty dir
495-
return filepath.SkipDir
496-
}
499+
// Limit the number of TSDB's opening concurrently. Start blocks until there's a free spot available or the context is cancelled.
500+
if err := openGate.Start(ctx); err != nil {
501+
return err
502+
}
497503

498-
// Limit the number of TSDB's opening concurrently
499-
if err := openGate.Start(ctx); err != nil {
500-
return err
504+
wg.Add(1)
505+
go func(userID string) {
506+
defer wg.Done()
507+
defer openGate.Done()
508+
_, err := i.getOrCreateTSDB(userID, true) // force create the TSDB due to the lifecycler not having started yet
509+
if err != nil {
510+
level.Error(util.Logger).Log("msg", "unable to open user TSDB", "err", err, "user", userID)
501511
}
512+
}(userID)
502513

503-
wg.Add(1)
504-
go func(userID string) {
505-
defer wg.Done()
506-
defer openGate.Done()
507-
_, err := i.getOrCreateTSDB(userID, true) // force create the TSDB due to the lifecycler not having started yet
508-
if err != nil {
509-
level.Error(util.Logger).Log("msg", "unable to open user TSDB", "err", err, "user", userID)
510-
}
511-
}(userID)
512-
513-
return filepath.SkipDir // Don't descend into directories
514-
}
515-
516-
// Skip all other files
517-
return nil
514+
return filepath.SkipDir // Don't descend into directories
518515
})
519516

520517
// Wait for all opening routines to finish
521518
wg.Wait()
519+
level.Info(util.Logger).Log("msg", "completed opening existing TSDBs")
522520
return err
523521
}

pkg/storage/tsdb/config.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ type Config struct {
3232
Backend string `yaml:"backend"`
3333
BucketStore BucketStoreConfig `yaml:"bucket_store"`
3434

35-
// MaxOpeningTSDBOnStartup limits the number of concurrently opening TSDB's during startup
36-
MaxOpeningTSDBOnStartup int `yaml:"max_opening_tsdb_on_startup"`
35+
// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup
36+
MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
3737

3838
// Backends
3939
S3 s3.Config `yaml:"s3"`
@@ -85,7 +85,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8585
f.DurationVar(&cfg.Retention, "experimental.tsdb.retention-period", 6*time.Hour, "TSDB block retention")
8686
f.DurationVar(&cfg.ShipInterval, "experimental.tsdb.ship-interval", 30*time.Second, "the frequency at which tsdb blocks are scanned for shipping")
8787
f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
88-
f.IntVar(&cfg.MaxOpeningTSDBOnStartup, "experimental.tsdb.max-opening-tsdb-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
88+
f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
8989
}
9090

9191
// Validate the config

0 commit comments

Comments
 (0)