
Commit 1638c36

Add initial per tenant query and chunk metrics AND RENAME SOME EXISTING METRICS (#2463)
* Add initial per tenant query metric. More will be added once the following is merged: prometheus/prometheus#6890
  Signed-off-by: Goutham Veeramachaneni <[email protected]>
* Add per tenant chunks stored and fetched metrics
  Signed-off-by: Goutham Veeramachaneni <[email protected]>
* Address feedback
  Signed-off-by: Goutham Veeramachaneni <[email protected]>
1 parent f81290a commit 1638c36

11 files changed: +158, -18 lines

CHANGELOG.md
6 additions & 0 deletions

@@ -8,6 +8,9 @@
 * Updated the index hosted at the root prefix to point to the updated routes.
 * Legacy routes hardcoded with the `/api/prom` prefix now respect the `-http.prefix` flag.
 * [CHANGE] The metrics `cortex_distributor_ingester_appends_total` and `distributor_ingester_append_failures_total` now include a `type` label to differentiate between `samples` and `metadata`. #2336
+* [CHANGE] The metrics for the number of chunks and bytes flushed to the chunk store are renamed: #2463
+  * `cortex_ingester_chunks_stored_total` > `cortex_chunk_store_stored_chunks_total`
+  * `cortex_ingester_chunk_stored_bytes_total` > `cortex_chunk_store_stored_chunk_bytes_total`
 * [CHANGE] Experimental TSDB: renamed blocks meta fetcher metrics: #2375
   * `cortex_querier_bucket_store_blocks_meta_syncs_total` > `cortex_querier_blocks_meta_syncs_total`
   * `cortex_querier_bucket_store_blocks_meta_sync_failures_total` > `cortex_querier_blocks_meta_sync_failures_total`
@@ -24,6 +27,9 @@
 * [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437
 * [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454
 * [ENHANCEMENT] Optimize index queries for matchers using "a|b|c"-type regex. #2446 #2475
+* [ENHANCEMENT] Added per-tenant metrics for queries, and for chunks and bytes read from the chunk store: #2463
+  * `cortex_chunk_store_fetched_chunks_total` and `cortex_chunk_store_fetched_chunk_bytes_total`
+  * `cortex_query_frontend_queries_total` (per-tenant queries counted by the frontend)
 * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
 * [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
 * [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109
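Note: the new chunk store counter names are built in code from a `cortex` namespace, the metric name, and a per-tenant `user` label. Below is a minimal, self-contained Go sketch (not part of this commit; the registry and tenant ID are illustrative only) showing how that composition yields the full series name referenced above.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same construction as the new chunk store counters: "cortex" namespace,
	// metric name, and a "user" label holding the tenant ID.
	stored := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "chunk_store_stored_chunks_total",
		Help:      "Total stored chunks per user.",
	}, []string{"user"})

	stored.WithLabelValues("tenant-1").Add(3)

	// The exposed series is cortex_chunk_store_stored_chunks_total{user="tenant-1"}.
	fmt.Println(testutil.ToFloat64(stored.WithLabelValues("tenant-1"))) // 3
}

The same pattern applies to the fetched-chunk counters and to `cortex_query_frontend_queries_total`.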

integration/chunks_storage_backends_test.go
8 additions & 0 deletions

@@ -102,11 +102,19 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
 		// lets wait till ingester has no chunks in memory
 		require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingester_memory_chunks"))
 
+		// lets verify that chunk store chunk metrics are updated.
+		require.NoError(t, ingester.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_stored_chunks_total"))
+		require.NoError(t, ingester.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_stored_chunk_bytes_total"))
+
 		// Query back the series.
 		result, err := client.Query("series_1", ts)
 		require.NoError(t, err)
 		require.Equal(t, model.ValVector, result.Type())
 		assert.Equal(t, expectedVector, result.(model.Vector))
+
+		// check we've queried them from the chunk store.
+		require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_fetched_chunks_total"))
+		require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_fetched_chunk_bytes_total"))
 	}
 
 	// Ensure no service-specific metrics prefix is used by the wrong service.

integration/query_frontend_test.go
2 additions & 0 deletions

@@ -159,6 +159,8 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) {
 
 	wg.Wait()
 
+	require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser), "cortex_query_frontend_queries_total"))
+
 	// Ensure no service-specific metrics prefix is used by the wrong service.
 	assertServiceMetricsPrefixes(t, Distributor, distributor)
 	assertServiceMetricsPrefixes(t, Ingester, ingester)
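The `cortex_query_frontend_queries_total` counter asserted here is defined in the query-frontend itself, which is not among the files shown in this excerpt. As a rough, hypothetical Go sketch of how such a per-tenant counter could be declared and incremented (the struct, function, and label names below are assumptions, not this commit's actual code):

package frontend

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/weaveworks/common/user"
)

// frontendMetrics holds a counter that would surface as
// cortex_query_frontend_queries_total{user="<tenant>"}.
type frontendMetrics struct {
	queriesPerTenant *prometheus.CounterVec
}

func newFrontendMetrics(reg prometheus.Registerer) frontendMetrics {
	return frontendMetrics{
		queriesPerTenant: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      "query_frontend_queries_total",
			Help:      "Total queries received per tenant.",
		}, []string{"user"}),
	}
}

// countQuery bumps the per-tenant counter for a request; the tenant ID is
// taken from the request context, where Cortex propagates the org ID.
func (m frontendMetrics) countQuery(r *http.Request) {
	userID, err := user.ExtractOrgID(r.Context())
	if err != nil {
		return // no tenant in the context; nothing to count
	}
	m.queriesPerTenant.WithLabelValues(userID).Inc()
}

Summing such a counter across all `user` label values gives numUsers*numQueriesPerUser, which is what the test above asserts.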

pkg/chunk/storage/factory.go
6 additions & 1 deletion

@@ -9,6 +9,7 @@ import (
 
 	"github.com/go-kit/kit/log/level"
 	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
 
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/chunk/aws"
@@ -100,7 +101,9 @@ func (cfg *Config) Validate() error {
 }
 
 // NewStore makes the storage clients based on the configuration.
-func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
+func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits, reg prometheus.Registerer) (chunk.Store, error) {
+	chunkMetrics := newChunkClientMetrics(reg)
+
 	indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig)
 	if err != nil {
 		return nil, err
@@ -146,6 +149,8 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
 		return nil, errors.Wrap(err, "error creating object client")
 	}
 
+	chunks = newMetricsChunkClient(chunks, chunkMetrics)
+
 	err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
 	if err != nil {
 		return nil, err

pkg/chunk/storage/factory_test.go
1 addition & 1 deletion

@@ -39,7 +39,7 @@ func TestFactoryStop(t *testing.T) {
 	limits, err := validation.NewOverrides(defaults, nil)
 	require.NoError(t, err)
 
-	store, err := NewStore(cfg, storeConfig, schemaConfig, limits)
+	store, err := NewStore(cfg, storeConfig, schemaConfig, limits, nil)
 	require.NoError(t, err)
 
 	store.Stop()
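A note on the `nil` registerer passed above: `promauto.With(nil)` creates collectors that work normally but are not registered with any registry, so `newChunkClientMetrics(nil)` is safe in tests. A small standalone sketch (an assumed example, not code from this commit) illustrating that behaviour:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	// With a nil registerer the counter is fully usable, it just isn't exported
	// by any registry; handy for tests that don't care about metrics output.
	c := promauto.With(nil).NewCounterVec(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "chunk_store_stored_chunks_total",
		Help:      "Total stored chunks per user.",
	}, []string{"user"})

	c.WithLabelValues("tenant-1").Inc()
	fmt.Println("unregistered counter incremented without error")
}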

pkg/chunk/storage/metrics.go
110 additions & 0 deletions (new file; every line below is an addition)

@@ -0,0 +1,110 @@
package storage

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/cortexproject/cortex/pkg/chunk"
)

// metricsChunkClient wraps a chunk.Client and exposes per-user metrics for its operations.
type metricsChunkClient struct {
	client chunk.Client

	metrics chunkClientMetrics
}

func newMetricsChunkClient(client chunk.Client, metrics chunkClientMetrics) metricsChunkClient {
	return metricsChunkClient{
		client:  client,
		metrics: metrics,
	}
}

type chunkClientMetrics struct {
	chunksPutPerUser         *prometheus.CounterVec
	chunksSizePutPerUser     *prometheus.CounterVec
	chunksFetchedPerUser     *prometheus.CounterVec
	chunksSizeFetchedPerUser *prometheus.CounterVec
}

func newChunkClientMetrics(reg prometheus.Registerer) chunkClientMetrics {
	return chunkClientMetrics{
		chunksPutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      "chunk_store_stored_chunks_total",
			Help:      "Total stored chunks per user.",
		}, []string{"user"}),
		chunksSizePutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      "chunk_store_stored_chunk_bytes_total",
			Help:      "Total bytes stored in chunks per user.",
		}, []string{"user"}),
		chunksFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      "chunk_store_fetched_chunks_total",
			Help:      "Total fetched chunks per user.",
		}, []string{"user"}),
		chunksSizeFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      "chunk_store_fetched_chunk_bytes_total",
			Help:      "Total bytes fetched in chunks per user.",
		}, []string{"user"}),
	}
}

func (c metricsChunkClient) Stop() {
	c.client.Stop()
}

func (c metricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
	if err := c.client.PutChunks(ctx, chunks); err != nil {
		return err
	}

	// For PutChunks, we explicitly encode the userID in the chunk and don't use the context.
	userSizes := map[string]int{}
	userCounts := map[string]int{}
	for _, c := range chunks {
		userSizes[c.UserID] += c.Data.Size()
		userCounts[c.UserID]++
	}
	for user, size := range userSizes {
		c.metrics.chunksSizePutPerUser.WithLabelValues(user).Add(float64(size))
	}
	for user, num := range userCounts {
		c.metrics.chunksPutPerUser.WithLabelValues(user).Add(float64(num))
	}

	return nil
}

func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
	chks, err := c.client.GetChunks(ctx, chunks)
	if err != nil {
		return chks, err
	}

	// For GetChunks, the userID is in the chunk and we don't need to use the context.
	// For now, we just load one user's chunks at a time, but the interface lets us do it for multiple users.
	userSizes := map[string]int{}
	userCounts := map[string]int{}
	for _, c := range chks {
		userSizes[c.UserID] += c.Data.Size()
		userCounts[c.UserID]++
	}
	for user, size := range userSizes {
		c.metrics.chunksSizeFetchedPerUser.WithLabelValues(user).Add(float64(size))
	}
	for user, num := range userCounts {
		c.metrics.chunksFetchedPerUser.WithLabelValues(user).Add(float64(num))
	}

	return chks, nil
}

func (c metricsChunkClient) DeleteChunk(ctx context.Context, chunkID string) error {
	return c.client.DeleteChunk(ctx, chunkID)
}

pkg/cortex/modules.go
1 addition & 1 deletion

@@ -278,7 +278,7 @@ func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) {
 		return
 	}
 
-	t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides)
+	t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides, prometheus.DefaultRegisterer)
 	if err != nil {
 		return
 	}

pkg/ingester/flush.go
0 additions & 4 deletions

@@ -339,17 +339,13 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing
 		return err
 	}
 
-	sizePerUser := i.metrics.chunkSizePerUser.WithLabelValues(userID)
-	countPerUser := i.metrics.chunksPerUser.WithLabelValues(userID)
 	// Record statistics only when actual put request did not return error.
 	for _, chunkDesc := range chunkDescs {
 		utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size()
 		util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "nlabels", len(metric), "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime)
 		i.metrics.chunkUtilization.Observe(utilization)
 		i.metrics.chunkLength.Observe(float64(length))
 		i.metrics.chunkSize.Observe(float64(size))
-		sizePerUser.Add(float64(size))
-		countPerUser.Inc()
 		i.metrics.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
 	}

pkg/ingester/metrics.go
0 additions & 10 deletions

@@ -45,8 +45,6 @@ type ingesterMetrics struct {
 	chunkUtilization prometheus.Histogram
 	chunkLength      prometheus.Histogram
 	chunkSize        prometheus.Histogram
-	chunksPerUser    *prometheus.CounterVec
-	chunkSizePerUser *prometheus.CounterVec
 	chunkAge         prometheus.Histogram
 	memoryChunks     prometheus.Gauge
 	flushReasons     *prometheus.CounterVec
@@ -153,14 +151,6 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
 			Help:    "Distribution of stored chunk sizes (when stored).",
 			Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000
 		}),
-		chunksPerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
-			Name: "cortex_ingester_chunks_stored_total",
-			Help: "Total stored chunks per user.",
-		}, []string{"user"}),
-		chunkSizePerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
-			Name: "cortex_ingester_chunk_stored_bytes_total",
-			Help: "Total bytes stored in chunks per user.",
-		}, []string{"user"}),
 		chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
 			Name: "cortex_ingester_chunk_age_seconds",
 			Help: "Distribution of chunk ages (when stored).",

pkg/querier/querier.go
1 addition & 0 deletions

@@ -94,6 +94,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
 	return mergeChunks
 }
 
+// NewChunkStoreQueryable returns the storage.Queryable implementation against the chunks store.
 func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storage.Queryable {
 	return newChunkStoreQueryable(chunkStore, getChunksIteratorFunction(cfg))
 }
