Skip to content

TSDB head compactor concurrency #2172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Feb 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* [CHANGE] Renamed the cache configuration setting `defaul_validity` to `default_validity`. #2140
* [CHANGE] Removed unused /validate_expr endpoint. #2152
* [CHANGE] Updated Prometheus dependency to v2.16.0. This Prometheus version uses Active Query Tracker to limit concurrent queries. In order to keep `-querier.max-concurrent` working, Active Query Tracker is enabled by default, and is configured to store its data to `active-query-tracker` directory (relative to current directory when Cortex started). This can be changed by using `-querier.active-query-tracker-dir` option. Purpose of Active Query Tracker is to log queries that were running when Cortex crashes. This logging happens on next Cortex start. #2088
* [CHANGE] Experimental TSDB: TSDB head compaction interval and concurrency is now configurable (defaults to 1 min interval and 5 concurrent head compactions). New options: `-experimental.tsdb.head-compaction-interval` and `-experimental.tsdb.head-compaction-concurrency`. #2172
* [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125
* [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947
* `--experimental.distributor.user-subring-size`
Expand Down
9 changes: 9 additions & 0 deletions docs/operations/blocks-storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ tsdb:
# CLI flag: -experimental.tsdb.ship-concurrency
[ship_concurrency: <int> | default = 10]

# How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range.
# Must be greater than 0 and max 5 minutes.
# CLI flag: -experimental.tsdb.head-compaction-interval
[head_compaction_interval: <duration> | default = 1m]

# Maximum number of tenants concurrently compacting TSDB head into a new block.
# CLI flag: -experimental.tsdb.head-compaction-concurrency
[head_compaction_concurrency: <int> | default = 5]

# The bucket store configuration applies to queriers and configures how queriers
# interact with the long-term storage backend.
bucket_store:
Expand Down
130 changes: 102 additions & 28 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type TSDBState struct {
transferOnce sync.Once

tsdbMetrics *tsdbMetrics

// Head compactions metrics.
compactionsTriggered prometheus.Counter
compactionsFailed prometheus.Counter
}

// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
Expand All @@ -90,6 +94,16 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
dbs: make(map[string]*userTSDB),
bucket: bucketClient,
tsdbMetrics: newTSDBMetrics(registerer),

compactionsTriggered: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_tsdb_compactions_triggered_total",
Help: "Total number of triggered compactions.",
}),

compactionsFailed: prometheus.NewCounter(prometheus.CounterOpts{
Name: "cortex_ingester_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed.",
}),
},
}

Expand All @@ -101,6 +115,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
Name: "cortex_ingester_memory_series",
Help: "The current number of series in memory.",
}, i.numSeriesInTSDB))
registerer.MustRegister(i.TSDBState.compactionsTriggered)
registerer.MustRegister(i.TSDBState.compactionsFailed)
}

i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
Expand Down Expand Up @@ -129,6 +145,9 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
i.done.Add(1)
go i.updateLoop()

i.done.Add(1)
go i.compactionLoop()

return i, nil
}

Expand Down Expand Up @@ -592,6 +611,7 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
if err != nil {
return nil, err
}
db.DisableCompactions() // we will compact on our own schedule

