Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
* Updated the index hosted at the root prefix to point to the updated routes.
* Legacy routes hardcoded with the `/api/prom` prefix now respect the `-http.prefix` flag.
* [CHANGE] The metrics `cortex_distributor_ingester_appends_total` and `distributor_ingester_append_failures_total` now include a `type` label to differentiate between `samples` and `metadata`. #2336
* [CHANGE] The metrics for number of chunks and bytes flushed to the chunk store are renamed from: #2463
* `cortex_ingester_chunks_stored_total` > `cortex_chunk_store_stored_chunks_total`
* `cortex_ingester_chunk_stored_bytes_total` > `cortex_chunk_store_stored_chunk_bytes_total`
* [CHANGE] Experimental TSDB: renamed blocks meta fetcher metrics: #2375
* `cortex_querier_bucket_store_blocks_meta_syncs_total` > `cortex_querier_blocks_meta_syncs_total`
* `cortex_querier_bucket_store_blocks_meta_sync_failures_total` > `cortex_querier_blocks_meta_sync_failures_total`
Expand All @@ -24,6 +27,9 @@
* [ENHANCEMENT] Single Binary: Added query-frontend to the single binary. Single binary users will now benefit from various query-frontend features. Primarily: sharding, parallelization, load shedding, additional caching (if configured), and query retries. #2437
* [ENHANCEMENT] Allow 1w (where w denotes week) and 1y (where y denotes year) when setting `-store.cache-lookups-older-than` and `-store.max-look-back-period`. #2454
* [ENHANCEMENT] Optimize index queries for matchers using "a|b|c"-type regex. #2446 #2475
* [ENHANCEMENT] Added per tenant metrics for queries and chunks and bytes read from chunk store: #2463
* `cortex_chunk_store_fetched_chunks_total` and `cortex_chunk_store_fetched_chunk_bytes_total`
* `cortex_query_frontend_queries_total` (per tenant queries counted by the frontend)
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
* [BUGFIX] Cassandra Storage: Fix endpoint TLS host verification. #2109
Expand Down
8 changes: 8 additions & 0 deletions integration/chunks_storage_backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,19 @@ func TestChunksStorageAllIndexBackends(t *testing.T) {
// Let's wait until the ingester has no chunks in memory.
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(0), "cortex_ingester_memory_chunks"))

// Let's verify that the chunk store chunk metrics are updated.
require.NoError(t, ingester.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_stored_chunks_total"))
require.NoError(t, ingester.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_stored_chunk_bytes_total"))

// Query back the series.
result, err := client.Query("series_1", ts)
require.NoError(t, err)
require.Equal(t, model.ValVector, result.Type())
assert.Equal(t, expectedVector, result.(model.Vector))

// check we've queried them from the chunk store.
require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_fetched_chunks_total"))
require.NoError(t, querier.WaitSumMetrics(e2e.Greater(0), "cortex_chunk_store_fetched_chunk_bytes_total"))
}

// Ensure no service-specific metrics prefix is used by the wrong service.
Expand Down
2 changes: 2 additions & 0 deletions integration/query_frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ func runQueryFrontendTest(t *testing.T, setup queryFrontendSetup) {

wg.Wait()

require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numUsers*numQueriesPerUser), "cortex_query_frontend_queries_total"))

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
assertServiceMetricsPrefixes(t, Ingester, ingester)
Expand Down
7 changes: 6 additions & 1 deletion pkg/chunk/storage/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/aws"
Expand Down Expand Up @@ -100,7 +101,9 @@ func (cfg *Config) Validate() error {
}

