diff --git a/CHANGELOG.md b/CHANGELOG.md
index 883e675cf9b..41a403e54e2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@
 * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140
 * [CHANGE] Removed unused /validate_expr endpoint. #2152
 * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
+* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency are now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
   * `--experimental.distributor.user-subring-size`
diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md
index 592366ef7cc..f8970712eb9 100644
--- a/docs/operations/blocks-storage.md
+++ b/docs/operations/blocks-storage.md
@@ -123,6 +123,15 @@ tsdb:
   # CLI flag: -experimental.tsdb.ship-concurrency
   [ship_concurrency: <int> | default = 10]
 
+  # How frequently Cortex tries to compact the TSDB head. A block is only created if the data covers the smallest block range.
+  # Must be greater than 0 and at most 5 minutes.
+  # CLI flag: -experimental.tsdb.head-compaction-interval
+  [head_compaction_interval: <duration> | default = 1m]
+
+  # Maximum number of tenants concurrently compacting the TSDB head into a new block.
+  # CLI flag: -experimental.tsdb.head-compaction-concurrency
+  [head_compaction_concurrency: <int> | default = 5]
+
   # The bucket store configuration applies to queriers and configure how queriers
   # iteract with the long-term storage backend.
   bucket_store:
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 7f186f93c9b..62b6866e797 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -65,6 +65,10 @@ type TSDBState struct {
 	transferOnce sync.Once
 
 	tsdbMetrics *tsdbMetrics
+
+	// Head compactions metrics.
+	compactionsTriggered prometheus.Counter
+	compactionsFailed    prometheus.Counter
 }
 
 // NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
@@ -90,6 +94,16 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 			dbs:         make(map[string]*userTSDB),
 			bucket:      bucketClient,
 			tsdbMetrics: newTSDBMetrics(registerer),
+
+			compactionsTriggered: prometheus.NewCounter(prometheus.CounterOpts{
+				Name: "cortex_ingester_tsdb_compactions_triggered_total",
+				Help: "Total number of triggered compactions.",
+			}),
+
+			compactionsFailed: prometheus.NewCounter(prometheus.CounterOpts{
+				Name: "cortex_ingester_tsdb_compactions_failed_total",
+				Help: "Total number of compactions that failed.",
+			}),
 		},
 	}
 
@@ -101,6 +115,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 			Name: "cortex_ingester_memory_series",
 			Help: "The current number of series in memory.",
 		}, i.numSeriesInTSDB))
+		registerer.MustRegister(i.TSDBState.compactionsTriggered)
+		registerer.MustRegister(i.TSDBState.compactionsFailed)
 	}
 
 	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
@@ -129,6 +145,9 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 	i.done.Add(1)
 	go i.updateLoop()
 
+	i.done.Add(1)
+	go i.compactionLoop()
+
 	return i, nil
 }
 
@@ -592,6 +611,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 	if err != nil {
 		return nil, err
 	}
