diff --git a/CHANGELOG.md b/CHANGELOG.md index 3359b590514..0823a44944e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ instructions below to upgrade your Postgres. * [ENHANCEMENT] Experimental TSDB: Open existing TSDB on startup to prevent ingester from becoming ready before it can accept writes. #1917 * `--experimental.tsdb.max-tsdb-opening-concurrency-on-startup` * [ENHANCEMENT] Experimental TSDB: Added `cortex_ingester_shipper_dir_syncs_total`, `cortex_ingester_shipper_dir_sync_failures_total`, `cortex_ingester_shipper_uploads_total` and `cortex_ingester_shipper_upload_failures_total` metrics from TSDB shipper component. #1983 +* [ENHANCEMENT] Experimental TSDB: Querier now exports aggregate metrics from Thanos bucket store and in memory index cache (many metrics to list, but all have `cortex_querier_bucket_store_` or `cortex_querier_blocks_index_cache_` prefix). #1996 * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861 * [BUGFIX] Fixed #1904 ingesters getting stuck in a LEAVING state after coming up from an ungraceful exit. #1921 * [BUGFIX] Reduce memory usage when ingester Push() errors. #1922 diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 7a97258b97b..b57633c727a 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -4,9 +4,7 @@ import ( "sync" "github.com/cortexproject/cortex/pkg/util" - "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" ) const ( @@ -122,10 +120,6 @@ type tsdbMetrics struct { memSeriesCreatedTotal *prometheus.Desc memSeriesRemovedTotal *prometheus.Desc - // These maps drive the collection output. Key = original metric name to group. - sumCountersGlobally map[string]*prometheus.Desc - sumCountersPerUser map[string]*prometheus.Desc - regsMu sync.RWMutex // custom mutex for shipper registry, to avoid blocking main user state mutex on collection regs map[string]*prometheus.Registry // One prometheus registry per tenant } @@ -155,18 +149,6 @@ func newTSDBMetrics(r prometheus.Registerer) *tsdbMetrics { memSeriesRemovedTotal: prometheus.NewDesc(memSeriesRemovedTotalName, memSeriesRemovedTotalHelp, []string{"user"}, nil), } - m.sumCountersGlobally = map[string]*prometheus.Desc{ - "thanos_shipper_dir_syncs_total": m.dirSyncs, - "thanos_shipper_dir_sync_failures_total": m.dirSyncFailures, - "thanos_shipper_uploads_total": m.uploads, - "thanos_shipper_upload_failures_total": m.uploadFailures, - } - - m.sumCountersPerUser = map[string]*prometheus.Desc{ - "prometheus_tsdb_head_series_created_total": m.memSeriesCreatedTotal, - "prometheus_tsdb_head_series_removed_total": m.memSeriesRemovedTotal, - } - if r != nil { r.MustRegister(m) } @@ -183,30 +165,16 @@ func (sm *tsdbMetrics) Describe(out chan<- *prometheus.Desc) { } func (sm *tsdbMetrics) Collect(out chan<- prometheus.Metric) { - regs := sm.registries() - data := gatheredMetricsPerUser{} - - for userID, r := range regs { - m, err := r.Gather() - if err != nil { - level.Warn(util.Logger).Log("msg", "failed to gather metrics from TSDB shipper", "user", userID, "err", err) - continue - } - - data.addGatheredDataForUser(userID, m) - } + data := util.BuildMetricFamiliesPerUserFromUserRegistries(sm.registries()) // OK, we have it all. Let's build results. 
- for metric, desc := range sm.sumCountersGlobally { - out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, data.sumCountersAcrossAllUsers(metric)) - } + data.SendSumOfCounters(out, sm.dirSyncs, "thanos_shipper_dir_syncs_total") + data.SendSumOfCounters(out, sm.dirSyncFailures, "thanos_shipper_dir_sync_failures_total") + data.SendSumOfCounters(out, sm.uploads, "thanos_shipper_uploads_total") + data.SendSumOfCounters(out, sm.uploadFailures, "thanos_shipper_upload_failures_total") - for metric, desc := range sm.sumCountersPerUser { - userValues := data.sumCountersPerUser(metric) - for user, val := range userValues { - out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, val, user) - } - } + data.SendSumOfCountersPerUser(out, sm.memSeriesCreatedTotal, "prometheus_tsdb_head_series_created_total") + data.SendSumOfCountersPerUser(out, sm.memSeriesRemovedTotal, "prometheus_tsdb_head_series_removed_total") } // make a copy of the map, so that metrics can be gathered while the new registry is being added. @@ -226,56 +194,3 @@ func (sm *tsdbMetrics) setRegistryForUser(userID string, registry *prometheus.Re sm.regs[userID] = registry sm.regsMu.Unlock() } - -func sumCounters(mfs []*dto.MetricFamily) float64 { - result := float64(0) - for _, mf := range mfs { - if mf.Type == nil || *mf.Type != dto.MetricType_COUNTER { - continue - } - - for _, m := range mf.Metric { - if m == nil || m.Counter == nil || m.Counter.Value == nil { - continue - } - - result += *m.Counter.Value - } - } - return result -} - -// first key = userID, second key = metric name. Value = slice of gathered values with the same metric name. -type gatheredMetricsPerUser map[string]map[string][]*dto.MetricFamily - -func (d gatheredMetricsPerUser) addGatheredDataForUser(userID string, metrics []*dto.MetricFamily) { - // first, create new map which maps metric names to a slice of MetricFamily instances. - // That makes it easier to do searches later. 
- perMetricName := map[string][]*dto.MetricFamily{} - - for _, m := range metrics { - if m.Name == nil { - continue - } - perMetricName[*m.Name] = append(perMetricName[*m.Name], m) - } - - d[userID] = perMetricName -} - -func (d gatheredMetricsPerUser) sumCountersAcrossAllUsers(counter string) float64 { - result := float64(0) - for _, perMetric := range d { - result += sumCounters(perMetric[counter]) - } - return result -} - -func (d gatheredMetricsPerUser) sumCountersPerUser(counter string) map[string]float64 { - result := map[string]float64{} - for user, perMetric := range d { - v := sumCounters(perMetric[counter]) - result[user] = v - } - return result -} diff --git a/pkg/ingester/metrics_test.go b/pkg/ingester/metrics_test.go index 1cb450dd21b..749cbe4ca48 100644 --- a/pkg/ingester/metrics_test.go +++ b/pkg/ingester/metrics_test.go @@ -10,7 +10,7 @@ import ( ) func TestTSDBMetrics(t *testing.T) { - mainReg := prometheus.NewRegistry() + mainReg := prometheus.NewPedanticRegistry() tsdbMetrics := newTSDBMetrics(mainReg) @@ -18,15 +18,6 @@ func TestTSDBMetrics(t *testing.T) { tsdbMetrics.setRegistryForUser("user2", populateTSDBMetrics(85787)) tsdbMetrics.setRegistryForUser("user3", populateTSDBMetrics(999)) - metricNames := []string{ - "cortex_ingester_shipper_dir_syncs_total", - "cortex_ingester_shipper_dir_sync_failures_total", - "cortex_ingester_shipper_uploads_total", - "cortex_ingester_shipper_upload_failures_total", - "cortex_ingester_memory_series_created_total", - "cortex_ingester_memory_series_removed_total", - } - err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(` # HELP cortex_ingester_shipper_dir_syncs_total TSDB: Total dir sync attempts # TYPE cortex_ingester_shipper_dir_syncs_total counter @@ -61,7 +52,7 @@ func TestTSDBMetrics(t *testing.T) { cortex_ingester_memory_series_removed_total{user="user1"} 74070 cortex_ingester_memory_series_removed_total{user="user2"} 514722 cortex_ingester_memory_series_removed_total{user="user3"} 5994 - `), metricNames...) 
+ `)) require.NoError(t, err) } diff --git a/pkg/querier/block.go b/pkg/querier/block.go index b90360ac25e..7246b6f4eaf 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -39,14 +39,16 @@ func NewBlockQuerier(cfg tsdb.Config, logLevel logging.Level, r prometheus.Regis }), } - r.MustRegister(b.syncTimes) - us, err := NewUserStore(cfg, logLevel, util.Logger) if err != nil { return nil, err } b.us = us + if r != nil { + r.MustRegister(b.syncTimes, us.tsdbMetrics) + } + level.Info(util.Logger).Log("msg", "synchronizing TSDB blocks for all users") if err := us.InitialSync(context.Background()); err != nil { level.Warn(util.Logger).Log("msg", "failed to synchronize TSDB blocks", "err", err) diff --git a/pkg/querier/block_store.go b/pkg/querier/block_store.go index 1c00a8c05c2..2e5942703a5 100644 --- a/pkg/querier/block_store.go +++ b/pkg/querier/block_store.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/thanos/pkg/model" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/store" @@ -24,12 +25,13 @@ import ( // UserStore is a multi-tenant version of Thanos BucketStore type UserStore struct { - logger log.Logger - cfg tsdb.Config - bucket objstore.BucketReader - stores map[string]*store.BucketStore - client storepb.StoreClient - logLevel logging.Level + logger log.Logger + cfg tsdb.Config + bucket objstore.BucketReader + stores map[string]*store.BucketStore + client storepb.StoreClient + logLevel logging.Level + tsdbMetrics *tsdbBucketStoreMetrics } // NewUserStore returns a new UserStore @@ -40,11 +42,12 @@ func NewUserStore(cfg tsdb.Config, logLevel logging.Level, logger log.Logger) (* } u := &UserStore{ - logger: logger, - cfg: cfg, - bucket: bkt, - stores: make(map[string]*store.BucketStore), - logLevel: logLevel, + logger: logger, + cfg: cfg, + bucket: bkt, + stores: map[string]*store.BucketStore{}, + logLevel: logLevel, + tsdbMetrics: newTSDBBucketStoreMetrics(), } serv := grpc.NewServer() @@ -115,9 +118,11 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, Bucket: bkt, } + reg := prometheus.NewRegistry() + indexCacheSizeBytes := u.cfg.BucketStore.IndexCacheSizeBytes maxItemSizeBytes := indexCacheSizeBytes / 2 - indexCache, err := storecache.NewInMemoryIndexCache(u.logger, nil, storecache.Opts{ + indexCache, err := storecache.NewInMemoryIndexCache(u.logger, reg, storecache.Opts{ MaxSizeBytes: indexCacheSizeBytes, MaxItemSizeBytes: maxItemSizeBytes, }) @@ -126,7 +131,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, } bs, err = store.NewBucketStore( u.logger, - nil, + reg, userBkt, filepath.Join(u.cfg.BucketStore.SyncDir, user), indexCache, @@ -147,6 +152,7 @@ func (u *UserStore) syncUserStores(ctx context.Context, f func(context.Context, } u.stores[user] = bs + u.tsdbMetrics.addUserRegistry(user, reg) } wg.Add(1) diff --git a/pkg/querier/block_store_metrics.go b/pkg/querier/block_store_metrics.go new file mode 100644 index 00000000000..6b2c5733ab0 --- /dev/null +++ b/pkg/querier/block_store_metrics.go @@ -0,0 +1,213 @@ +package querier + +import ( + "sync" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/prometheus/client_golang/prometheus" +) + +// This struct aggregates metrics exported by Thanos Bucket Store +// and re-exports those aggregates as Cortex metrics. 
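+//
+// It implements prometheus.Collector, so a single registration (done in pkg/querier/block.go) exposes
+// values summed across all per-tenant registries added via addUserRegistry.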
+type tsdbBucketStoreMetrics struct { + // Maps userID -> registry + regsMu sync.Mutex + regs map[string]*prometheus.Registry + + // exported metrics, gathered from Thanos BucketStore + blockLoads *prometheus.Desc + blockLoadFailures *prometheus.Desc + blockDrops *prometheus.Desc + blockDropFailures *prometheus.Desc + blocksLoaded *prometheus.Desc + seriesDataTouched *prometheus.Desc + seriesDataFetched *prometheus.Desc + seriesDataSizeTouched *prometheus.Desc + seriesDataSizeFetched *prometheus.Desc + seriesBlocksQueried *prometheus.Desc + seriesGetAllDuration *prometheus.Desc + seriesMergeDuration *prometheus.Desc + resultSeriesCount *prometheus.Desc + + // Metrics gathered from Thanos storecache.InMemoryIndexCache + cacheItemsEvicted *prometheus.Desc + cacheItemsAdded *prometheus.Desc + cacheRequests *prometheus.Desc + cacheItemsOverflow *prometheus.Desc + cacheHits *prometheus.Desc + cacheItemsCurrentCount *prometheus.Desc + cacheItemsCurrentSize *prometheus.Desc + cacheItemsTotalCurrentSize *prometheus.Desc + + // Ignored: + // thanos_store_index_cache_max_size_bytes + // thanos_store_index_cache_max_item_size_bytes +} + +func newTSDBBucketStoreMetrics() *tsdbBucketStoreMetrics { + return &tsdbBucketStoreMetrics{ + regs: map[string]*prometheus.Registry{}, + + blockLoads: prometheus.NewDesc( + "cortex_querier_bucket_store_block_loads_total", + "TSDB: Total number of remote block loading attempts.", + nil, nil), + blockLoadFailures: prometheus.NewDesc( + "cortex_querier_bucket_store_block_load_failures_total", + "TSDB: Total number of failed remote block loading attempts.", + nil, nil), + blockDrops: prometheus.NewDesc( + "cortex_querier_bucket_store_block_drops_total", + "TSDB: Total number of local blocks that were dropped.", + nil, nil), + blockDropFailures: prometheus.NewDesc( + "cortex_querier_bucket_store_block_drop_failures_total", + "TSDB: Total number of local blocks that failed to be dropped.", + nil, nil), + blocksLoaded: prometheus.NewDesc( + "cortex_querier_bucket_store_blocks_loaded", + "TSDB: Number of currently loaded blocks.", + nil, nil), + seriesDataTouched: prometheus.NewDesc( + "cortex_querier_bucket_store_series_data_touched", + "TSDB: How many items of a data type in a block were touched for a single series request.", + []string{"data_type"}, nil), + seriesDataFetched: prometheus.NewDesc( + "cortex_querier_bucket_store_series_data_fetched", + "TSDB: How many items of a data type in a block were fetched for a single series request.", + []string{"data_type"}, nil), + seriesDataSizeTouched: prometheus.NewDesc( + "cortex_querier_bucket_store_series_data_size_touched_bytes", + "TSDB: Size of all items of a data type in a block were touched for a single series request.", + []string{"data_type"}, nil), + seriesDataSizeFetched: prometheus.NewDesc( + "cortex_querier_bucket_store_series_data_size_fetched_bytes", + "TSDB: Size of all items of a data type in a block were fetched for a single series request.", + []string{"data_type"}, nil), + seriesBlocksQueried: prometheus.NewDesc( + "cortex_querier_bucket_store_series_blocks_queried", + "TSDB: Number of blocks in a bucket store that were touched to satisfy a query.", + nil, nil), + + seriesGetAllDuration: prometheus.NewDesc( + "cortex_querier_bucket_store_series_get_all_duration_seconds", + "TSDB: Time it takes until all per-block prepares and preloads for a query are finished.", + nil, nil), + seriesMergeDuration: prometheus.NewDesc( + "cortex_querier_bucket_store_series_merge_duration_seconds", + "TSDB: Time it takes to 
merge sub-results from all queried blocks into a single result.", + nil, nil), + resultSeriesCount: prometheus.NewDesc( + "cortex_querier_bucket_store_series_result_series", + "Number of series observed in the final result of a query.", + nil, nil), + + // Cache + cacheItemsEvicted: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_evicted_total", + "TSDB: Total number of items that were evicted from the index cache.", + []string{"item_type"}, nil), + cacheItemsAdded: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_added_total", + "TSDB: Total number of items that were added to the index cache.", + []string{"item_type"}, nil), + cacheRequests: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_requests_total", + "TSDB: Total number of requests to the cache.", + []string{"item_type"}, nil), + cacheItemsOverflow: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_overflowed_total", + "TSDB: Total number of items that could not be added to the cache due to being too big.", + []string{"item_type"}, nil), + cacheHits: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_hits_total", + "TSDB: Total number of requests to the cache that were a hit.", + []string{"item_type"}, nil), + cacheItemsCurrentCount: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items", + "TSDB: Current number of items in the index cache.", + []string{"item_type"}, nil), + cacheItemsCurrentSize: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_items_size_bytes", + "TSDB: Current byte size of items in the index cache.", + []string{"item_type"}, nil), + cacheItemsTotalCurrentSize: prometheus.NewDesc( + "cortex_querier_blocks_index_cache_total_size_bytes", + "TSDB: Current byte size of items (both value and key) in the index cache.", + []string{"item_type"}, nil), + } +} + +func (m *tsdbBucketStoreMetrics) addUserRegistry(user string, reg *prometheus.Registry) { + m.regsMu.Lock() + m.regs[user] = reg + m.regsMu.Unlock() +} + +func (m *tsdbBucketStoreMetrics) registries() map[string]*prometheus.Registry { + regs := map[string]*prometheus.Registry{} + + m.regsMu.Lock() + defer m.regsMu.Unlock() + for uid, r := range m.regs { + regs[uid] = r + } + + return regs +} + +func (m *tsdbBucketStoreMetrics) Describe(out chan<- *prometheus.Desc) { + out <- m.blockLoads + out <- m.blockLoadFailures + out <- m.blockDrops + out <- m.blockDropFailures + out <- m.blocksLoaded + out <- m.seriesDataTouched + out <- m.seriesDataFetched + out <- m.seriesDataSizeTouched + out <- m.seriesDataSizeFetched + out <- m.seriesBlocksQueried + out <- m.seriesGetAllDuration + out <- m.seriesMergeDuration + out <- m.resultSeriesCount + + out <- m.cacheItemsEvicted + out <- m.cacheItemsAdded + out <- m.cacheRequests + out <- m.cacheItemsOverflow + out <- m.cacheHits + out <- m.cacheItemsCurrentCount + out <- m.cacheItemsCurrentSize + out <- m.cacheItemsTotalCurrentSize +} + +func (m *tsdbBucketStoreMetrics) Collect(out chan<- prometheus.Metric) { + data := util.BuildMetricFamiliesPerUserFromUserRegistries(m.registries()) + + data.SendSumOfCounters(out, m.blockLoads, "thanos_bucket_store_block_loads_total") + data.SendSumOfCounters(out, m.blockLoadFailures, "thanos_bucket_store_block_load_failures_total") + data.SendSumOfCounters(out, m.blockDrops, "thanos_bucket_store_block_drops_total") + data.SendSumOfCounters(out, m.blockDropFailures, "thanos_bucket_store_block_drop_failures_total") + + data.SendSumOfGauges(out, m.blocksLoaded, "thanos_bucket_store_blocks_loaded") + + 
data.SendSumOfSummariesWithLabels(out, m.seriesDataTouched, "thanos_bucket_store_series_data_touched", "data_type") + data.SendSumOfSummariesWithLabels(out, m.seriesDataFetched, "thanos_bucket_store_series_data_fetched", "data_type") + data.SendSumOfSummariesWithLabels(out, m.seriesDataSizeTouched, "thanos_bucket_store_series_data_size_touched_bytes", "data_type") + data.SendSumOfSummariesWithLabels(out, m.seriesDataSizeFetched, "thanos_bucket_store_series_data_size_fetched_bytes", "data_type") + data.SendSumOfSummariesWithLabels(out, m.seriesBlocksQueried, "thanos_bucket_store_series_blocks_queried") + + data.SendSumOfHistograms(out, m.seriesGetAllDuration, "thanos_bucket_store_series_get_all_duration_seconds") + data.SendSumOfHistograms(out, m.seriesMergeDuration, "thanos_bucket_store_series_merge_duration_seconds") + data.SendSumOfSummaries(out, m.resultSeriesCount, "thanos_bucket_store_series_result_series") + + data.SendSumOfCountersWithLabels(out, m.cacheItemsEvicted, "thanos_store_index_cache_items_evicted_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheItemsAdded, "thanos_store_index_cache_items_added_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheRequests, "thanos_store_index_cache_requests_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheItemsOverflow, "thanos_store_index_cache_items_overflowed_total", "item_type") + data.SendSumOfCountersWithLabels(out, m.cacheHits, "thanos_store_index_cache_hits_total", "item_type") + + data.SendSumOfGaugesWithLabels(out, m.cacheItemsCurrentCount, "thanos_store_index_cache_items", "item_type") + data.SendSumOfGaugesWithLabels(out, m.cacheItemsCurrentSize, "thanos_store_index_cache_items_size_bytes", "item_type") + data.SendSumOfGaugesWithLabels(out, m.cacheItemsTotalCurrentSize, "thanos_store_index_cache_total_size_bytes", "item_type") +} diff --git a/pkg/querier/bucket_store_metrics_test.go b/pkg/querier/bucket_store_metrics_test.go new file mode 100644 index 00000000000..cdcfa1ece2c --- /dev/null +++ b/pkg/querier/bucket_store_metrics_test.go @@ -0,0 +1,476 @@ +package querier + +import ( + "bytes" + "fmt" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestTsdbBucketStoreMetrics(t *testing.T) { + mainReg := prometheus.NewPedanticRegistry() + + tsdbMetrics := newTSDBBucketStoreMetrics() + mainReg.MustRegister(tsdbMetrics) + + tsdbMetrics.addUserRegistry("user1", populateTSDBBucketStore(5328)) + tsdbMetrics.addUserRegistry("user2", populateTSDBBucketStore(6908)) + tsdbMetrics.addUserRegistry("user3", populateTSDBBucketStore(10283)) + + //noinspection ALL + err := testutil.GatherAndCompare(mainReg, bytes.NewBufferString(` + # HELP cortex_querier_bucket_store_blocks_loaded TSDB: Number of currently loaded blocks. + # TYPE cortex_querier_bucket_store_blocks_loaded gauge + cortex_querier_bucket_store_blocks_loaded 22519 + + # HELP cortex_querier_bucket_store_block_loads_total TSDB: Total number of remote block loading attempts. + # TYPE cortex_querier_bucket_store_block_loads_total counter + cortex_querier_bucket_store_block_loads_total 45038 + + # HELP cortex_querier_bucket_store_block_load_failures_total TSDB: Total number of failed remote block loading attempts. 
+ # TYPE cortex_querier_bucket_store_block_load_failures_total counter + cortex_querier_bucket_store_block_load_failures_total 67557 + + # HELP cortex_querier_bucket_store_block_drops_total TSDB: Total number of local blocks that were dropped. + # TYPE cortex_querier_bucket_store_block_drops_total counter + cortex_querier_bucket_store_block_drops_total 90076 + + # HELP cortex_querier_bucket_store_block_drop_failures_total TSDB: Total number of local blocks that failed to be dropped. + # TYPE cortex_querier_bucket_store_block_drop_failures_total counter + cortex_querier_bucket_store_block_drop_failures_total 112595 + + # HELP cortex_querier_bucket_store_series_blocks_queried TSDB: Number of blocks in a bucket store that were touched to satisfy a query. + # TYPE cortex_querier_bucket_store_series_blocks_queried summary + cortex_querier_bucket_store_series_blocks_queried_sum 1.283583e+06 + cortex_querier_bucket_store_series_blocks_queried_count 9 + + # HELP cortex_querier_bucket_store_series_data_fetched TSDB: How many items of a data type in a block were fetched for a single series request. + # TYPE cortex_querier_bucket_store_series_data_fetched summary + cortex_querier_bucket_store_series_data_fetched_sum{data_type="fetched-a"} 202671 + cortex_querier_bucket_store_series_data_fetched_count{data_type="fetched-a"} 3 + cortex_querier_bucket_store_series_data_fetched_sum{data_type="fetched-b"} 225190 + cortex_querier_bucket_store_series_data_fetched_count{data_type="fetched-b"} 3 + cortex_querier_bucket_store_series_data_fetched_sum{data_type="fetched-c"} 247709 + cortex_querier_bucket_store_series_data_fetched_count{data_type="fetched-c"} 3 + + # HELP cortex_querier_bucket_store_series_data_size_fetched_bytes TSDB: Size of all items of a data type in a block were fetched for a single series request. + # TYPE cortex_querier_bucket_store_series_data_size_fetched_bytes summary + cortex_querier_bucket_store_series_data_size_fetched_bytes_sum{data_type="size-fetched-a"} 337785 + cortex_querier_bucket_store_series_data_size_fetched_bytes_count{data_type="size-fetched-a"} 3 + cortex_querier_bucket_store_series_data_size_fetched_bytes_sum{data_type="size-fetched-b"} 360304 + cortex_querier_bucket_store_series_data_size_fetched_bytes_count{data_type="size-fetched-b"} 3 + cortex_querier_bucket_store_series_data_size_fetched_bytes_sum{data_type="size-fetched-c"} 382823 + cortex_querier_bucket_store_series_data_size_fetched_bytes_count{data_type="size-fetched-c"} 3 + + # HELP cortex_querier_bucket_store_series_data_size_touched_bytes TSDB: Size of all items of a data type in a block were touched for a single series request. + # TYPE cortex_querier_bucket_store_series_data_size_touched_bytes summary + cortex_querier_bucket_store_series_data_size_touched_bytes_sum{data_type="size-touched-a"} 270228 + cortex_querier_bucket_store_series_data_size_touched_bytes_count{data_type="size-touched-a"} 3 + cortex_querier_bucket_store_series_data_size_touched_bytes_sum{data_type="size-touched-b"} 292747 + cortex_querier_bucket_store_series_data_size_touched_bytes_count{data_type="size-touched-b"} 3 + cortex_querier_bucket_store_series_data_size_touched_bytes_sum{data_type="size-touched-c"} 315266 + cortex_querier_bucket_store_series_data_size_touched_bytes_count{data_type="size-touched-c"} 3 + + # HELP cortex_querier_bucket_store_series_data_touched TSDB: How many items of a data type in a block were touched for a single series request. 
+ # TYPE cortex_querier_bucket_store_series_data_touched summary + cortex_querier_bucket_store_series_data_touched_sum{data_type="touched-a"} 135114 + cortex_querier_bucket_store_series_data_touched_count{data_type="touched-a"} 3 + cortex_querier_bucket_store_series_data_touched_sum{data_type="touched-b"} 157633 + cortex_querier_bucket_store_series_data_touched_count{data_type="touched-b"} 3 + cortex_querier_bucket_store_series_data_touched_sum{data_type="touched-c"} 180152 + cortex_querier_bucket_store_series_data_touched_count{data_type="touched-c"} 3 + + # HELP cortex_querier_bucket_store_series_get_all_duration_seconds TSDB: Time it takes until all per-block prepares and preloads for a query are finished. + # TYPE cortex_querier_bucket_store_series_get_all_duration_seconds histogram + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="0.001"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="0.01"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="0.1"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="0.3"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="0.6"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="1"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="3"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="6"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="9"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="20"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="30"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="60"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="90"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="120"} 0 + cortex_querier_bucket_store_series_get_all_duration_seconds_bucket{le="+Inf"} 9 + cortex_querier_bucket_store_series_get_all_duration_seconds_sum 1.486254e+06 + cortex_querier_bucket_store_series_get_all_duration_seconds_count 9 + + # HELP cortex_querier_bucket_store_series_merge_duration_seconds TSDB: Time it takes to merge sub-results from all queried blocks into a single result. 
+ # TYPE cortex_querier_bucket_store_series_merge_duration_seconds histogram + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="0.001"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="0.01"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="0.1"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="0.3"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="0.6"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="1"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="3"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="6"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="9"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="20"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="30"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="60"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="90"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="120"} 0 + cortex_querier_bucket_store_series_merge_duration_seconds_bucket{le="+Inf"} 9 + cortex_querier_bucket_store_series_merge_duration_seconds_sum 1.688925e+06 + cortex_querier_bucket_store_series_merge_duration_seconds_count 9 + + # HELP cortex_querier_bucket_store_series_result_series Number of series observed in the final result of a query. + # TYPE cortex_querier_bucket_store_series_result_series summary + cortex_querier_bucket_store_series_result_series_sum 1.238545e+06 + cortex_querier_bucket_store_series_result_series_count 6 + + # HELP cortex_querier_blocks_index_cache_items_evicted_total TSDB: Total number of items that were evicted from the index cache. + # TYPE cortex_querier_blocks_index_cache_items_evicted_total counter + cortex_querier_blocks_index_cache_items_evicted_total{item_type="Postings"} 1125950 + cortex_querier_blocks_index_cache_items_evicted_total{item_type="Series"} 1148469 + + # HELP cortex_querier_blocks_index_cache_requests_total TSDB: Total number of requests to the cache. + # TYPE cortex_querier_blocks_index_cache_requests_total counter + cortex_querier_blocks_index_cache_requests_total{item_type="Postings"} 1170988 + cortex_querier_blocks_index_cache_requests_total{item_type="Series"} 1193507 + + # HELP cortex_querier_blocks_index_cache_hits_total TSDB: Total number of requests to the cache that were a hit. + # TYPE cortex_querier_blocks_index_cache_hits_total counter + cortex_querier_blocks_index_cache_hits_total{item_type="Postings"} 1216026 + cortex_querier_blocks_index_cache_hits_total{item_type="Series"} 1238545 + + # HELP cortex_querier_blocks_index_cache_items_added_total TSDB: Total number of items that were added to the index cache. + # TYPE cortex_querier_blocks_index_cache_items_added_total counter + cortex_querier_blocks_index_cache_items_added_total{item_type="Postings"} 1261064 + cortex_querier_blocks_index_cache_items_added_total{item_type="Series"} 1283583 + + # HELP cortex_querier_blocks_index_cache_items TSDB: Current number of items in the index cache. + # TYPE cortex_querier_blocks_index_cache_items gauge + cortex_querier_blocks_index_cache_items{item_type="Postings"} 1306102 + cortex_querier_blocks_index_cache_items{item_type="Series"} 1328621 + + # HELP cortex_querier_blocks_index_cache_items_size_bytes TSDB: Current byte size of items in the index cache. 
+ # TYPE cortex_querier_blocks_index_cache_items_size_bytes gauge + cortex_querier_blocks_index_cache_items_size_bytes{item_type="Postings"} 1351140 + cortex_querier_blocks_index_cache_items_size_bytes{item_type="Series"} 1373659 + + # HELP cortex_querier_blocks_index_cache_total_size_bytes TSDB: Current byte size of items (both value and key) in the index cache. + # TYPE cortex_querier_blocks_index_cache_total_size_bytes gauge + cortex_querier_blocks_index_cache_total_size_bytes{item_type="Postings"} 1396178 + cortex_querier_blocks_index_cache_total_size_bytes{item_type="Series"} 1418697 + + # HELP cortex_querier_blocks_index_cache_items_overflowed_total TSDB: Total number of items that could not be added to the cache due to being too big. + # TYPE cortex_querier_blocks_index_cache_items_overflowed_total counter + cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Postings"} 1441216 + cortex_querier_blocks_index_cache_items_overflowed_total{item_type="Series"} 1463735 + +`)) + require.NoError(t, err) +} + +func BenchmarkMetricsCollections10(b *testing.B) { + benchmarkMetricsCollection(b, 10) +} + +func BenchmarkMetricsCollections100(b *testing.B) { + benchmarkMetricsCollection(b, 100) +} + +func BenchmarkMetricsCollections1000(b *testing.B) { + benchmarkMetricsCollection(b, 1000) +} + +func BenchmarkMetricsCollections10000(b *testing.B) { + benchmarkMetricsCollection(b, 10000) +} + +func benchmarkMetricsCollection(b *testing.B, users int) { + mainReg := prometheus.NewRegistry() + + tsdbMetrics := newTSDBBucketStoreMetrics() + mainReg.MustRegister(tsdbMetrics) + + base := 123456.0 + for i := 0; i < users; i++ { + tsdbMetrics.addUserRegistry(fmt.Sprintf("user-%d", i), populateTSDBBucketStore(base*float64(i))) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, _ = mainReg.Gather() + } +} + +func populateTSDBBucketStore(base float64) *prometheus.Registry { + reg := prometheus.NewRegistry() + m := newBucketStoreMetrics(reg) + + m.blocksLoaded.Add(1 * base) + m.blockLoads.Add(2 * base) + m.blockLoadFailures.Add(3 * base) + m.blockDrops.Add(4 * base) + m.blockDropFailures.Add(5 * base) + m.seriesDataTouched.WithLabelValues("touched-a").Observe(6 * base) + m.seriesDataTouched.WithLabelValues("touched-b").Observe(7 * base) + m.seriesDataTouched.WithLabelValues("touched-c").Observe(8 * base) + + m.seriesDataFetched.WithLabelValues("fetched-a").Observe(9 * base) + m.seriesDataFetched.WithLabelValues("fetched-b").Observe(10 * base) + m.seriesDataFetched.WithLabelValues("fetched-c").Observe(11 * base) + + m.seriesDataSizeTouched.WithLabelValues("size-touched-a").Observe(12 * base) + m.seriesDataSizeTouched.WithLabelValues("size-touched-b").Observe(13 * base) + m.seriesDataSizeTouched.WithLabelValues("size-touched-c").Observe(14 * base) + + m.seriesDataSizeFetched.WithLabelValues("size-fetched-a").Observe(15 * base) + m.seriesDataSizeFetched.WithLabelValues("size-fetched-b").Observe(16 * base) + m.seriesDataSizeFetched.WithLabelValues("size-fetched-c").Observe(17 * base) + + m.seriesBlocksQueried.Observe(18 * base) + m.seriesBlocksQueried.Observe(19 * base) + m.seriesBlocksQueried.Observe(20 * base) + + m.seriesGetAllDuration.Observe(21 * base) + m.seriesGetAllDuration.Observe(22 * base) + m.seriesGetAllDuration.Observe(23 * base) + + m.seriesMergeDuration.Observe(24 * base) + m.seriesMergeDuration.Observe(25 * base) + m.seriesMergeDuration.Observe(26 * base) + + m.resultSeriesCount.Observe(27 * base) + m.resultSeriesCount.Observe(28 * base) + + m.chunkSizeBytes.Observe(29 * 
base) + m.chunkSizeBytes.Observe(30 * base) + + m.queriesDropped.Add(31 * base) + m.queriesLimit.Add(32 * base) + + c := newIndexStoreCacheMetrics(reg) + + c.evicted.WithLabelValues(cacheTypePostings).Add(base * 50) + c.evicted.WithLabelValues(cacheTypeSeries).Add(base * 51) + c.requests.WithLabelValues(cacheTypePostings).Add(base * 52) + c.requests.WithLabelValues(cacheTypeSeries).Add(base * 53) + c.hits.WithLabelValues(cacheTypePostings).Add(base * 54) + c.hits.WithLabelValues(cacheTypeSeries).Add(base * 55) + c.added.WithLabelValues(cacheTypePostings).Add(base * 56) + c.added.WithLabelValues(cacheTypeSeries).Add(base * 57) + c.current.WithLabelValues(cacheTypePostings).Set(base * 58) + c.current.WithLabelValues(cacheTypeSeries).Set(base * 59) + c.currentSize.WithLabelValues(cacheTypePostings).Set(base * 60) + c.currentSize.WithLabelValues(cacheTypeSeries).Set(base * 61) + c.totalCurrentSize.WithLabelValues(cacheTypePostings).Set(base * 62) + c.totalCurrentSize.WithLabelValues(cacheTypeSeries).Set(base * 63) + c.overflow.WithLabelValues(cacheTypePostings).Add(base * 64) + c.overflow.WithLabelValues(cacheTypeSeries).Add(base * 65) + return reg +} + +// copied from Thanos, pkg/store/bucket.go +type bucketStoreMetrics struct { + blocksLoaded prometheus.Gauge + blockLoads prometheus.Counter + blockLoadFailures prometheus.Counter + blockDrops prometheus.Counter + blockDropFailures prometheus.Counter + seriesDataTouched *prometheus.SummaryVec + seriesDataFetched *prometheus.SummaryVec + seriesDataSizeTouched *prometheus.SummaryVec + seriesDataSizeFetched *prometheus.SummaryVec + seriesBlocksQueried prometheus.Summary + seriesGetAllDuration prometheus.Histogram + seriesMergeDuration prometheus.Histogram + resultSeriesCount prometheus.Summary + chunkSizeBytes prometheus.Histogram + queriesDropped prometheus.Counter + queriesLimit prometheus.Gauge +} + +// Copied from Thanos, pkg/store/cache/inmemory.go, InMemoryIndexCache struct +type indexStoreCacheMetrics struct { + evicted *prometheus.CounterVec + requests *prometheus.CounterVec + hits *prometheus.CounterVec + added *prometheus.CounterVec + current *prometheus.GaugeVec + currentSize *prometheus.GaugeVec + totalCurrentSize *prometheus.GaugeVec + overflow *prometheus.CounterVec +} + +const ( + cacheTypePostings string = "Postings" + cacheTypeSeries string = "Series" +) + +func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { + var m bucketStoreMetrics + + m.blockLoads = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_block_loads_total", + Help: "Total number of remote block loading attempts.", + }) + m.blockLoadFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_block_load_failures_total", + Help: "Total number of failed remote block loading attempts.", + }) + m.blockDrops = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_block_drops_total", + Help: "Total number of local blocks that were dropped.", + }) + m.blockDropFailures = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_block_drop_failures_total", + Help: "Total number of local blocks that failed to be dropped.", + }) + m.blocksLoaded = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "thanos_bucket_store_blocks_loaded", + Help: "Number of currently loaded blocks.", + }) + + m.seriesDataTouched = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "thanos_bucket_store_series_data_touched", + Help: "How many items of a data type in a block were 
touched for a single series request.", + }, []string{"data_type"}) + m.seriesDataFetched = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "thanos_bucket_store_series_data_fetched", + Help: "How many items of a data type in a block were fetched for a single series request.", + }, []string{"data_type"}) + + m.seriesDataSizeTouched = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "thanos_bucket_store_series_data_size_touched_bytes", + Help: "Size of all items of a data type in a block were touched for a single series request.", + }, []string{"data_type"}) + m.seriesDataSizeFetched = prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Name: "thanos_bucket_store_series_data_size_fetched_bytes", + Help: "Size of all items of a data type in a block were fetched for a single series request.", + }, []string{"data_type"}) + + m.seriesBlocksQueried = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "thanos_bucket_store_series_blocks_queried", + Help: "Number of blocks in a bucket store that were touched to satisfy a query.", + }) + m.seriesGetAllDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_bucket_store_series_get_all_duration_seconds", + Help: "Time it takes until all per-block prepares and preloads for a query are finished.", + Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, + }) + m.seriesMergeDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_bucket_store_series_merge_duration_seconds", + Help: "Time it takes to merge sub-results from all queried blocks into a single result.", + Buckets: []float64{0.001, 0.01, 0.1, 0.3, 0.6, 1, 3, 6, 9, 20, 30, 60, 90, 120}, + }) + m.resultSeriesCount = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "thanos_bucket_store_series_result_series", + Help: "Number of series observed in the final result of a query.", + }) + + m.chunkSizeBytes = prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "thanos_bucket_store_sent_chunk_size_bytes", + Help: "Size in bytes of the chunks for the single series, which is adequate to the gRPC message size sent to querier.", + Buckets: []float64{ + 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, + }, + }) + + m.queriesDropped = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "thanos_bucket_store_queries_dropped_total", + Help: "Number of queries that were dropped due to the sample limit.", + }) + m.queriesLimit = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "thanos_bucket_store_queries_concurrent_max", + Help: "Number of maximum concurrent queries.", + }) + + if reg != nil { + reg.MustRegister( + m.blockLoads, + m.blockLoadFailures, + m.blockDrops, + m.blockDropFailures, + m.blocksLoaded, + m.seriesDataTouched, + m.seriesDataFetched, + m.seriesDataSizeTouched, + m.seriesDataSizeFetched, + m.seriesBlocksQueried, + m.seriesGetAllDuration, + m.seriesMergeDuration, + m.resultSeriesCount, + m.chunkSizeBytes, + m.queriesDropped, + m.queriesLimit, + ) + } + return &m +} + +func newIndexStoreCacheMetrics(reg prometheus.Registerer) *indexStoreCacheMetrics { + c := indexStoreCacheMetrics{} + c.evicted = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_evicted_total", + Help: "Total number of items that were evicted from the index cache.", + }, []string{"item_type"}) + c.evicted.WithLabelValues(cacheTypePostings) + c.evicted.WithLabelValues(cacheTypeSeries) + + c.added = 
prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_added_total", + Help: "Total number of items that were added to the index cache.", + }, []string{"item_type"}) + c.added.WithLabelValues(cacheTypePostings) + c.added.WithLabelValues(cacheTypeSeries) + + c.requests = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_requests_total", + Help: "Total number of requests to the cache.", + }, []string{"item_type"}) + c.requests.WithLabelValues(cacheTypePostings) + c.requests.WithLabelValues(cacheTypeSeries) + + c.overflow = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_overflowed_total", + Help: "Total number of items that could not be added to the cache due to being too big.", + }, []string{"item_type"}) + c.overflow.WithLabelValues(cacheTypePostings) + c.overflow.WithLabelValues(cacheTypeSeries) + + c.hits = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_hits_total", + Help: "Total number of requests to the cache that were a hit.", + }, []string{"item_type"}) + c.hits.WithLabelValues(cacheTypePostings) + c.hits.WithLabelValues(cacheTypeSeries) + + c.current = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "thanos_store_index_cache_items", + Help: "Current number of items in the index cache.", + }, []string{"item_type"}) + c.current.WithLabelValues(cacheTypePostings) + c.current.WithLabelValues(cacheTypeSeries) + + c.currentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "thanos_store_index_cache_items_size_bytes", + Help: "Current byte size of items in the index cache.", + }, []string{"item_type"}) + c.currentSize.WithLabelValues(cacheTypePostings) + c.currentSize.WithLabelValues(cacheTypeSeries) + + c.totalCurrentSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "thanos_store_index_cache_total_size_bytes", + Help: "Current byte size of items (both value and key) in the index cache.", + }, []string{"item_type"}) + c.totalCurrentSize.WithLabelValues(cacheTypePostings) + c.totalCurrentSize.WithLabelValues(cacheTypeSeries) + + if reg != nil { + reg.MustRegister(c.requests, c.hits, c.added, c.evicted, c.current, c.currentSize, c.totalCurrentSize, c.overflow) + } + + return &c +} diff --git a/pkg/util/metrics_helper.go b/pkg/util/metrics_helper.go new file mode 100644 index 00000000000..4957c40dede --- /dev/null +++ b/pkg/util/metrics_helper.go @@ -0,0 +1,296 @@ +package util + +import ( + "bytes" + "errors" + "fmt" + + "github.com/go-kit/kit/log/level" + "github.com/prometheus/client_golang/prometheus" + dto "github.com/prometheus/client_model/go" +) + +// MetricFamiliesPerUser is a collection of metrics gathered via calling Gatherer.Gather() method on different +// gatherers, one per user. +// First key = userID, second key = metric name. +// Value = slice of gathered values with the same metric name. +type MetricFamiliesPerUser map[string]map[string]*dto.MetricFamily + +func BuildMetricFamiliesPerUserFromUserRegistries(regs map[string]*prometheus.Registry) MetricFamiliesPerUser { + data := MetricFamiliesPerUser{} + for userID, r := range regs { + m, err := r.Gather() + if err == nil { + err = data.AddGatheredDataForUser(userID, m) + } + + if err != nil { + level.Warn(Logger).Log("msg", "failed to gather metrics from registry", "user", userID, "err", err) + continue + } + } + return data +} + +// AddGatheredDataForUser adds user-specific output of Gatherer.Gather method. 
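+// The families are indexed by metric name, which keeps the later per-metric lookups during aggregation cheap.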
+// Gatherer.Gather specifies that the returned metric families are uniquely named, and we use that fact here.
+// If they are not, this method returns an error.
+func (d MetricFamiliesPerUser) AddGatheredDataForUser(userID string, metrics []*dto.MetricFamily) error {
+	// Keeping map of metric name to its family makes it easier to do searches later.
+	perMetricName := map[string]*dto.MetricFamily{}
+
+	for _, m := range metrics {
+		name := m.GetName()
+		// these errors should never happen when passing Gatherer.Gather() output.
+		if name == "" {
+			return errors.New("empty name for metric family")
+		}
+		if perMetricName[name] != nil {
+			return fmt.Errorf("non-unique name for metric family: %q", name)
+		}
+
+		perMetricName[name] = m
+	}
+
+	d[userID] = perMetricName
+	return nil
+}
+
+func (d MetricFamiliesPerUser) SendSumOfCounters(out chan<- prometheus.Metric, desc *prometheus.Desc, counter string) {
+	result := float64(0)
+	for _, perMetric := range d {
+		result += sum(perMetric[counter], counterValue)
+	}
+
+	out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, result)
+}
+
+func (d MetricFamiliesPerUser) SendSumOfCountersWithLabels(out chan<- prometheus.Metric, desc *prometheus.Desc, counter string, labelNames ...string) {
+	result := d.sumOfSingleValuesWithLabels(counter, counterValue, labelNames)
+	for _, cr := range result {
+		out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, cr.value, cr.labelValues...)
+	}
+}
+
+func (d MetricFamiliesPerUser) SendSumOfCountersPerUser(out chan<- prometheus.Metric, desc *prometheus.Desc, counter string) {
+	for user, perMetric := range d {
+		v := sum(perMetric[counter], counterValue)
+
+		out <- prometheus.MustNewConstMetric(desc, prometheus.CounterValue, v, user)
+	}
+}
+
+func (d MetricFamiliesPerUser) SendSumOfGauges(out chan<- prometheus.Metric, desc *prometheus.Desc, gauge string) {
+	result := float64(0)
+	for _, perMetric := range d {
+		result += sum(perMetric[gauge], gaugeValue)
+	}
+	out <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, result)
+}
+
+func (d MetricFamiliesPerUser) SendSumOfGaugesWithLabels(out chan<- prometheus.Metric, desc *prometheus.Desc, gauge string, labelNames ...string) {
+	result := d.sumOfSingleValuesWithLabels(gauge, gaugeValue, labelNames)
+	for _, cr := range result {
+		out <- prometheus.MustNewConstMetric(desc, prometheus.GaugeValue, cr.value, cr.labelValues...)
+ } +} + +type singleResult struct { + value float64 + labelValues []string +} + +func (d MetricFamiliesPerUser) sumOfSingleValuesWithLabels(metric string, fn func(*dto.Metric) float64, labelNames []string) map[string]singleResult { + result := map[string]singleResult{} + + for _, userMetrics := range d { + metricsPerLabelValue := getMetricsWithLabelNames(userMetrics[metric], labelNames) + + for key, mlv := range metricsPerLabelValue { + for _, m := range mlv.metrics { + r := result[key] + if r.labelValues == nil { + r.labelValues = mlv.labelValues + } + + r.value += fn(m) + result[key] = r + } + } + } + + return result +} + +func (d MetricFamiliesPerUser) SendSumOfSummaries(out chan<- prometheus.Metric, desc *prometheus.Desc, summaryName string) { + var ( + sampleCount uint64 + sampleSum float64 + quantiles map[float64]float64 + ) + + for _, userMetrics := range d { + for _, m := range userMetrics[summaryName].GetMetric() { + summary := m.GetSummary() + sampleCount += summary.GetSampleCount() + sampleSum += summary.GetSampleSum() + quantiles = mergeSummaryQuantiles(quantiles, summary.GetQuantile()) + } + } + + out <- prometheus.MustNewConstSummary(desc, sampleCount, sampleSum, quantiles) +} + +func (d MetricFamiliesPerUser) SendSumOfSummariesWithLabels(out chan<- prometheus.Metric, desc *prometheus.Desc, summaryName string, labelNames ...string) { + type summaryResult struct { + sampleCount uint64 + sampleSum float64 + quantiles map[float64]float64 + labelValues []string + } + + result := map[string]summaryResult{} + + for _, userMetrics := range d { + metricsPerLabelValue := getMetricsWithLabelNames(userMetrics[summaryName], labelNames) + + for key, mwl := range metricsPerLabelValue { + for _, m := range mwl.metrics { + r := result[key] + if r.labelValues == nil { + r.labelValues = mwl.labelValues + } + + summary := m.GetSummary() + r.sampleCount += summary.GetSampleCount() + r.sampleSum += summary.GetSampleSum() + r.quantiles = mergeSummaryQuantiles(r.quantiles, summary.GetQuantile()) + + result[key] = r + } + } + } + + for _, sr := range result { + out <- prometheus.MustNewConstSummary(desc, sr.sampleCount, sr.sampleSum, sr.quantiles, sr.labelValues...) 
+ } +} + +func (d MetricFamiliesPerUser) SendSumOfHistograms(out chan<- prometheus.Metric, desc *prometheus.Desc, histogramName string) { + var ( + sampleCount uint64 + sampleSum float64 + buckets map[float64]uint64 + ) + + for _, userMetrics := range d { + for _, m := range userMetrics[histogramName].GetMetric() { + histo := m.GetHistogram() + sampleCount += histo.GetSampleCount() + sampleSum += histo.GetSampleSum() + buckets = mergeHistogramBuckets(buckets, histo.GetBucket()) + } + } + + out <- prometheus.MustNewConstHistogram(desc, sampleCount, sampleSum, buckets) +} + +func mergeSummaryQuantiles(quantiles map[float64]float64, summaryQuantiles []*dto.Quantile) map[float64]float64 { + if len(summaryQuantiles) == 0 { + return quantiles + } + + out := quantiles + if out == nil { + out = map[float64]float64{} + } + + for _, q := range summaryQuantiles { + // we assume that all summaries have same quantiles + out[q.GetQuantile()] += q.GetValue() + } + return out +} + +func mergeHistogramBuckets(buckets map[float64]uint64, histogramBuckets []*dto.Bucket) map[float64]uint64 { + if len(histogramBuckets) == 0 { + return buckets + } + + out := buckets + if out == nil { + out = map[float64]uint64{} + } + + for _, q := range histogramBuckets { + // we assume that all histograms have same buckets + out[q.GetUpperBound()] += q.GetCumulativeCount() + } + return out +} + +type metricsWithLabels struct { + labelValues []string + metrics []*dto.Metric +} + +func getMetricsWithLabelNames(mf *dto.MetricFamily, labelNames []string) map[string]metricsWithLabels { + result := map[string]metricsWithLabels{} + + for _, m := range mf.GetMetric() { + lbls, include := getLabelValues(m, labelNames) + if !include { + continue + } + + key := getLabelsString(lbls) + r := result[key] + if r.labelValues == nil { + r.labelValues = lbls + } + r.metrics = append(r.metrics, m) + result[key] = r + } + return result +} + +func getLabelValues(m *dto.Metric, labelNames []string) ([]string, bool) { + all := map[string]string{} + for _, lp := range m.GetLabel() { + all[lp.GetName()] = lp.GetValue() + } + + result := make([]string, 0, len(labelNames)) + for _, ln := range labelNames { + lv, ok := all[ln] + if !ok { + // required labels not found + return nil, false + } + result = append(result, lv) + } + return result, true +} + +func getLabelsString(labelValues []string) string { + buf := bytes.Buffer{} + for _, v := range labelValues { + buf.WriteString(v) + buf.WriteByte(0) // separator, not used in prometheus labels + } + return buf.String() +} + +// sum returns sum of values from all metrics from same metric family (= series with the same metric name, but different labels) +// Supplied function extracts value. 
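+// A nil MetricFamily is handled gracefully: GetMetric() on nil returns an empty slice, so the result is 0 (see TestSum).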
+func sum(mf *dto.MetricFamily, fn func(*dto.Metric) float64) float64 { + result := float64(0) + for _, m := range mf.GetMetric() { + result += fn(m) + } + return result +} + +// This works even if m is nil, m.Counter is nil or m.Counter.Value is nil (it returns 0 in those cases) +func counterValue(m *dto.Metric) float64 { return m.GetCounter().GetValue() } +func gaugeValue(m *dto.Metric) float64 { return m.GetGauge().GetValue() } diff --git a/pkg/util/metrics_helper_test.go b/pkg/util/metrics_helper_test.go new file mode 100644 index 00000000000..2b4b3f6ab81 --- /dev/null +++ b/pkg/util/metrics_helper_test.go @@ -0,0 +1,83 @@ +package util + +import ( + "testing" + + "github.com/gogo/protobuf/proto" + dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/require" +) + +func TestSum(t *testing.T) { + require.Equal(t, float64(0), sum(nil, counterValue)) + require.Equal(t, float64(0), sum(&dto.MetricFamily{Metric: nil}, counterValue)) + require.Equal(t, float64(0), sum(&dto.MetricFamily{Metric: []*dto.Metric{{Counter: &dto.Counter{}}}}, counterValue)) + require.Equal(t, 12345.6789, sum(&dto.MetricFamily{Metric: []*dto.Metric{{Counter: &dto.Counter{Value: proto.Float64(12345.6789)}}}}, counterValue)) + require.Equal(t, 20235.80235, sum(&dto.MetricFamily{Metric: []*dto.Metric{ + {Counter: &dto.Counter{Value: proto.Float64(12345.6789)}}, + {Counter: &dto.Counter{Value: proto.Float64(7890.12345)}}, + }}, counterValue)) + // using 'counterValue' as function only sums counters + require.Equal(t, float64(0), sum(&dto.MetricFamily{Metric: []*dto.Metric{ + {Gauge: &dto.Gauge{Value: proto.Float64(12345.6789)}}, + {Gauge: &dto.Gauge{Value: proto.Float64(7890.12345)}}, + }}, counterValue)) +} + +func TestCounterValue(t *testing.T) { + require.Equal(t, float64(0), counterValue(nil)) + require.Equal(t, float64(0), counterValue(&dto.Metric{})) + require.Equal(t, float64(0), counterValue(&dto.Metric{Counter: &dto.Counter{}})) + require.Equal(t, float64(543857.12837), counterValue(&dto.Metric{Counter: &dto.Counter{Value: proto.Float64(543857.12837)}})) +} + +func TestGetMetricsWithLabelNames(t *testing.T) { + labels := []string{"a", "b"} + + require.Equal(t, map[string]metricsWithLabels{}, getMetricsWithLabelNames(nil, labels)) + require.Equal(t, map[string]metricsWithLabels{}, getMetricsWithLabelNames(&dto.MetricFamily{}, labels)) + + m1 := &dto.Metric{Label: makeLabels("a", "5"), Counter: &dto.Counter{Value: proto.Float64(1)}} + m2 := &dto.Metric{Label: makeLabels("a", "10", "b", "20"), Counter: &dto.Counter{Value: proto.Float64(1.5)}} + m3 := &dto.Metric{Label: makeLabels("a", "10", "b", "20", "c", "1"), Counter: &dto.Counter{Value: proto.Float64(2)}} + m4 := &dto.Metric{Label: makeLabels("a", "10", "b", "20", "c", "2"), Counter: &dto.Counter{Value: proto.Float64(3)}} + m5 := &dto.Metric{Label: makeLabels("a", "11", "b", "21"), Counter: &dto.Counter{Value: proto.Float64(4)}} + m6 := &dto.Metric{Label: makeLabels("ignored", "123", "a", "12", "b", "22", "c", "30"), Counter: &dto.Counter{Value: proto.Float64(4)}} + + out := getMetricsWithLabelNames(&dto.MetricFamily{Metric: []*dto.Metric{m1, m2, m3, m4, m5, m6}}, labels) + + // m1 is not returned at all, as it doesn't have both required labels. 
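+	// m3 and m4 carry an extra label "c" (and m6 an extra "ignored" label), but extra labels don't matter:
+	// metrics are grouped only by the values of the requested labels "a" and "b".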
+ require.Equal(t, map[string]metricsWithLabels{ + getLabelsString([]string{"10", "20"}): { + labelValues: []string{"10", "20"}, + metrics: []*dto.Metric{m2, m3, m4}}, + getLabelsString([]string{"11", "21"}): { + labelValues: []string{"11", "21"}, + metrics: []*dto.Metric{m5}}, + getLabelsString([]string{"12", "22"}): { + labelValues: []string{"12", "22"}, + metrics: []*dto.Metric{m6}}, + }, out) + + // no labels -- returns all metrics in single key. this isn't very efficient, and there are other functions + // (without labels) to handle this better, but it still works. + out2 := getMetricsWithLabelNames(&dto.MetricFamily{Metric: []*dto.Metric{m1, m2, m3, m4, m5, m6}}, nil) + require.Equal(t, map[string]metricsWithLabels{ + getLabelsString(nil): { + labelValues: []string{}, + metrics: []*dto.Metric{m1, m2, m3, m4, m5, m6}}, + }, out2) +} + +func makeLabels(namesAndValues ...string) []*dto.LabelPair { + out := []*dto.LabelPair(nil) + + for i := 0; i+1 < len(namesAndValues); i = i + 2 { + out = append(out, &dto.LabelPair{ + Name: proto.String(namesAndValues[i]), + Value: proto.String(namesAndValues[i+1]), + }) + } + + return out +}