diff --git a/CHANGELOG.md b/CHANGELOG.md index cb8f4af4698..e8203a5a5c0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,16 @@ ## master / unreleased +* [FEATURE] Fan out parallelizable queries to backend queriers concurrently. #1878 + * `querier.parallelise-shardable-queries` (bool) + * Requires a shard-compatible schema (v10+) + * This causes the number of traces to increase accordingly. + * The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume. The schema config is only required when this option is enabled. + * It's also advised to increase downstream concurrency controls: + * `querier.max-outstanding-requests-per-tenant` + * `querier.max-query-parallelism` + * `querier.max-concurrent` + * `server.grpc-max-concurrent-streams` (for both query-frontends and queriers) * [CHANGE] The frontend http server will now send 502 in case of deadline exceeded and 499 if the user requested cancellation. #2156 * [CHANGE] Config file changed to remove top level `config_store` field in favor of a nested `configdb` field. #2125 * [CHANGE] Removed unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval` both to reduce configuration complexity and guarantee alignment of these two configs. Starting from now, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval default was `24h` so if you want to preserve the same behaviour you should set `-querier.split-queries-by-interval=24h`). #2040 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 737a52f727d..94acfd86878 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -68,6 +68,30 @@ The ingester query API was improved over time, but defaults to the old behaviour ## Query Frontend +- `-querier.parallelise-shardable-queries`
+
+  If set to true, will cause the query frontend to mutate incoming queries when possible by turning `sum` operations into sharded `sum` operations. This requires a shard-compatible schema (v10+). An abridged example:
+  `sum by (foo) (rate(bar{baz="blip"}[1m]))` ->
+  ```
+  sum by (foo) (
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="0of16"}[1m])) or
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="1of16"}[1m])) or
+    ...
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="15of16"}[1m]))
+  )
+  ```
+  When enabled, the query-frontend requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume.
+  It's also advised to increase downstream concurrency controls to account for more queries of smaller sizes:
+
+  - `querier.max-outstanding-requests-per-tenant`
+  - `querier.max-query-parallelism`
+  - `querier.max-concurrent`
+  - `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
+
+  Furthermore, both querier and query-frontend components require the `querier.query-ingesters-within` parameter to know when to start sharding requests (ingester queries are not sharded). It's recommended to align this with `ingester.max-chunk-age`.
+
+  Instrumentation (traces) also scales with the number of sharded queries, so it's suggested to account for increased throughput there as well (for instance via `JAEGER_REPORTER_MAX_QUEUE_SIZE`).
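+  For illustration only, a hypothetical pair of invocations combining the flags above might look like the sketch below; the schema file path and all numeric values are placeholder assumptions to tune per workload, not recommendations:
+
+  ```
+  # Sketch only: paths and values are illustrative placeholders.
+  cortex -target=query-frontend \
+    -config-yaml=/etc/cortex/schema.yaml \
+    -querier.parallelise-shardable-queries=true \
+    -querier.query-ingesters-within=12h \
+    -server.grpc-max-concurrent-streams=1024
+
+  # Downstream queriers consume the same schema config and would raise
+  # their own concurrency limits accordingly (again, placeholder values).
+  cortex -target=querier \
+    -config-yaml=/etc/cortex/schema.yaml \
+    -querier.query-ingesters-within=12h \
+    -querier.max-concurrent=20 \
+    -server.grpc-max-concurrent-streams=1024
+  ```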
+ - `-querier.align-querier-with-step` If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f5476173763..1201fe0e839 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -641,6 +641,11 @@ results_cache: # error is returned. # CLI flag: -querier.max-retries-per-request [max_retries: <int> | default = 5] +
+# Perform query parallelisations based on storage sharding configuration and
+# query ASTs. This feature is supported only by the chunks storage engine.
+# CLI flag: -querier.parallelise-shardable-queries
+[parallelise_shardable_queries: <boolean> | default = false] ``` ## `ruler_config` diff --git a/docs/operations/query-auditor.md b/docs/operations/query-auditor.md new file mode 100644 index 00000000000..32828cef159 --- /dev/null +++ b/docs/operations/query-auditor.md @@ -0,0 +1,140 @@ +---
+title: "Query Auditor (tool)"
+linkTitle: "Query Auditor (tool)"
+weight: 2
+slug: query-auditor
+---
+
+The query auditor is a tool bundled in the Cortex repository, but **not** included in Docker images -- it must be built from source. It's primarily useful for those _developing_ Cortex, but can also be helpful to operators in certain scenarios (backend migrations come to mind).
+
+## How it works
+
+The `query-audit` tool performs a set of queries against two backends that expose the Prometheus read API. This is generally the `query-frontend` component of two Cortex deployments. It then compares the responses to determine the average difference for each query. It does this by:
+
+ - Ensuring the resulting label sets match.
+ - For each label set, ensuring it contains the same number of samples as its pair from the other backend.
+ - For each sample, calculating its difference against its pair from the other backend/label set.
+ - Calculating the average diff per query from the above diffs.
+
+### Limitations
+
+It currently only supports queries with `Matrix` response types.
+
+### Use cases
+
+- Correctness testing when working on the read path.
+- Comparing results from different backends.
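For illustration, here is a minimal Go sketch of the averaging step described under "How it works", using simplified, hypothetical types; it is not the tool's actual implementation:

```go
package main

import (
	"fmt"
	"math"
)

// sample is a simplified (timestamp, value) pair. The real tool compares
// Prometheus Matrix responses; this sketch only illustrates the averaging step.
type sample struct {
	ts    int64
	value float64
}

// avgDiff returns the average relative difference (as a percentage) between
// two aligned series, assuming the label-set and sample-count checks described
// above have already passed.
func avgDiff(control, test []sample) (float64, error) {
	if len(control) != len(test) {
		return 0, fmt.Errorf("sample count mismatch: %d vs %d", len(control), len(test))
	}
	if len(control) == 0 {
		return 0, nil
	}
	var total float64
	for i := range control {
		if control[i].ts != test[i].ts {
			return 0, fmt.Errorf("timestamp mismatch at index %d", i)
		}
		denom := math.Abs(control[i].value)
		if denom == 0 {
			// Avoid dividing by zero; fall back to the absolute difference.
			denom = 1
		}
		total += math.Abs(control[i].value-test[i].value) / denom
	}
	return total / float64(len(control)) * 100, nil
}

func main() {
	control := []sample{{ts: 1, value: 10}, {ts: 2, value: 20}}
	test := []sample{{ts: 1, value: 10}, {ts: 2, value: 21}}
	diff, err := avgDiff(control, test)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%f%% avg diff\n", diff)
}
```

In practice the tool performs this comparison per matching label set over `Matrix` responses and reports one average per query, as in the example output below.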
+ +### Example Configuration + +```yaml +control: + host: http://localhost:8080/api/prom + headers: + "X-Scope-OrgID": 1234 + +test: + host: http://localhost:8081/api/prom + headers: + "X-Scope-OrgID": 1234 + +queries: + - query: 'sum(rate(container_cpu_usage_seconds_total[5m]))' + start: 2019-11-25T00:00:00Z + end: 2019-11-28T00:00:00Z + step_size: 15m + - query: 'sum(rate(container_cpu_usage_seconds_total[5m])) by (container_name)' + start: 2019-11-25T00:00:00Z + end: 2019-11-28T00:00:00Z + step_size: 15m + - query: 'sum(rate(container_cpu_usage_seconds_total[5m])) without (container_name)' + start: 2019-11-25T00:00:00Z + end: 2019-11-26T00:00:00Z + step_size: 15m + - query: 'histogram_quantile(0.9, sum(rate(cortex_cache_value_size_bytes_bucket[5m])) by (le, job))' + start: 2019-11-25T00:00:00Z + end: 2019-11-25T06:00:00Z + step_size: 15m + # two shardable legs + - query: 'sum without (instance, job) (rate(cortex_query_frontend_queue_length[5m])) or sum by (job) (rate(cortex_query_frontend_queue_length[5m]))' + start: 2019-11-25T00:00:00Z + end: 2019-11-25T06:00:00Z + step_size: 15m + # one shardable leg + - query: 'sum without (instance, job) (rate(cortex_cache_request_duration_seconds_count[5m])) or rate(cortex_cache_request_duration_seconds_count[5m])' + start: 2019-11-25T00:00:00Z + end: 2019-11-25T06:00:00Z + step_size: 15m +``` + +### Example Output + +Under ideal circumstances, you'll see output like the following: + +``` +$ go run ./tools/query-audit/ -f config.yaml + +0.000000% avg diff for: + query: sum(rate(container_cpu_usage_seconds_total[5m])) + series: 1 + samples: 289 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-28 00:00:00 +0000 UTC + step: 15m0s + +0.000000% avg diff for: + query: sum(rate(container_cpu_usage_seconds_total[5m])) by (container_name) + series: 95 + samples: 25877 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-28 00:00:00 +0000 UTC + step: 15m0s + +0.000000% avg diff for: + query: sum(rate(container_cpu_usage_seconds_total[5m])) without (container_name) + series: 4308 + samples: 374989 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-26 00:00:00 +0000 UTC + step: 15m0s + +0.000000% avg diff for: + query: histogram_quantile(0.9, sum(rate(cortex_cache_value_size_bytes_bucket[5m])) by (le, job)) + series: 13 + samples: 325 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-25 06:00:00 +0000 UTC + step: 15m0s + +0.000000% avg diff for: + query: sum without (instance, job) (rate(cortex_query_frontend_queue_length[5m])) or sum by (job) (rate(cortex_query_frontend_queue_length[5m])) + series: 21 + samples: 525 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-25 06:00:00 +0000 UTC + step: 15m0s + +0.000000% avg diff for: + query: sum without (instance, job) (rate(cortex_cache_request_duration_seconds_count[5m])) or rate(cortex_cache_request_duration_seconds_count[5m]) + series: 942 + samples: 23550 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-25 06:00:00 +0000 UTC + step: 15m0s + +0.000000% avg diff for: + query: sum by (namespace) (predict_linear(container_cpu_usage_seconds_total[5m], 10)) + series: 16 + samples: 400 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-25 06:00:00 +0000 UTC + step: 15m0s + +0.000000% avg diff for: + query: sum by (namespace) (avg_over_time((rate(container_cpu_usage_seconds_total[5m]))[10m:]) > 1) + series: 4 + samples: 52 + start: 2019-11-25 00:00:00 +0000 UTC + end: 2019-11-25 01:00:00 +0000 UTC + step: 5m0s +``` diff --git a/go.mod b/go.mod index 3f247cd7477..35c28decf1b 100644 --- 
a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( google.golang.org/api v0.14.0 google.golang.org/grpc v1.25.1 gopkg.in/yaml.v2 v2.2.5 + sigs.k8s.io/yaml v1.1.0 ) replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index a3dd5fcad19..566c0e89f55 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -429,6 +429,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro } func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { + log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries") + defer log.Span.Finish() + var lock sync.Mutex var entries []IndexEntry err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index 41cd0944deb..f7f96ae9271 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -77,6 +77,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto tbmConfig TableManagerConfig schemaCfg = DefaultSchemaConfig("", schemaName, 0) ) + err := schemaCfg.Validate() + require.NoError(t, err) flagext.DefaultValues(&tbmConfig) storage := NewMockStorage() tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil) diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go index eb6ced986d1..27a5a84fe97 100644 --- a/pkg/chunk/chunk_store_utils.go +++ b/pkg/chunk/chunk_store_utils.go @@ -146,13 +146,13 @@ func (c *Fetcher) worker() { // FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be // lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks. func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) { - log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks") + log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks") defer log.Span.Finish() // Now fetch the actual chunk data from Memcache / S3 cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys) - fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs) + fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs) if err != nil { level.Warn(log).Log("msg", "error fetching from cache", "err", err) } @@ -199,12 +199,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { // ProcessCacheResponse decodes the chunks coming back from the cache, separating // hits and misses. 
-func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) { +func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) { var ( requests = make([]decodeRequest, 0, len(keys)) responses = make(chan decodeResponse) missing []Chunk ) + log, _ := spanlogger.New(ctx, "Fetcher.processCacheResponse") + defer log.Span.Finish() i, j := 0, 0 for i < len(chunks) && j < len(keys) { @@ -229,6 +231,7 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b for ; i < len(chunks); i++ { missing = append(missing, chunks[i]) } + level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing)) go func() { for _, request := range requests { diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index e52a5eed115..7a9441e0d73 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -5,11 +5,16 @@ import ( "encoding/hex" "errors" "fmt" + "strconv" "strings" + "github.com/go-kit/kit/log/level" jsoniter "github.com/json-iterator/go" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/cortexproject/cortex/pkg/util" ) const ( @@ -48,6 +53,7 @@ type Schema interface { GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) + FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery // If the query resulted in series IDs, use this method to find chunks. GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) @@ -218,6 +224,10 @@ func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string, return result, nil } +func (s schema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery { + return s.entries.FilterReadQueries(queries, shard) +} + type entries interface { GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) @@ -228,6 +238,7 @@ type entries interface { GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) + FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery } // original entries: @@ -303,6 +314,10 @@ func (originalEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, return nil, ErrNotSupported } +func (originalEntries) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery { + return queries +} + // v3Schema went to base64 encoded label values & a version ID // - range key: