Skip to content

TSDB: add bucket store metrics in querier #1996

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Jan 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
57a3b0c
Moved MetricFamiliesPerUser type to util package.
pstibrany Jan 16, 2020
c27da49
Added prometheus registry to block stores.
pstibrany Jan 16, 2020
4785568
Expose first set of metrics from TSDB bucket store.
pstibrany Jan 17, 2020
b93dd89
Support for summaries with labels.
pstibrany Jan 17, 2020
2834f8d
Send data directly to channel to avoid allocating extra slices/maps j…
pstibrany Jan 17, 2020
520911f
Added more summaries.
pstibrany Jan 17, 2020
7e3a2d5
Added remaining metrics from TSDB Bucket Store.
pstibrany Jan 17, 2020
db6808d
Extracted TSDB bucket store metrics into separate type
pstibrany Jan 17, 2020
88a118d
Added test for bucket_store_metrics_test.
pstibrany Jan 17, 2020
23efddb
Added test for bucket_store_metrics_test.
pstibrany Jan 17, 2020
821b95b
Gather and report metrics from Thanos' storecache.InMemoryIndexCache.
pstibrany Jan 20, 2020
d146805
Extracted common code that builds MetricFamiliesPerUser
pstibrany Jan 20, 2020
a9274b2
Added benchmarks
pstibrany Jan 20, 2020
2ec4fa1
Updated CHANGELOG.md
pstibrany Jan 20, 2020
9b44603
Added tests to sum and getMetricsWithLabelNames functions.
pstibrany Jan 21, 2020
eee9e51
Fixes.
pstibrany Jan 21, 2020
ece88df
Replaced cortex_bucket_store prefix with cortex_querier_bucket_store.
pstibrany Jan 21, 2020
2ff0240
Removed uninteresting cortex_querier_bucket_store_sent_chunk_size_bytes.
pstibrany Jan 21, 2020
92050ab
Replaced cortex_store_index_cache prefix with cortex_querier_blocks_index_cache.
pstibrany Jan 21, 2020
abaf8cb
Group metrics registration, and register only into non-nil registry.
pstibrany Jan 21, 2020
8857202
Make message generic.
pstibrany Jan 21, 2020
6e803b8
Ignore result and error and make lint happy.
pstibrany Jan 21, 2020
d5f9693
Added test for getMetricsWithLabelNames with no labels.
pstibrany Jan 21, 2020
020ae23
Comment about missing m1 in test output.
pstibrany Jan 21, 2020
02d54c4
Fixed duplicate entry in CHANGELOG.md
pstibrany Jan 21, 2020
7741f31
Use single call.
pstibrany Jan 21, 2020
a9a95e0
Typo
pstibrany Jan 21, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ instructions below to upgrade your Postgres.
* [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917
* `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup`
* [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983
* [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996
* [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
* [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921
* [BUGFIX] Reduce memory usage when ingester Push() errors. #1922
Expand Down
99 changes: 7 additions & 92 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import (
"sync"

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)

const (
Expand Down Expand Up @@ -122,10 +120,6 @@ type tsdbMetrics struct {
memSeriesCreatedTotal *prometheus.Desc
memSeriesRemovedTotal *prometheus.Desc

// These maps drive the collection output. Key = original metric name to group.
sumCountersGlobally map[string]*prometheus.Desc
sumCountersPerUser map[string]*prometheus.Desc

regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection
regs map[string]*prometheus.Registry // One prometheus registry per tenant
}
Expand Down Expand Up @@ -155,18 +149,6 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics {
memSeriesRemovedTotal: prometheus.NewDesc(memSeriesRemovedTotalName, memSeriesRemovedTotalHelp, []string{"user"}, nil),
}

m.sumCountersGlobally = map[string]*prometheus.Desc{
"thanos_shipper_dir_syncs_total": m.dirSyncs,
"thanos_shipper_dir_sync_failures_total": m.dirSyncFailures,
"thanos_shipper_uploads_total": m.uploads,
"thanos_shipper_upload_failures_total": m.uploadFailures,
}

m.sumCountersPerUser = map[string]*prometheus.Desc{
"prometheus_tsdb_head_series_created_total": m.memSeriesCreatedTotal,
"prometheus_tsdb_head_series_removed_total": m.memSeriesRemovedTotal,
}

if r != nil {
r.MustRegister(m)
}
Expand All @@ -183,30 +165,16 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) {
}

func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) {
regs := sm.registries()
data := gatheredMetricsPerUser{}

for userID, r := range regs {
m, err := r.Gather()
if err != nil {
level.Warn(util.Logger).Log("msg", "failed to gather metrics from TSDB shipper", "user", userID, "err", err)
continue
}

data.addGatheredDataForUser(userID, m)
}
data := util.BuildMetricFamiliesPerUserFromUserRegistries(sm.registries())

// OK, we have it all. Let's build results.
for metric, desc := range sm.sumCountersGlobally {
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, data.sumCountersAcrossAllUsers(metric))
}
data.SendSumOfCounters(out, sm.dirSyncs, "thanos_shipper_dir_syncs_total")
data.SendSumOfCounters(out, sm.dirSyncFailures, "thanos_shipper_dir_sync_failures_total")
data.SendSumOfCounters(out, sm.uploads, "thanos_shipper_uploads_total")
data.SendSumOfCounters(out, sm.uploadFailures, "thanos_shipper_upload_failures_total")

for metric, desc := range sm.sumCountersPerUser {
userValues := data.sumCountersPerUser(metric)
for user, val := range userValues {
out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, val, user)
}
}
data.SendSumOfCountersPerUser(out, sm.memSeriesCreatedTotal, "prometheus_tsdb_head_series_created_total")
data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total")
}

// make a copy of the map, so that metrics can be gathered while the new registry is being added.
Expand All @@ -226,56 +194,3 @@ func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Re
sm.regs[userID] = registry
sm.regsMu.Unlock()
}

func sumCounters(mfs []*dto.MetricFamily) float64 {
result := float64(0)
for _, mf := range mfs {
if mf.Type == nil || *mf.Type != dto.MetricType_COUNTER {
continue
}

for _, m := range mf.Metric {
if m == nil || m.Counter == nil || m.Counter.Value == nil {
continue
}

result += *m.Counter.Value
}
}
return result
}

// first key = userID, second key = metric name. Value = slice of gathered values with the same metric name.
type gatheredMetricsPerUser map[string]map[string][]*dto.MetricFamily

func (d gatheredMetricsPerUser) addGatheredDataForUser(userID string, metrics []*dto.MetricFamily) {
// first, create new map which maps metric names to a slice of MetricFamily instances.
// That makes it easier to do searches later.
perMetricName := map[string][]*dto.MetricFamily{}

for _, m := range metrics {
if m.Name == nil {
continue
}
perMetricName[*m.Name] = append(perMetricName[*m.Name], m)
}

d[userID] = perMetricName
}

func (d gatheredMetricsPerUser) sumCountersAcrossAllUsers(counter string) float64 {
result := float64(0)
for _, perMetric := range d {
result += sumCounters(perMetric[counter])
}
return result
}

func (d gatheredMetricsPerUser) sumCountersPerUser(counter string) map[string]float64 {
result := map[string]float64{}
for user, perMetric := range d {
v := sumCounters(perMetric[counter])
result[user] = v
}
return result
}
13 changes: 2 additions & 11 deletions pkg/ingester/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,14 @@ import (
)

func TestTSDBMetrics(t *testing.T) {
mainReg := prometheus.NewRegistry()
mainReg := prometheus.NewPedanticRegistry()

tsdbMetrics := newTSDBMetrics(mainReg)

tsdbMetrics.setRegistryForUser("user1", populateTSDBMetrics(12345))
tsdbMetrics.setRegistryForUser("user2", populateTSDBMetrics(85787))
tsdbMetrics.setRegistryForUser("user3", populateTSDBMetrics(999))

metricNames := []string{
"cortex_ingester_shipper_dir_syncs_total",
"cortex_ingester_shipper_dir_sync_failures_total",
"cortex_ingester_shipper_uploads_total",
"cortex_ingester_shipper_upload_failures_total",
"cortex_ingester_memory_series_created_total",
"cortex_ingester_memory_series_removed_total",
}

err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(`
# HELP cortex_ingester_shipper_dir_syncs_total TSDB: Total dir sync attempts
# TYPE cortex_ingester_shipper_dir_syncs_total counter
Expand Down Expand Up @@ -61,7 +52,7 @@ func TestTSDBMetrics(t *testing.T) {
cortex_ingester_memory_series_removed_total{user="user1"} 74070
cortex_ingester_memory_series_removed_total{user="user2"} 514722
cortex_ingester_memory_series_removed_total{user="user3"} 5994
`), metricNames...)
`))
require.NoError(t, err)
}

Expand Down
6 changes: 4 additions & 2 deletions pkg/querier/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, r prometheus.Regis
}),
}

r.MustRegister(b.syncTimes)

us, err := NewUserStore(cfg, logLevel, util.Logger)
if err != nil {
return nil, err
}
b.us = us

if r != nil {
r.MustRegister(b.syncTimes, us.tsdbMetrics)
}

level.Info(util.Logger).Log("msg", "synchronizing TSDB blocks for all users")
if err := us.InitialSync(context.Background()); err != nil {
level.Warn(util.Logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
Expand Down
32 changes: 19 additions & 13 deletions pkg/querier/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/store"
Expand All @@ -24,12 +25,13 @@ import (

// UserStore is a multi-tenant version of Thanos BucketStore
type UserStore struct {
logger log.Logger
cfg tsdb.Config
bucket objstore.BucketReader
stores map[string]*store.BucketStore
client storepb.StoreClient
logLevel logging.Level
logger log.Logger
cfg tsdb.Config
bucket objstore.BucketReader
stores map[string]*store.BucketStore
client storepb.StoreClient
logLevel logging.Level
tsdbMetrics *tsdbBucketStoreMetrics
}

// NewUserStore returns a new UserStore
Expand All @@ -40,11 +42,12 @@ func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (*
}

u := &UserStore{
logger: logger,
cfg: cfg,
bucket: bkt,
stores: make(map[string]*store.BucketStore),
logLevel: logLevel,
logger: logger,
cfg: cfg,
bucket: bkt,
stores: map[string]*store.BucketStore{},
logLevel: logLevel,
tsdbMetrics: newTSDBBucketStoreMetrics(),
}

serv := grpc.NewServer()
Expand Down Expand Up @@ -115,9 +118,11 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
Bucket: bkt,
}

reg := prometheus.NewRegistry()

indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes
maxItemSizeBytes := indexCacheSizeBytes / 2
indexCache, err := storecache.NewInMemoryIndexCache(u.logger, nil, storecache.Opts{
indexCache, err := storecache.NewInMemoryIndexCache(u.logger, reg, storecache.Opts{
MaxSizeBytes: indexCacheSizeBytes,
MaxItemSizeBytes: maxItemSizeBytes,
})
Expand All @@ -126,7 +131,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
}
bs, err = store.NewBucketStore(
u.logger,
nil,
reg,
userBkt,
filepath.Join(u.cfg.BucketStore.SyncDir, user),
indexCache,
Expand All @@ -147,6 +152,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context,
}

u.stores[user] = bs
u.tsdbMetrics.addUserRegistry(user, reg)
}

wg.Add(1)
Expand Down
Loading