// NewStore makes the storage clients based on the configuration.
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits) (chunk.Store, error) {
func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits StoreLimits, reg prometheus.Registerer) (chunk.Store, error) {
chunkMetrics := newChunkClientMetrics(reg)

indexReadCache, err := cache.New(cfg.IndexQueriesCacheConfig)
if err != nil {
return nil, err
Expand Down Expand Up @@ -146,6 +149,8 @@ func NewStore(cfg Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConf
return nil, errors.Wrap(err, "error creating object client")
}

chunks = newMetricsChunkClient(chunks, chunkMetrics)

err = stores.AddPeriod(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunk/storage/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFactoryStop(t *testing.T) {
limits, err := validation.NewOverrides(defaults, nil)
require.NoError(t, err)

store, err := NewStore(cfg, storeConfig, schemaConfig, limits)
store, err := NewStore(cfg, storeConfig, schemaConfig, limits, nil)
require.NoError(t, err)

store.Stop()
Expand Down
110 changes: 110 additions & 0 deletions pkg/chunk/storage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package storage

import (
"context"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/cortexproject/cortex/pkg/chunk"
)

// metricsChunkClient takes a chunk client and exposes metrics for its
// operations. Its methods forward every call to the wrapped client.
type metricsChunkClient struct {
// client is the wrapped chunk client that every call is forwarded to.
client chunk.Client

// metrics holds the per-user counters updated by PutChunks and GetChunks.
metrics chunkClientMetrics
}

// newMetricsChunkClient returns a metricsChunkClient that forwards to client
// and records its activity in the given metrics.
func newMetricsChunkClient(client chunk.Client, metrics chunkClientMetrics) metricsChunkClient {
	var wrapped metricsChunkClient
	wrapped.client = client
	wrapped.metrics = metrics
	return wrapped
}

// chunkClientMetrics groups the per-user counters maintained by
// metricsChunkClient.
type chunkClientMetrics struct {
	// Number of chunks and number of chunk bytes recorded by PutChunks.
	chunksPutPerUser, chunksSizePutPerUser *prometheus.CounterVec
	// Number of chunks and number of chunk bytes recorded by GetChunks.
	chunksFetchedPerUser, chunksSizeFetchedPerUser *prometheus.CounterVec
}

// newChunkClientMetrics creates the four per-user chunk store counters
// (stored/fetched chunks and stored/fetched bytes) via promauto with reg.
func newChunkClientMetrics(reg prometheus.Registerer) chunkClientMetrics {
	factory := promauto.With(reg)

	// perUserCounter builds a counter vector in the "cortex" namespace with a
	// single "user" label.
	perUserCounter := func(name, help string) *prometheus.CounterVec {
		return factory.NewCounterVec(prometheus.CounterOpts{
			Namespace: "cortex",
			Name:      name,
			Help:      help,
		}, []string{"user"})
	}

	var m chunkClientMetrics
	m.chunksPutPerUser = perUserCounter("chunk_store_stored_chunks_total", "Total stored chunks per user.")
	m.chunksSizePutPerUser = perUserCounter("chunk_store_stored_chunk_bytes_total", "Total bytes stored in chunks per user.")
	m.chunksFetchedPerUser = perUserCounter("chunk_store_fetched_chunks_total", "Total fetched chunks per user.")
	m.chunksSizeFetchedPerUser = perUserCounter("chunk_store_fetched_chunk_bytes_total", "Total bytes fetched in chunks per user.")
	return m
}

// Stop calls Stop on the wrapped chunk client; no metrics are recorded.
func (c metricsChunkClient) Stop() {
c.client.Stop()
}

// PutChunks forwards the chunks to the wrapped client and, only when that
// call succeeds, adds the number of chunks and their data size to the
// per-user "stored" counters. On failure the error is returned unchanged and
// no counter is touched.
func (c metricsChunkClient) PutChunks(ctx context.Context, chunks []chunk.Chunk) error {
	err := c.client.PutChunks(ctx, chunks)
	if err != nil {
		return err
	}

	// The user ID is read from each chunk rather than from the context, so the
	// totals are accumulated per user before touching the counters.
	type tally struct {
		size  int
		count int
	}
	perUser := map[string]tally{}
	for i := range chunks {
		userID := chunks[i].UserID
		t := perUser[userID]
		t.size += chunks[i].Data.Size()
		t.count++
		perUser[userID] = t
	}

	for userID, t := range perUser {
		c.metrics.chunksSizePutPerUser.WithLabelValues(userID).Add(float64(t.size))
		c.metrics.chunksPutPerUser.WithLabelValues(userID).Add(float64(t.count))
	}

	return nil
}

// GetChunks fetches the requested chunks through the wrapped client and, only
// when that call succeeds, adds the number of returned chunks and their data
// size to the per-user "fetched" counters. On failure the wrapped client's
// result and error are returned as-is and no counter is touched.
func (c metricsChunkClient) GetChunks(ctx context.Context, chunks []chunk.Chunk) ([]chunk.Chunk, error) {
	fetched, err := c.client.GetChunks(ctx, chunks)
	if err != nil {
		return fetched, err
	}

	// The user ID is carried in each chunk, so the context is not needed.
	// Callers currently load chunks for one user at a time, but the interface
	// allows several users, so the totals are kept per user.
	type tally struct {
		size  int
		count int
	}
	perUser := map[string]tally{}
	for i := range fetched {
		userID := fetched[i].UserID
		t := perUser[userID]
		t.size += fetched[i].Data.Size()
		t.count++
		perUser[userID] = t
	}

	for userID, t := range perUser {
		c.metrics.chunksSizeFetchedPerUser.WithLabelValues(userID).Add(float64(t.size))
		c.metrics.chunksFetchedPerUser.WithLabelValues(userID).Add(float64(t.count))
	}

	return fetched, nil
}

// DeleteChunk forwards the deletion of chunkID to the wrapped chunk client
// and returns its error unchanged; no metrics are recorded.
func (c metricsChunkClient) DeleteChunk(ctx context.Context, chunkID string) error {
return c.client.DeleteChunk(ctx, chunkID)
}
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) {
return
}

t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides)
t.store, err = storage.NewStore(cfg.Storage, cfg.ChunkStore, cfg.Schema, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/ingester/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,17 +339,13 @@ func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fing
return err
}

sizePerUser := i.metrics.chunkSizePerUser.WithLabelValues(userID)
countPerUser := i.metrics.chunksPerUser.WithLabelValues(userID)
// Record statistics only when actual put request did not return error.
for _, chunkDesc := range chunkDescs {
utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size()
util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", metric, "nlabels", len(metric), "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime)
i.metrics.chunkUtilization.Observe(utilization)
i.metrics.chunkLength.Observe(float64(length))
i.metrics.chunkSize.Observe(float64(size))
sizePerUser.Add(float64(size))
countPerUser.Inc()
i.metrics.chunkAge.Observe(model.Now().Sub(chunkDesc.FirstTime).Seconds())
}

Expand Down
10 changes: 0 additions & 10 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ type ingesterMetrics struct {
chunkUtilization prometheus.Histogram
chunkLength prometheus.Histogram
chunkSize prometheus.Histogram
chunksPerUser *prometheus.CounterVec
chunkSizePerUser *prometheus.CounterVec
chunkAge prometheus.Histogram
memoryChunks prometheus.Gauge
flushReasons *prometheus.CounterVec
Expand Down Expand Up @@ -153,14 +151,6 @@ func newIngesterMetrics(r prometheus.Registerer, createMetricsConflictingWithTSD
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(500, 2, 5), // biggest bucket is 500*2^(5-1) = 8000
}),
chunksPerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_chunks_stored_total",
Help: "Total stored chunks per user.",
}, []string{"user"}),
chunkSizePerUser: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_chunk_stored_bytes_total",
Help: "Total bytes stored in chunks per user.",
}, []string{"user"}),
chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ingester_chunk_age_seconds",
Help: "Distribution of chunk ages (when stored).",
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
return mergeChunks
}

