diff --git a/CHANGELOG.md b/CHANGELOG.md index 34a734390ce..52c45f91d17 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ * Updated the index hosted at the root prefix to point to the updated routes. * Legacy routes hardcoded with the `/api/prom` prefix now respect the `-http.prefix` flag. * [CHANGE] The metrics `cortex_distributor_ingester_appends_total` and `distributor_ingester_append_failures_total` now includes a `type` label to differentiate between `samples` and `metadata`. #2336 +* [CHANGE] The metrics for number of chunks and bytes flushed to the chunk store are renamed from: #2463 + * `cortex_ingester_chunks_stored_total` > `cortex_chunk_store_stored_chunks_total` + * `cortex_ingester_chunk_stored_bytes_total` > `cortex_chunk_store_stored_chunk_bytes_total` * [CHANGE] Experimental TSDB: renamed blocks meta fetcher metrics: #2375 * `cortex_querier_bucket_store_blocks_meta_syncs_total` > `cortex_querier_blocks_meta_syncs_total` * `cortex_querier_bucket_store_blocks_meta_sync_failures_total` > `cortex_querier_blocks_meta_sync_failures_total` @@ -24,6 +27,9 @@ * [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437 * [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454 * [ENHANCEMENT] Optimize index queries for matchers using "a|b|c"-type regex. #2446 #2475 +* [ENHANCEMENT] Added per tenant metrics for queries and chunks and bytes read from chunk store: #2463 + * `cortex_chunk_store_fetched_chunks_total` and `cortex_chunk_store_fetched_chunk_bytes_total` + * `cortex_query_frontend_queries_total` (per tenant queries counted by the frontend) * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372 * [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400 * [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109 diff --git a/integration/chunks_storage_backends_test.go b/integration/chunks_storage_backends_test.go index 1c086e050e6..b42e3f88150 100644 --- a/integration/chunks_storage_backends_test.go +++ b/integration/chunks_storage_backends_test.go @@ -102,11 +102,19 @@ func TestChunksStorageAllIndexBackends(t *testing.T) { // lets wait till ingester has no chunks in memory require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingester_memory_chunks")) + // lets verify that chunk store chunk metrics are updated. + require.NoError(t, ingester.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_stored_chunks_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_stored_chunk_bytes_total")) + // Query back the series. result, err := client.Query("series_1", ts) require.NoError(t, err) require.Equal(t, model.ValVector, result.Type()) assert.Equal(t, expectedVector, result.(model.Vector)) + + // check we've queried them from the chunk store. + require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_fetched_chunks_total")) + require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_fetched_chunk_bytes_total")) } // Ensure no service-specific metrics prefix is used by the wrong service. diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 195fb620a2c..cac4e67adfe 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -159,6 +159,8 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) { wg.Wait() + require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser), "cortex_query_frontend_queries_total")) + // Ensure no service-specific metrics prefix is used by the wrong service. assertServiceMetricsPrefixes(t, Distributor, distributor) assertServiceMetricsPrefixes(t, Ingester, ingester) diff --git a/pkg/chunk/storage/factory.go b/pkg/chunk/storage/factory.go index 436f6e92ba4..99d53c5b15d 100644 --- a/pkg/chunk/storage/factory.go +++ b/pkg/chunk/storage/factory.go @@ -9,6 +9,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/aws" @@ -100,7 +101,9 @@ func (cfg *Config) Validate() error { } // NewStore makes the storage clients based on the configuration. -func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) { +func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits, reg prometheus.Registerer) (chunk.Store, error) { + chunkMetrics := newChunkClientMetrics(reg) + indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig) if err != nil { return nil, err @@ -146,6 +149,8 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf return nil, errors.Wrap(err, "error creating object client") } + chunks = newMetricsChunkClient(chunks, chunkMetrics) + err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache) if err != nil { return nil, err diff --git a/pkg/chunk/storage/factory_test.go b/pkg/chunk/storage/factory_test.go index 10d03ebace5..a2ff89b5b54 100644 --- a/pkg/chunk/storage/factory_test.go +++ b/pkg/chunk/storage/factory_test.go @@ -39,7 +39,7 @@ func TestFactoryStop(t *testing.T) { limits, err := validation.NewOverrides(defaults, nil) require.NoError(t, err) - store, err := NewStore(cfg, storeConfig, schemaConfig, limits) + store, err := NewStore(cfg, storeConfig, schemaConfig, limits, nil) require.NoError(t, err) store.Stop() diff --git a/pkg/chunk/storage/metrics.go b/pkg/chunk/storage/metrics.go new file mode 100644 index 00000000000..28feece364c --- /dev/null +++ b/pkg/chunk/storage/metrics.go @@ -0,0 +1,110 @@ +package storage + +import ( + "context" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/cortexproject/cortex/pkg/chunk" +) + +// takes a chunk client and exposes metrics for its operations. +type metricsChunkClient struct { + client chunk.Client + + metrics chunkClientMetrics +} + +func newMetricsChunkClient(client chunk.Client, metrics chunkClientMetrics) metricsChunkClient { + return metricsChunkClient{ + client: client, + metrics: metrics, + } +} + +type chunkClientMetrics struct { + chunksPutPerUser *prometheus.CounterVec + chunksSizePutPerUser *prometheus.CounterVec + chunksFetchedPerUser *prometheus.CounterVec + chunksSizeFetchedPerUser *prometheus.CounterVec +} + +func newChunkClientMetrics(reg prometheus.Registerer) chunkClientMetrics { + return chunkClientMetrics{ + chunksPutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_stored_chunks_total", + Help: "Total stored chunks per user.", + }, []string{"user"}), + chunksSizePutPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_stored_chunk_bytes_total", + Help: "Total bytes stored in chunks per user.", + }, []string{"user"}), + chunksFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_fetched_chunks_total", + Help: "Total fetched chunks per user.", + }, []string{"user"}), + chunksSizeFetchedPerUser: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "chunk_store_fetched_chunk_bytes_total", + Help: "Total bytes fetched in chunks per user.", + }, []string{"user"}), + } +} + +func (c metricsChunkClient) Stop() { + c.client.Stop() +} + +func (c metricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error { + if err := c.client.PutChunks(ctx, chunks); err != nil { + return err + } + + // For PutChunks, we explicitly encode the userID in the chunk and don't use context. + userSizes := map[string]int{} + userCounts := map[string]int{} + for _, c := range chunks { + userSizes[c.UserID] += c.Data.Size() + userCounts[c.UserID]++ + } + for user, size := range userSizes { + c.metrics.chunksSizePutPerUser.WithLabelValues(user).Add(float64(size)) + } + for user, num := range userCounts { + c.metrics.chunksPutPerUser.WithLabelValues(user).Add(float64(num)) + } + + return nil +} + +func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) { + chks, err := c.client.GetChunks(ctx, chunks) + if err != nil { + return chks, err + } + + // For GetChunks, userID is the chunk and we don't need to use context. + // For now, we just load one user chunks at once, but the interface lets us do it for multiple users. + userSizes := map[string]int{} + userCounts := map[string]int{} + for _, c := range chks { + userSizes[c.UserID] += c.Data.Size() + userCounts[c.UserID]++ + } + for user, size := range userSizes { + c.metrics.chunksSizeFetchedPerUser.WithLabelValues(user).Add(float64(size)) + } + for user, num := range userCounts { + c.metrics.chunksFetchedPerUser.WithLabelValues(user).Add(float64(num)) + } + + return chks, nil +} + +func (c metricsChunkClient) DeleteChunk(ctx context.Context, chunkID string) error { + return c.client.DeleteChunk(ctx, chunkID) +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index adaeb89b348..aec8993e125 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -278,7 +278,7 @@ func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) { return } - t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides) + t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides, prometheus.DefaultRegisterer) if err != nil { return } diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 46de627fd76..7dee669454b 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -339,8 +339,6 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing return err } - sizePerUser := i.metrics.chunkSizePerUser.WithLabelValues(userID) - countPerUser := i.metrics.chunksPerUser.WithLabelValues(userID) // Record statistics only when actual put request did not return error. for _, chunkDesc := range chunkDescs { utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size() @@ -348,8 +346,6 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing i.metrics.chunkUtilization.Observe(utilization) i.metrics.chunkLength.Observe(float64(length)) i.metrics.chunkSize.Observe(float64(size)) - sizePerUser.Add(float64(size)) - countPerUser.Inc() i.metrics.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds()) } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index f1b9548902b..636fd9ac209 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -45,8 +45,6 @@ type ingesterMetrics struct { chunkUtilization prometheus.Histogram chunkLength prometheus.Histogram chunkSize prometheus.Histogram - chunksPerUser *prometheus.CounterVec - chunkSizePerUser *prometheus.CounterVec chunkAge prometheus.Histogram memoryChunks prometheus.Gauge flushReasons *prometheus.CounterVec @@ -153,14 +151,6 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD Help: "Distribution of stored chunk sizes (when stored).", Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000 }), - chunksPerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_chunks_stored_total", - Help: "Total stored chunks per user.", - }, []string{"user"}), - chunkSizePerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_ingester_chunk_stored_bytes_total", - Help: "Total bytes stored in chunks per user.", - }, []string{"user"}), chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{ Name: "cortex_ingester_chunk_age_seconds", Help: "Distribution of chunk ages (when stored).", diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index bd0cec1545d..1543098e080 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -94,6 +94,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc { return mergeChunks } +// NewChunkStoreQueryable returns the storage.Queryable implementation against the chunks store. func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storage.Queryable { return newChunkStoreQueryable(chunkStore, getChunksIteratorFunction(cfg)) } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index fb604734743..018c71942b0 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -26,6 +26,7 @@ import ( "github.com/go-kit/kit/log/level" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/promql" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -68,6 +69,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ResultsCacheConfig.RegisterFlags(f) } +// Validate validates the config. func (cfg *Config) Validate(log log.Logger) error { // SplitQueriesByDay is deprecated use SplitQueriesByInterval. if cfg.SplitQueriesByDay { @@ -130,6 +132,13 @@ func NewTripperware( minShardingLookback time.Duration, registerer prometheus.Registerer, ) (frontend.Tripperware, cache.Cache, error) { + // Per tenant query metrics. + queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ + Namespace: "cortex", + Name: "query_frontend_queries_total", + Help: "Total queries sent per tenant.", + }, []string{"op", "user"}) + // Metric used to keep track of each middleware execution duration. metrics := NewInstrumentMiddlewareMetrics(registerer) @@ -181,7 +190,20 @@ func NewTripperware( if len(queryRangeMiddleware) > 0 { queryrange := NewRoundTripper(next, codec, queryRangeMiddleware...) return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) { - if !strings.HasSuffix(r.URL.Path, "/query_range") { + isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range") + op := "query" + if isQueryRange { + op = "query_range" + } + + user, err := user.ExtractOrgID(r.Context()) + // This should never happen anyways because we have auth middleware before this. + if err != nil { + return nil, err + } + queriesPerTenant.WithLabelValues(op, user).Inc() + + if !isQueryRange { return next.RoundTrip(r) } return queryrange.RoundTrip(r)