+	db.DisableCompactions() // we will compact on our own schedule
 
 	userDB := &userTSDB{
 		DB: db,
@@ -785,44 +805,98 @@ func (i *Ingester) shipBlocks() {
 		return
 	}
 
-	// Create a pool of workers which will synchronize blocks. The pool size
-	// is limited in order to avoid to concurrently sync a lot of tenants in
-	// a large cluster.
-	workersChan := make(chan string)
-	wg := &sync.WaitGroup{}
-	wg.Add(i.cfg.TSDBConfig.ShipConcurrency)
+	// Number of concurrent workers is limited in order to avoid to concurrently sync a lot
+	// of tenants in a large cluster.
+	i.runConcurrentUserWorkers(i.cfg.TSDBConfig.ShipConcurrency, func(userID string) {
+		// Get the user's DB. If the user doesn't exist, we skip it.
+		userDB := i.getTSDB(userID)
+		if userDB == nil || userDB.shipper == nil {
+			return
+		}
 
-	for j := 0; j < i.cfg.TSDBConfig.ShipConcurrency; j++ {
-		go func() {
-			defer wg.Done()
+		// Skip if the shipper context has been canceled.
+		if userDB.shipperCtx.Err() != nil {
+			return
+		}
 
-			for userID := range workersChan {
-				// Get the user's DB. If the user doesn't exist, we skip it.
-				userDB := i.getTSDB(userID)
-				if userDB == nil || userDB.shipper == nil {
-					continue
-				}
+		// Run the shipper's Sync() to upload unshipped blocks.
+		if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil {
+			level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)
+		} else {
+			level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded)
+		}
+	})
+}
 
-				// Skip if the shipper context has been canceled.
-				if userDB.shipperCtx.Err() != nil {
-					continue
-				}
+func (i *Ingester) compactionLoop() {
+	defer i.done.Done()
 
-				// Run the shipper's Sync() to upload unshipped blocks.
-				if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil {
-					level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)
-				} else {
-					level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded)
-				}
+	ticker := time.NewTicker(i.cfg.TSDBConfig.HeadCompactionInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			i.compactBlocks()
+
+		case <-i.quit:
+			return
+		}
+	}
+}
+
+func (i *Ingester) compactBlocks() {
+	// Don't compact TSDB blocks while JOINING or LEAVING, as there may be ongoing blocks transfers.
+	if ingesterState := i.lifecycler.GetState(); ingesterState == ring.JOINING || ingesterState == ring.LEAVING {
+		level.Info(util.Logger).Log("msg", "TSDB blocks compaction has been skipped because of the current ingester state", "state", ingesterState)
+		return
+	}
+
+	i.runConcurrentUserWorkers(i.cfg.TSDBConfig.HeadCompactionConcurrency, func(userID string) {
+		userDB := i.getTSDB(userID)
+		if userDB == nil {
+			return
+		}
+
+		i.TSDBState.compactionsTriggered.Inc()
+		err := userDB.Compact()
+		if err != nil {
+			i.TSDBState.compactionsFailed.Inc()
+			level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err)
+		} else {
+			level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID)
+		}
+	})
+}
+
+func (i *Ingester) runConcurrentUserWorkers(concurrency int, userFunc func(userID string)) {
+	wg := sync.WaitGroup{}
+	ch := make(chan string)
+
+	for ix := 0; ix < concurrency; ix++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			for userID := range ch {
+				userFunc(userID)
 			}
 		}()
 	}
 
+sendLoop:
 	for _, userID := range i.getTSDBUsers() {
-		workersChan <- userID
+		select {
+		case ch <- userID:
+			// ok
+		case <-i.quit:
+			// don't start new tasks.
+			break sendLoop
+		}
 	}
 
-	close(workersChan)
-	// Wait until all workers completed.
+	close(ch)
+
+	// wait for ongoing workers to finish.
 	wg.Wait()
 }
diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go
index d4d64daa272..27c9ee0d3ab 100644
--- a/pkg/ingester/metrics.go
+++ b/pkg/ingester/metrics.go
@@ -114,7 +114,7 @@ func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithT
 	return m
 }
 
