From de8f33023e156774f8be237a4dee0f1672c615cd Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Thu, 16 Jan 2020 18:22:39 -0500 Subject: [PATCH 01/25] querier.sum-shards Signed-off-by: Owen Diehl --- CHANGELOG.md | 11 + docs/configuration/arguments.md | 22 + docs/configuration/config-file-reference.md | 4 + go.mod | 1 + pkg/chunk/chunk_store.go | 3 + pkg/chunk/chunk_store_test.go | 2 + pkg/chunk/chunk_store_utils.go | 17 +- pkg/chunk/schema.go | 63 +- pkg/chunk/schema_config.go | 20 +- pkg/chunk/schema_test.go | 66 ++ pkg/chunk/schema_util.go | 3 +- pkg/chunk/schema_util_test.go | 2 +- pkg/chunk/series_store.go | 31 +- pkg/chunk/storage/caching_index_client.go | 3 + pkg/cortex/modules.go | 29 +- pkg/ingester/ingester.go | 2 + pkg/querier/astmapper/astmapper.go | 187 ++++++ pkg/querier/astmapper/astmapper_test.go | 102 +++ pkg/querier/astmapper/embedded.go | 120 ++++ pkg/querier/astmapper/parallel.go | 92 +++ pkg/querier/astmapper/parallel_test.go | 119 ++++ pkg/querier/astmapper/shard_summer.go | 293 ++++++++ pkg/querier/astmapper/shard_summer_test.go | 256 +++++++ pkg/querier/astmapper/subtree_folder.go | 102 +++ pkg/querier/astmapper/subtree_folder_test.go | 114 ++++ pkg/querier/chunk_store_queryable.go | 8 +- pkg/querier/chunk_tar_test.go | 8 +- pkg/querier/chunks_handler.go | 3 +- pkg/querier/chunkstore/chunkstore.go | 15 + pkg/querier/distributor_queryable.go | 3 +- pkg/querier/ingester_streaming_queryable.go | 3 +- .../lazyquery.go} | 47 +- pkg/querier/matrix.go | 5 +- pkg/querier/querier.go | 16 +- pkg/querier/queryrange/promql_test.go | 612 +++++++++++++++++ pkg/querier/queryrange/query_range.go | 16 +- pkg/querier/queryrange/query_range_test.go | 12 +- pkg/querier/queryrange/queryable.go | 120 ++++ pkg/querier/queryrange/queryable_test.go | 260 +++++++ pkg/querier/queryrange/querysharding.go | 311 +++++++++ pkg/querier/queryrange/querysharding_test.go | 633 ++++++++++++++++++ pkg/querier/queryrange/results_cache.go | 2 +- pkg/querier/queryrange/results_cache_test.go | 4 +- pkg/querier/queryrange/roundtrip.go | 38 +- pkg/querier/queryrange/roundtrip_test.go | 37 +- pkg/querier/queryrange/series.go | 55 ++ pkg/querier/queryrange/series_test.go | 75 +++ pkg/querier/queryrange/test_utils.go | 186 +++++ pkg/querier/queryrange/test_utils_test.go | 134 ++++ pkg/querier/queryrange/value.go | 73 ++ pkg/querier/queryrange/value_test.go | 167 +++++ pkg/querier/remote_read_test.go | 4 +- pkg/querier/{ => series}/series_set.go | 61 +- pkg/querier/{ => series}/series_set_test.go | 10 +- pkg/querier/unified_querier.go | 7 +- tools/query-audit/auditor.go | 92 +++ tools/query-audit/config.go | 67 ++ tools/query-audit/example-config.yaml | 37 + tools/query-audit/main.go | 87 +++ tools/query-audit/runner.go | 32 + 60 files changed, 4806 insertions(+), 98 deletions(-) create mode 100644 pkg/querier/astmapper/astmapper.go create mode 100644 pkg/querier/astmapper/astmapper_test.go create mode 100644 pkg/querier/astmapper/embedded.go create mode 100644 pkg/querier/astmapper/parallel.go create mode 100644 pkg/querier/astmapper/parallel_test.go create mode 100644 pkg/querier/astmapper/shard_summer.go create mode 100644 pkg/querier/astmapper/shard_summer_test.go create mode 100644 pkg/querier/astmapper/subtree_folder.go create mode 100644 pkg/querier/astmapper/subtree_folder_test.go create mode 100644 pkg/querier/chunkstore/chunkstore.go rename pkg/querier/{lazy_querier.go => lazyquery/lazyquery.go} (61%) create mode 100644 pkg/querier/queryrange/promql_test.go create mode 100644 pkg/querier/queryrange/queryable.go create mode 100644 pkg/querier/queryrange/queryable_test.go create mode 100644 pkg/querier/queryrange/querysharding.go create mode 100644 pkg/querier/queryrange/querysharding_test.go create mode 100644 pkg/querier/queryrange/series.go create mode 100644 pkg/querier/queryrange/series_test.go create mode 100644 pkg/querier/queryrange/test_utils.go create mode 100644 pkg/querier/queryrange/test_utils_test.go create mode 100644 pkg/querier/queryrange/value.go create mode 100644 pkg/querier/queryrange/value_test.go rename pkg/querier/{ => series}/series_set.go (64%) rename pkg/querier/{ => series}/series_set_test.go (88%) create mode 100644 tools/query-audit/auditor.go create mode 100644 tools/query-audit/config.go create mode 100644 tools/query-audit/example-config.yaml create mode 100644 tools/query-audit/main.go create mode 100644 tools/query-audit/runner.go diff --git a/CHANGELOG.md b/CHANGELOG.md index cbe944e7c8a..731cdb0621e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -44,6 +44,17 @@ Note that the ruler flags need to be changed in this upgrade. You're moving from Further, if you're using the configs service, we've upgraded the migration library and this requires some manual intervention. See full instructions below to upgrade your PostgreSQL. * [CHANGE] The frontend component now does not cache results if it finds a `Cache-Control` header and if one of its values is `no-store`. #1974 +* [FEATURE] Fan out parallelizable queries to backend queriers concurrently. + * `-querier.sum-shards` (bool) + * Requires a shard-compatible schema (v10+) + * This causes the number of traces to increase accordingly. + * The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume. + * It's also advised to increase downstream concurrency controls as well: + * `querier.max-outstanding-requests-per-tenant` + * `querier.max-query-parallelism` + * `querier.max-concurrent` + * `server.grpc-max-concurrent-streams` (for both query-frontends and queriers) +* [ENHANCEMENT] metric `cortex_ingester_flush_reasons` gets a new `reason` value: `Spread`, when `-ingester.spread-flushes` option is enabled. * [CHANGE] Flags changed with transition to upstream Prometheus rules manager: * `-ruler.client-timeout` is now `ruler.configs.client-timeout` in order to match `ruler.configs.url`. * `-ruler.group-timeout`has been removed. diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 27633155677..f9f267050e3 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -68,6 +68,28 @@ The ingester query API was improved over time, but defaults to the old behaviour ## Query Frontend +- `-querier.sum-shards` + + If set to true, will cause the query frontend to mutate incoming queries when possible by turning `sum` operations into sharded `sum` operations. This requires a shard-compatible schema (v10+). An abridged example: + `sum by (foo) (rate(bar{baz=”blip”}[1m]))` -> + ``` + sum by (foo) ( + sum by (foo) (rate(bar{baz=”blip”,__cortex_shard__=”0of16”}[1m])) or + sum by (foo) (rate(bar{baz=”blip”,__cortex_shard__=”1of16”}[1m])) or + ... + sum by (foo) (rate(bar{baz=”blip”,__cortex_shard__=”15of16”}[1m])) + ) + ``` + When enabled, the query-frontend requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume. + It's also advised to increase downstream concurrency controls as well to account for more queries of smaller sizes: + + - `querier.max-outstanding-requests-per-tenant` + - `querier.max-query-parallelism` + - `querier.max-concurrent` + Furthermore, both querier and query-frontend components require the `querier.query-ingesters-within` parameter to know when to start sharding requests (ingester queries are not sharded). It's recommended to align this with `ingester.max-chunk-age`. + + Instrumentation (traces) also scale with the number of sharded queries and it's suggested to account for increased throughput there as well. + - `-querier.align-querier-with-step` If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 3faf24c5b97..7ecfe25a5aa 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -654,6 +654,10 @@ results_cache: # error is returned. # CLI flag: -querier.max-retries-per-request [max_retries: | default = 5] + +# Parse the ast and parallelize sums by shard. +# CLI flag: -querier.sum-shards +[sum_shards: | default = false] ``` ## `ruler_config` diff --git a/go.mod b/go.mod index 6df0f778e8c..b25c9de868c 100644 --- a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( google.golang.org/api v0.14.0 google.golang.org/grpc v1.25.1 gopkg.in/yaml.v2 v2.2.5 + sigs.k8s.io/yaml v1.1.0 ) replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go index a48b842ce63..a1481d22eee 100644 --- a/pkg/chunk/chunk_store.go +++ b/pkg/chunk/chunk_store.go @@ -430,6 +430,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro } func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) { + log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries") + defer log.Span.Finish() + var lock sync.Mutex var entries []IndexEntry err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool { diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go index b6b91de93aa..2e9cd63b58b 100644 --- a/pkg/chunk/chunk_store_test.go +++ b/pkg/chunk/chunk_store_test.go @@ -78,6 +78,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto tbmConfig TableManagerConfig schemaCfg = DefaultSchemaConfig("", schemaName, 0) ) + err := schemaCfg.Validate() + require.NoError(t, err) flagext.DefaultValues(&tbmConfig) storage := NewMockStorage() tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil) diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go index 114e2b30106..89fac15dd93 100644 --- a/pkg/chunk/chunk_store_utils.go +++ b/pkg/chunk/chunk_store_utils.go @@ -10,6 +10,7 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/cortexproject/cortex/pkg/chunk/cache" + "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) @@ -146,13 +147,13 @@ func (c *Fetcher) worker() { // FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be // lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks. func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) { - log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks") + log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks") defer log.Span.Finish() // Now fetch the actual chunk data from Memcache / S3 cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys) - fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs) + fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs) if err != nil { level.Warn(log).Log("msg", "error fetching from cache", "err", err) } @@ -199,12 +200,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error { // ProcessCacheResponse decodes the chunks coming back from the cache, separating // hits and misses. -func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) { +func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) { var ( requests = make([]decodeRequest, 0, len(keys)) responses = make(chan decodeResponse) missing []Chunk ) + log, _ := spanlogger.New(ctx, "Fetcher.processCacheResponse") + defer log.Span.Finish() i, j := 0, 0 for i < len(chunks) && j < len(keys) { @@ -229,6 +232,7 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b for ; i < len(chunks); i++ { missing = append(missing, chunks[i]) } + level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing)) go func() { for _, request := range requests { @@ -252,3 +256,10 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b } return found, missing, err } + +func injectShardLabels(chunks []Chunk, shard astmapper.ShardAnnotation) { + for i, chunk := range chunks { + chunk.Metric = append(chunk.Metric, shard.Label()) + chunks[i] = chunk + } +} diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go index e52a5eed115..3ef4506a7d1 100644 --- a/pkg/chunk/schema.go +++ b/pkg/chunk/schema.go @@ -7,7 +7,11 @@ import ( "fmt" "strings" + "strconv" + jsoniter "github.com/json-iterator/go" + + "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" ) @@ -48,6 +52,7 @@ type Schema interface { GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) + FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery // If the query resulted in series IDs, use this method to find chunks. GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) @@ -116,7 +121,7 @@ func (s schema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userI key := strings.Join([]string{ bucket.tableName, bucket.hashKey, - string(labelsSeriesID(labels)), + string(LabelsSeriesID(labels)), }, "-", ) @@ -218,6 +223,10 @@ func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string, return result, nil } +func (s schema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery { + return s.entries.FilterReadQueries(queries, shard) +} + type entries interface { GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) @@ -228,13 +237,23 @@ type entries interface { GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) + FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery +} + +// noops is a placeholder which can be embedded to provide default implementations +type noops struct{} + +func (n noops) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery { + return queries } // original entries: // - hash key: :: // - range key: