Commit 6d4da2c

Authored by Peter Štibraný
TSDB head compactor concurrency (#2172)
* Run local compactions on our own schedule and goroutines.
* Fix and add some tests to validation.
* Refactor common code into runConcurrentUserWorkers method.
* Tied compaction and shipping together.
* Updated CHANGELOG.md.
* Updated CHANGELOG.md.
* Updated documentation.
* Merge blocks.
* Revert "Tied compaction and shipping together." (reverts commit ac372e9f065f00e909f67d7c3b3557e2974f311c)
* Revert "Updated documentation." (reverts commit 9adb4cb9b4273ac189a3bb0478090966a0db67ec)
* Documentation.
* Fixed documentation.
* Renamed TSDB head compaction options to use "head" prefix, to avoid confusion with the compactor, which also has a "compaction_interval" config field.
* Expose compaction metrics.
* Moved compaction metrics to TSDB state. They are now updated in Cortex code, when we call compaction.
* Decrease compaction interval.
* Updated help and changelog entry.
* Fix default value.
* Review feedback:
  - decrease default to 1 min
  - compaction cannot be disabled
  - compaction interval must be greater than 0, and <= 5 mins
* Fix CHANGELOG.md.
* Fix formatting to make lint happy.

Signed-off-by: Peter Štibraný <[email protected]>
Parent: f5e3dd7 · Commit: 6d4da2c

File tree: 6 files changed, +177 −43 lines


CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@
 * [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140
 * [CHANGE] Removed unused /validate_expr endpoint. #2152
 * [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
+* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
   * `--experimental.distributor.user-subring-size`

docs/operations/blocks-storage.md

Lines changed: 9 additions & 0 deletions
@@ -123,6 +123,15 @@ tsdb:
   # CLI flag: -experimental.tsdb.ship-concurrency
   [ship_concurrency: <int> | default = 10]

+  # How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range.
+  # Must be greater than 0 and max 5 minutes.
+  # CLI flag: -experimental.tsdb.head-compaction-interval
+  [head_compaction_interval: <duration> | default = 1m]
+
+  # Maximum number of tenants concurrently compacting TSDB head into a new block.
+  # CLI flag: -experimental.tsdb.head-compaction-concurrency
+  [head_compaction_concurrency: <int> | default = 5]
+
 # The bucket store configuration applies to queriers and configure how queriers
 # iteract with the long-term storage backend.
 bucket_store:
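
The interaction worth calling out here is between `head_compaction_interval` and the smallest value of `block_ranges_period`: the interval only controls how often Cortex checks whether the head can be compacted, and a block is cut only once the head actually covers the smallest block range. A minimal sketch of that rule, with hypothetical names (headMinTime, headMaxTime, smallestBlockRange) that are not actual Cortex or Prometheus TSDB APIs:

package main

import "fmt"

// headCoversSmallestRange is an illustrative stand-in for the "block is only
// created if data covers smallest block range" check described above.
func headCoversSmallestRange(headMinTime, headMaxTime, smallestBlockRange int64) bool {
	return headMaxTime-headMinTime >= smallestBlockRange
}

func main() {
	const twoHours = int64(2 * 60 * 60 * 1000) // e.g. a 2h smallest block range, in milliseconds

	// 30 minutes of data in the head: the periodic check runs, but no block is cut yet.
	fmt.Println(headCoversSmallestRange(0, 30*60*1000, twoHours)) // false

	// Just over two hours of data: the next check produces a block.
	fmt.Println(headCoversSmallestRange(0, twoHours+60*1000, twoHours)) // true
}

So a shorter interval mostly bounds how long a compactable head waits before being compacted; it does not create more blocks.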

pkg/ingester/ingester_v2.go

Lines changed: 102 additions & 28 deletions
@@ -65,6 +65,10 @@ type TSDBState struct {
 	transferOnce sync.Once

 	tsdbMetrics *tsdbMetrics
+
+	// Head compactions metrics.
+	compactionsTriggered prometheus.Counter
+	compactionsFailed    prometheus.Counter
 }

 // NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
@@ -90,6 +94,16 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 			dbs:         make(map[string]*userTSDB),
 			bucket:      bucketClient,
 			tsdbMetrics: newTSDBMetrics(registerer),
+
+			compactionsTriggered: prometheus.NewCounter(prometheus.CounterOpts{
+				Name: "cortex_ingester_tsdb_compactions_triggered_total",
+				Help: "Total number of triggered compactions.",
+			}),
+
+			compactionsFailed: prometheus.NewCounter(prometheus.CounterOpts{
+				Name: "cortex_ingester_tsdb_compactions_failed_total",
+				Help: "Total number of compactions that failed.",
+			}),
 		},
 	}

@@ -101,6 +115,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 			Name: "cortex_ingester_memory_series",
 			Help: "The current number of series in memory.",
 		}, i.numSeriesInTSDB))
+		registerer.MustRegister(i.TSDBState.compactionsTriggered)
+		registerer.MustRegister(i.TSDBState.compactionsFailed)
 	}

 	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
@@ -129,6 +145,9 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 	i.done.Add(1)
 	go i.updateLoop()

+	i.done.Add(1)
+	go i.compactionLoop()
+
 	return i, nil
 }

@@ -592,6 +611,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 	if err != nil {
 		return nil, err
 	}
+	db.DisableCompactions() // we will compact on our own schedule

 	userDB := &userTSDB{
 		DB: db,
@@ -785,44 +805,98 @@ func (i *Ingester) shipBlocks() {
 		return
 	}

-	// Create a pool of workers which will synchronize blocks. The pool size
-	// is limited in order to avoid to concurrently sync a lot of tenants in
-	// a large cluster.
-	workersChan := make(chan string)
-	wg := &sync.WaitGroup{}
-	wg.Add(i.cfg.TSDBConfig.ShipConcurrency)
+	// Number of concurrent workers is limited in order to avoid to concurrently sync a lot
+	// of tenants in a large cluster.
+	i.runConcurrentUserWorkers(i.cfg.TSDBConfig.ShipConcurrency, func(userID string) {
+		// Get the user's DB. If the user doesn't exist, we skip it.
+		userDB := i.getTSDB(userID)
+		if userDB == nil || userDB.shipper == nil {
+			return
+		}

-	for j := 0; j < i.cfg.TSDBConfig.ShipConcurrency; j++ {
-		go func() {
-			defer wg.Done()
+		// Skip if the shipper context has been canceled.
+		if userDB.shipperCtx.Err() != nil {
+			return
+		}

-			for userID := range workersChan {
-				// Get the user's DB. If the user doesn't exist, we skip it.
-				userDB := i.getTSDB(userID)
-				if userDB == nil || userDB.shipper == nil {
-					continue
-				}
+		// Run the shipper's Sync() to upload unshipped blocks.
+		if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil {
+			level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)
+		} else {
+			level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded)
+		}
+	})
+}

-				// Skip if the shipper context has been canceled.
-				if userDB.shipperCtx.Err() != nil {
-					continue
-				}
+func (i *Ingester) compactionLoop() {
+	defer i.done.Done()

-				// Run the shipper's Sync() to upload unshipped blocks.
-				if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil {
-					level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)
-				} else {
-					level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded)
-				}
+	ticker := time.NewTicker(i.cfg.TSDBConfig.HeadCompactionInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			i.compactBlocks()
+
+		case <-i.quit:
+			return
+		}
+	}
+}
+
+func (i *Ingester) compactBlocks() {
+	// Don't compact TSDB blocks while JOINING or LEAVING, as there may be ongoing blocks transfers.
+	if ingesterState := i.lifecycler.GetState(); ingesterState == ring.JOINING || ingesterState == ring.LEAVING {
+		level.Info(util.Logger).Log("msg", "TSDB blocks compaction has been skipped because of the current ingester state", "state", ingesterState)
+		return
+	}
+
+	i.runConcurrentUserWorkers(i.cfg.TSDBConfig.HeadCompactionConcurrency, func(userID string) {
+		userDB := i.getTSDB(userID)
+		if userDB == nil {
+			return
+		}
+
+		i.TSDBState.compactionsTriggered.Inc()
+		err := userDB.Compact()
+		if err != nil {
+			i.TSDBState.compactionsFailed.Inc()
+			level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err)
+		} else {
+			level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID)
+		}
+	})
+}
+
+func (i *Ingester) runConcurrentUserWorkers(concurrency int, userFunc func(userID string)) {
+	wg := sync.WaitGroup{}
+	ch := make(chan string)
+
+	for ix := 0; ix < concurrency; ix++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			for userID := range ch {
+				userFunc(userID)
 			}
 		}()
 	}

+sendLoop:
 	for _, userID := range i.getTSDBUsers() {
-		workersChan <- userID
+		select {
+		case ch <- userID:
+			// ok
+		case <-i.quit:
+			// don't start new tasks.
+			break sendLoop
+		}
 	}
-	close(workersChan)

-	// Wait until all workers completed.
+	close(ch)
+
+	// wait for ongoing workers to finish.
 	wg.Wait()
 }
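
The new runConcurrentUserWorkers helper is a standard bounded worker pool with cooperative shutdown: a fixed number of goroutines drain a channel of tenant IDs, the sender stops dispatching once the quit channel fires, and close(ch) plus wg.Wait() lets in-flight work finish. A self-contained sketch of the same pattern outside of Cortex (all names here are illustrative, not Cortex APIs):

package main

import (
	"fmt"
	"sync"
)

// runConcurrentWorkers mirrors the pattern above: `concurrency` goroutines
// consume IDs from an unbuffered channel, and closing `quit` stops the
// sender from dispatching new work while letting started work complete.
func runConcurrentWorkers(ids []string, concurrency int, quit <-chan struct{}, fn func(id string)) {
	wg := sync.WaitGroup{}
	ch := make(chan string)

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for id := range ch {
				fn(id)
			}
		}()
	}

sendLoop:
	for _, id := range ids {
		select {
		case ch <- id:
			// Dispatched to an idle worker.
		case <-quit:
			// Shutting down: don't start new tasks.
			break sendLoop
		}
	}

	close(ch) // workers exit their range loop once the channel drains
	wg.Wait() // wait for ongoing workers to finish
}

func main() {
	quit := make(chan struct{})
	runConcurrentWorkers([]string{"tenant-1", "tenant-2", "tenant-3"}, 2, quit, func(id string) {
		fmt.Println("processing", id)
	})
}

Because the channel is unbuffered, a tenant is only handed to a worker that is ready, so at most `concurrency` tenants are processed at any moment.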

pkg/ingester/metrics.go

Lines changed: 1 addition & 1 deletion
@@ -114,7 +114,7 @@ func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithT
 	return m
 }

-// TSDB metrics. Each tenant has its own registry, that TSDB code uses.
+// TSDB metrics collector. Each tenant has its own registry, that TSDB code uses.
 type tsdbMetrics struct {
 	// We aggregate metrics from individual TSDB registries into
 	// a single set of counters, which are exposed as Cortex metrics.
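
The aggregation this comment describes is a common way to expose per-tenant registries as one set of Cortex metrics: gather each tenant's registry and sum the matching counter families. A simplified sketch of that idea (not the actual tsdbMetrics implementation; the metric name below is hypothetical):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// sumCounterAcrossRegistries gathers every registry and sums the value of
// one counter family across all of them.
func sumCounterAcrossRegistries(regs []*prometheus.Registry, name string) float64 {
	total := 0.0
	for _, reg := range regs {
		families, err := reg.Gather()
		if err != nil {
			continue // a real collector would report this error
		}
		for _, mf := range families {
			if mf.GetName() != name {
				continue
			}
			for _, m := range mf.GetMetric() {
				total += m.GetCounter().GetValue()
			}
		}
	}
	return total
}

func main() {
	var regs []*prometheus.Registry
	for i := 0; i < 2; i++ {
		reg := prometheus.NewRegistry()
		c := prometheus.NewCounter(prometheus.CounterOpts{
			Name: "example_appended_samples_total", // hypothetical per-tenant TSDB counter
			Help: "Example per-tenant counter.",
		})
		reg.MustRegister(c)
		c.Add(float64(10 * (i + 1)))
		regs = append(regs, reg)
	}

	fmt.Println(sumCounterAcrossRegistries(regs, "example_appended_samples_total")) // 30
}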

pkg/storage/tsdb/config.go

Lines changed: 23 additions & 9 deletions
@@ -30,19 +30,23 @@ const (

 // Validation errors
 var (
-	errUnsupportedBackend     = errors.New("unsupported TSDB storage backend")
-	errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
+	errUnsupportedBackend           = errors.New("unsupported TSDB storage backend")
+	errInvalidShipConcurrency       = errors.New("invalid TSDB ship concurrency")
+	errInvalidCompactionInterval    = errors.New("invalid TSDB compaction interval")
+	errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
 )

 // Config holds the config information for TSDB storage
 type Config struct {
-	Dir             string            `yaml:"dir"`
-	BlockRanges     DurationList      `yaml:"block_ranges_period"`
-	Retention       time.Duration     `yaml:"retention_period"`
-	ShipInterval    time.Duration     `yaml:"ship_interval"`
-	ShipConcurrency int               `yaml:"ship_concurrency"`
-	Backend         string            `yaml:"backend"`
-	BucketStore     BucketStoreConfig `yaml:"bucket_store"`
+	Dir                       string            `yaml:"dir"`
+	BlockRanges               DurationList      `yaml:"block_ranges_period"`
+	Retention                 time.Duration     `yaml:"retention_period"`
+	ShipInterval              time.Duration     `yaml:"ship_interval"`
+	ShipConcurrency           int               `yaml:"ship_concurrency"`
+	Backend                   string            `yaml:"backend"`
+	BucketStore               BucketStoreConfig `yaml:"bucket_store"`
+	HeadCompactionInterval    time.Duration     `yaml:"head_compaction_interval"`
+	HeadCompactionConcurrency int               `yaml:"head_compaction_concurrency"`

 	// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup
 	MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
@@ -108,6 +112,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.")
 	f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
 	f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
+	f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.")
+	f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
 }

 // Validate the config
@@ -120,6 +126,14 @@ func (cfg *Config) Validate() error {
 		return errInvalidShipConcurrency
 	}

+	if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 5*time.Minute {
+		return errInvalidCompactionInterval
+	}
+
+	if cfg.HeadCompactionConcurrency <= 0 {
+		return errInvalidCompactionConcurrency
+	}
+
 	return nil
 }
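
Taken together, the new flags and validation amount to: the interval must be in (0, 5m] and the concurrency must be positive. A standalone sketch of registering and validating equivalent flags with the standard library (these names stand in for, and are not, the Cortex Config type):

package main

import (
	"errors"
	"flag"
	"fmt"
	"time"
)

func validate(interval time.Duration, concurrency int) error {
	// Compaction cannot be disabled: the interval must be greater than 0 and at most 5 minutes.
	if interval <= 0 || interval > 5*time.Minute {
		return errors.New("invalid TSDB compaction interval")
	}
	if concurrency <= 0 {
		return errors.New("invalid TSDB compaction concurrency")
	}
	return nil
}

func main() {
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	interval := fs.Duration("experimental.tsdb.head-compaction-interval", 1*time.Minute,
		"How frequently does Cortex try to compact TSDB head.")
	concurrency := fs.Int("experimental.tsdb.head-compaction-concurrency", 5,
		"Maximum number of tenants concurrently compacting TSDB head into a new block.")

	// Override one default, as a user would on the command line.
	_ = fs.Parse([]string{"-experimental.tsdb.head-compaction-interval=2m"})

	if err := validate(*interval, *concurrency); err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("compacting heads every %s with %d workers\n", *interval, *concurrency)
}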

pkg/storage/tsdb/config_test.go

Lines changed: 41 additions & 5 deletions
@@ -17,13 +17,17 @@ func TestConfig_Validate(t *testing.T) {
 	}{
 		"should pass on S3 backend": {
 			config: Config{
-				Backend: "s3",
+				Backend:                   "s3",
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
 			},
 			expectedErr: nil,
 		},
 		"should pass on GCS backend": {
 			config: Config{
-				Backend: "gcs",
+				Backend:                   "gcs",
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
 			},
 			expectedErr: nil,
 		},
@@ -43,9 +47,41 @@ func TestConfig_Validate(t *testing.T) {
 		},
 		"should pass on invalid ship concurrency but shipping is disabled": {
 			config: Config{
-				Backend:         "s3",
-				ShipInterval:    0,
-				ShipConcurrency: 0,
+				Backend:                   "s3",
+				ShipInterval:              0,
+				ShipConcurrency:           0,
+				HeadCompactionInterval:    1 * time.Minute,
+				HeadCompactionConcurrency: 5,
+			},
+			expectedErr: nil,
+		},
+		"should fail on invalid compaction interval": {
+			config: Config{
+				Backend:                "s3",
+				HeadCompactionInterval: 0 * time.Minute,
+			},
+			expectedErr: errInvalidCompactionInterval,
+		},
+		"should fail on too high compaction interval": {
+			config: Config{
+				Backend:                "s3",
+				HeadCompactionInterval: 10 * time.Minute,
+			},
+			expectedErr: errInvalidCompactionInterval,
+		},
+		"should fail on invalid compaction concurrency": {
+			config: Config{
+				Backend:                   "s3",
+				HeadCompactionInterval:    time.Minute,
+				HeadCompactionConcurrency: 0,
+			},
+			expectedErr: errInvalidCompactionConcurrency,
+		},
+		"should pass on valid compaction config": {
+			config: Config{
+				Backend:                   "s3",
+				HeadCompactionInterval:    time.Minute,
+				HeadCompactionConcurrency: 10,
 			},
 			expectedErr: nil,
 		},