// NewChunkStoreQueryable returns the storage.Queryable implementation against the chunks store.
func NewChunkStoreQueryable(cfg Config, chunkStore chunkstore.ChunkStore) storage.Queryable {
return newChunkStoreQueryable(chunkStore, getChunksIteratorFunction(cfg))
}
Expand Down
24 changes: 23 additions & 1 deletion pkg/querier/queryrange/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -68,6 +69,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.ResultsCacheConfig.RegisterFlags(f)
}

// Validate validates the config.
func (cfg *Config) Validate(log log.Logger) error {
// SplitQueriesByDay is deprecated use SplitQueriesByInterval.
if cfg.SplitQueriesByDay {
Expand Down Expand Up @@ -130,6 +132,13 @@ func NewTripperware(
minShardingLookback time.Duration,
registerer prometheus.Registerer,
) (frontend.Tripperware, cache.Cache, error) {
// Per tenant query metrics.
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "query_frontend_queries_total",
Help: "Total queries sent per tenant.",
}, []string{"op", "user"})

// Metric used to keep track of each middleware execution duration.
metrics := NewInstrumentMiddlewareMetrics(registerer)

Expand Down Expand Up @@ -181,7 +190,20 @@ func NewTripperware(
if len(queryRangeMiddleware) > 0 {
queryrange := NewRoundTripper(next, codec, queryRangeMiddleware...)
return frontend.RoundTripFunc(func(r *http.Request) (*http.Response, error) {
if !strings.HasSuffix(r.URL.Path, "/query_range") {
isQueryRange := strings.HasSuffix(r.URL.Path, "/query_range")
op := "query"
if isQueryRange {
op = "query_range"
}

user, err := user.ExtractOrgID(r.Context())
// This should never happen anyways because we have auth middleware before this.
if err != nil {
return nil, err
}
queriesPerTenant.WithLabelValues(op, user).Inc()

if !isQueryRange {
return next.RoundTrip(r)
}
return queryrange.RoundTrip(r)
Expand Down