userDB := &userTSDB{
DB: db,
Expand Down Expand Up @@ -785,44 +805,98 @@ func (i *Ingester) shipBlocks() {
return
}

// Create a pool of workers which will synchronize blocks. The pool size
// is limited in order to avoid to concurrently sync a lot of tenants in
// a large cluster.
workersChan := make(chan string)
wg := &sync.WaitGroup{}
wg.Add(i.cfg.TSDBConfig.ShipConcurrency)
// Number of concurrent workers is limited in order to avoid to concurrently sync a lot
// of tenants in a large cluster.
i.runConcurrentUserWorkers(i.cfg.TSDBConfig.ShipConcurrency, func(userID string) {
// Get the user's DB. If the user doesn't exist, we skip it.
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
return
}

for j := 0; j < i.cfg.TSDBConfig.ShipConcurrency; j++ {
go func() {
defer wg.Done()
// Skip if the shipper context has been canceled.
if userDB.shipperCtx.Err() != nil {
return
}

for userID := range workersChan {
// Get the user's DB. If the user doesn't exist, we skip it.
userDB := i.getTSDB(userID)
if userDB == nil || userDB.shipper == nil {
continue
}
// Run the shipper's Sync() to upload unshipped blocks.
if uploaded, err := userDB.shipper.Sync(userDB.shipperCtx); err != nil {
level.Warn(util.Logger).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)
} else {
level.Debug(util.Logger).Log("msg", "shipper successfully synchronized TSDB blocks with storage", "user", userID, "uploaded", uploaded)
}
})
}

// Skip if the shipper context has been canceled.
if userDB.shipperCtx.Err() != nil {
continue
}
// compactionLoop periodically triggers TSDB head compaction for all tenants,
// and keeps doing so until the ingester is shutting down.
func (i *Ingester) compactionLoop() {
	defer i.done.Done()

	ticker := time.NewTicker(i.cfg.TSDBConfig.HeadCompactionInterval)
	defer ticker.Stop()

	for {
		select {
		case <-i.quit:
			// Ingester is shutting down: stop scheduling compactions.
			return

		case <-ticker.C:
			i.compactBlocks()
		}
	}
}

// compactBlocks runs TSDB head compaction for every tenant, with bounded
// concurrency. Compaction is skipped entirely while the ingester is JOINING
// or LEAVING the ring, since blocks may be transferred in those states.
func (i *Ingester) compactBlocks() {
	// Don't compact TSDB blocks while JOINING or LEAVING, as there may be ongoing blocks transfers.
	state := i.lifecycler.GetState()
	if state == ring.JOINING || state == ring.LEAVING {
		level.Info(util.Logger).Log("msg", "TSDB blocks compaction has been skipped because of the current ingester state", "state", state)
		return
	}

	i.runConcurrentUserWorkers(i.cfg.TSDBConfig.HeadCompactionConcurrency, func(userID string) {
		db := i.getTSDB(userID)
		if db == nil {
			// Tenant has no open TSDB (e.g. removed meanwhile): nothing to do.
			return
		}

		i.TSDBState.compactionsTriggered.Inc()

		if err := db.Compact(); err != nil {
			i.TSDBState.compactionsFailed.Inc()
			level.Warn(util.Logger).Log("msg", "TSDB blocks compaction for user has failed", "user", userID, "err", err)
			return
		}

		level.Debug(util.Logger).Log("msg", "TSDB blocks compaction completed successfully", "user", userID)
	})
}

// runConcurrentUserWorkers invokes userFunc for each user with an open TSDB,
// using at most `concurrency` worker goroutines. It blocks until all started
// invocations have completed. If the ingester begins shutting down, no further
// users are dispatched, but in-flight invocations are still awaited.
func (i *Ingester) runConcurrentUserWorkers(concurrency int, userFunc func(userID string)) {
	var wg sync.WaitGroup
	users := make(chan string)

	// Spawn the fixed-size worker pool. Each worker drains the channel until
	// it is closed.
	for n := 0; n < concurrency; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for userID := range users {
				userFunc(userID)
			}
		}()
	}

dispatch:
	for _, userID := range i.getTSDBUsers() {
		select {
		case users <- userID:
			// Dispatched to a worker.
		case <-i.quit:
			// Shutting down: don't start new tasks.
			break dispatch
		}
	}

	// Closing the channel makes the workers exit once it is drained.
	close(users)

	// Wait for ongoing workers to finish.
	wg.Wait()
}
2 changes: 1 addition & 1 deletion pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithT
return m
}