-// TSDB metrics. Each tenant has its own registry, that TSDB code uses.
+// TSDB metrics collector. Each tenant has its own registry, that TSDB code uses.
 type tsdbMetrics struct {
 	// We aggregate metrics from individual TSDB registries into
 	// a single set of counters, which are exposed as Cortex metrics.
diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go
index 2594729f692..0c84200d21f 100644
--- a/pkg/storage/tsdb/config.go
+++ b/pkg/storage/tsdb/config.go
@@ -30,19 +30,23 @@ const (
 
 // Validation errors
 var (
-	errUnsupportedBackend     = errors.New("unsupported TSDB storage backend")
-	errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
+	errUnsupportedBackend           = errors.New("unsupported TSDB storage backend")
+	errInvalidShipConcurrency       = errors.New("invalid TSDB ship concurrency")
+	errInvalidCompactionInterval    = errors.New("invalid TSDB compaction interval")
+	errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
 )
 
 // Config holds the config information for TSDB storage
 type Config struct {
-	Dir             string            `yaml:"dir"`
-	BlockRanges     DurationList      `yaml:"block_ranges_period"`
-	Retention       time.Duration     `yaml:"retention_period"`
-	ShipInterval    time.Duration     `yaml:"ship_interval"`
-	ShipConcurrency int               `yaml:"ship_concurrency"`
-	Backend         string            `yaml:"backend"`
-	BucketStore     BucketStoreConfig `yaml:"bucket_store"`
+	Dir                       string            `yaml:"dir"`
+	BlockRanges               DurationList      `yaml:"block_ranges_period"`
+	Retention                 time.Duration     `yaml:"retention_period"`
+	ShipInterval              time.Duration     `yaml:"ship_interval"`
+	ShipConcurrency           int               `yaml:"ship_concurrency"`
+	Backend                   string            `yaml:"backend"`
+	BucketStore               BucketStoreConfig `yaml:"bucket_store"`
+	HeadCompactionInterval    time.Duration     `yaml:"head_compaction_interval"`
+	HeadCompactionConcurrency int               `yaml:"head_compaction_concurrency"`
 
 	// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup
 	MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
@@ -108,6 +112,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.")
 	f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
 	f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
+	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently Cortex tries to compact the TSDB head. A block is only created if the data covers the smallest block range. Must be greater than 0 and at most 5 minutes.")
+	f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting the TSDB head into a new block.")
 }
 
 // Validate the config
@@ -120,6 +126,14 @@ func (cfg *Config) Validate() error {
 		return errInvalidShipConcurrency
 	}
 
+	if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 5*time.Minute {
+		return errInvalidCompactionInterval
+	}
+
+	if cfg.HeadCompactionConcurrency <= 0 {
+		return errInvalidCompactionConcurrency
+	}
+
 	return nil
 }
 
diff --git a/pkg/storage/tsdb/config_test.go b/pkg/storage/tsdb/config_test.go
index c9ad2af9ac4..0a2514df063 100644
--- a/pkg/storage/tsdb/config_test.go
+++ b/pkg/storage/tsdb/config_test.go
@@ -17,13 +17,17 @@ func TestConfig_Validate(t *testing.T) {
 	}{
 		"should pass on S3 backend": {
 			config: Config{
-				Backend: "s3",
+				Backend:                   "s3",
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
 			},
 			expectedErr: nil,
 		},
 		"should pass on GCS backend": {
 			config: Config{
-				Backend: "gcs",
+				Backend:                   "gcs",
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
 			},
 			expectedErr: nil,
 		},
@@ -43,9 +47,41 @@
 		},
 		"should pass on invalid ship concurrency but shipping is disabled": {
 			config: Config{
-				Backend:         "s3",
-				ShipInterval:    0,
-				ShipConcurrency: 0,
+				Backend:                   "s3",
+				ShipInterval:              0,
+				ShipConcurrency:           0,
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
+			},
+			expectedErr: nil,
+		},
+		"should fail on invalid compaction interval": {
+			config: Config{
+				Backend:                "s3",
+				HeadCompactionInterval: 0 * time.Minute,
+			},
+			expectedErr: errInvalidCompactionInterval,
+		},
+		"should fail on too high compaction interval": {
+			config: Config{
+				Backend:                "s3",
+				HeadCompactionInterval: 10 * time.Minute,
+			},
+			expectedErr: errInvalidCompactionInterval,
+		},
+		"should fail on invalid compaction concurrency": {
+			config: Config{
+				Backend:                   "s3",
+				HeadCompactionInterval:    time.Minute,
+				HeadCompactionConcurrency: 0,
+			},
+			expectedErr: errInvalidCompactionConcurrency,
+		},
+		"should pass on valid compaction config": {
+			config: Config{
+				Backend:                   "s3",
+				HeadCompactionInterval:    time.Minute,
+				HeadCompactionConcurrency: 10,
 			},
 			expectedErr: nil,
 		},
diff --git a/pkg/util/grpcclient/ratelimit_test.go b/pkg/util/grpcclient/ratelimit_test.go
index 6e7eb7a466d..6a8d6345b9b 100644
--- a/pkg/util/grpcclient/ratelimit_test.go
+++ b/pkg/util/grpcclient/ratelimit_test.go
@@ -8,7 +8,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-
+
 	"github.com/cortexproject/cortex/pkg/util/grpcclient"
 )