// TSDB metrics. Each tenant has its own registry, that TSDB code uses.
// TSDB metrics collector. Each tenant has its own registry, that TSDB code uses.
type tsdbMetrics struct {
// We aggregate metrics from individual TSDB registries into
// a single set of counters, which are exposed as Cortex metrics.
Expand Down
32 changes: 23 additions & 9 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,23 @@ const (

// Validation errors
var (
errUnsupportedBackend = errors.New("unsupported TSDB storage backend")
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
errUnsupportedBackend = errors.New("unsupported TSDB storage backend")
errInvalidShipConcurrency = errors.New("invalid TSDB ship concurrency")
errInvalidCompactionInterval = errors.New("invalid TSDB compaction interval")
errInvalidCompactionConcurrency = errors.New("invalid TSDB compaction concurrency")
)

// Config holds the config information for TSDB storage
type Config struct {
Dir string `yaml:"dir"`
BlockRanges DurationList `yaml:"block_ranges_period"`
Retention time.Duration `yaml:"retention_period"`
ShipInterval time.Duration `yaml:"ship_interval"`
ShipConcurrency int `yaml:"ship_concurrency"`
Backend string `yaml:"backend"`
BucketStore BucketStoreConfig `yaml:"bucket_store"`
Dir string `yaml:"dir"`
BlockRanges DurationList `yaml:"block_ranges_period"`
Retention time.Duration `yaml:"retention_period"`
ShipInterval time.Duration `yaml:"ship_interval"`
ShipConcurrency int `yaml:"ship_concurrency"`
Backend string `yaml:"backend"`
BucketStore BucketStoreConfig `yaml:"bucket_store"`
HeadCompactionInterval time.Duration `yaml:"head_compaction_interval"`
HeadCompactionConcurrency int `yaml:"head_compaction_concurrency"`

// MaxTSDBOpeningConcurrencyOnStartup limits the number of concurrently opening TSDB's during startup
MaxTSDBOpeningConcurrencyOnStartup int `yaml:"max_tsdb_opening_concurrency_on_startup"`
Expand Down Expand Up @@ -108,6 +112,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ShipConcurrency, "experimental.tsdb.ship-concurrency", 10, "Maximum number of tenants concurrently shipping blocks to the storage.")
f.StringVar(&cfg.Backend, "experimental.tsdb.backend", "s3", "TSDB storage backend to use")
f.IntVar(&cfg.MaxTSDBOpeningConcurrencyOnStartup, "experimental.tsdb.max-tsdb-opening-concurrency-on-startup", 10, "limit the number of concurrently opening TSDB's on startup")
f.DurationVar(&cfg.HeadCompactionInterval, "experimental.tsdb.head-compaction-interval", 1*time.Minute, "How frequently does Cortex try to compact TSDB head. Block is only created if data covers smallest block range. Must be greater than 0 and max 5 minutes.")
f.IntVar(&cfg.HeadCompactionConcurrency, "experimental.tsdb.head-compaction-concurrency", 5, "Maximum number of tenants concurrently compacting TSDB head into a new block")
}

// Validate the config
Expand All @@ -120,6 +126,14 @@ func (cfg *Config) Validate() error {
return errInvalidShipConcurrency
}

if cfg.HeadCompactionInterval <= 0 || cfg.HeadCompactionInterval > 5*time.Minute {
return errInvalidCompactionInterval
}

if cfg.HeadCompactionConcurrency <= 0 {
return errInvalidCompactionConcurrency
}

return nil
}

Expand Down
46 changes: 41 additions & 5 deletions pkg/storage/tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ func TestConfig_Validate(t *testing.T) {
}{
"should pass on S3 backend": {
config: Config{
Backend: "s3",
Backend: "s3",
HeadCompactionInterval: 1 * time.Minute,
HeadCompactionConcurrency: 5,
},
expectedErr: nil,
},
"should pass on GCS backend": {
config: Config{
Backend: "gcs",
Backend: "gcs",
HeadCompactionInterval: 1 * time.Minute,
HeadCompactionConcurrency: 5,
},
expectedErr: nil,
},
Expand All @@ -43,9 +47,41 @@ func TestConfig_Validate(t *testing.T) {
},
"should pass on invalid ship concurrency but shipping is disabled": {
config: Config{
Backend: "s3",
ShipInterval: 0,
ShipConcurrency: 0,
Backend: "s3",
ShipInterval: 0,
ShipConcurrency: 0,
HeadCompactionInterval: 1 * time.Minute,
HeadCompactionConcurrency: 5,
},
expectedErr: nil,
},
"should fail on invalid compaction interval": {
config: Config{
Backend: "s3",
HeadCompactionInterval: 0 * time.Minute,
},
expectedErr: errInvalidCompactionInterval,
},
"should fail on too high compaction interval": {
config: Config{
Backend: "s3",
HeadCompactionInterval: 10 * time.Minute,
},
expectedErr: errInvalidCompactionInterval,
},
"should fail on invalid compaction concurrency": {
config: Config{
Backend: "s3",
HeadCompactionInterval: time.Minute,
HeadCompactionConcurrency: 0,
},
expectedErr: errInvalidCompactionConcurrency,
},
"should pass on on valid compaction config": {
config: Config{
Backend: "s3",
HeadCompactionInterval: time.Minute,
HeadCompactionConcurrency: 10,
},
expectedErr: nil,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/grpcclient/ratelimit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/util/grpcclient"
)

Expand Down