diff --git a/CHANGELOG.md b/CHANGELOG.md
index 687ebe39aef..a0e76e1280c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,16 @@
 
 ## master / unreleased
 
+* [FEATURE] Fan out parallelizable queries to backend queriers concurrently.
+  * `-querier.sum-shards` (bool)
+  * Requires a shard-compatible schema (v10+)
+  * Query sharding increases the number of traces generated accordingly.
+  * The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (e.g. via the `config-yaml` CLI flag). This is the same schema config the queriers consume.
+  * It's also advised to increase downstream concurrency controls:
+    * `querier.max-outstanding-requests-per-tenant`
+    * `querier.max-query-parallelism`
+    * `querier.max-concurrent`
+    * `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
 * [BUGFIX] Fixed unnecessary CAS operations done by the HA tracker when the jitter is enabled. #1861
 
 ## 0.4.0 / 2019-12-02
diff --git a/docs/arguments.md b/docs/arguments.md
index a0dd1feabf1..92a566a9084 100644
--- a/docs/arguments.md
+++ b/docs/arguments.md
@@ -63,6 +63,28 @@ The ingester query API was improved over time, but defaults to the old behaviour
 
 ## Query Frontend
 
+- `-querier.sum-shards`
+
+  If set to true, will cause the query frontend to mutate incoming queries, when possible, by turning `sum` operations into sharded `sum` operations. This requires a shard-compatible schema (v10+). An abridged example:
+  `sum by (foo) (rate(bar{baz="blip"}[1m]))` ->
+  ```
+  sum by (foo) (
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="0of16"}[1m])) or
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="1of16"}[1m])) or
+    ...
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="15of16"}[1m]))
+  )
+  ```
+  When enabled, the query-frontend requires a schema config to determine how/when to shard queries, either from a file or from flags (e.g. via the `config-yaml` CLI flag). This is the same schema config the queriers consume.
+  It's also advised to increase downstream concurrency controls to account for more queries of smaller sizes:
+
+  - `querier.max-outstanding-requests-per-tenant`
+  - `querier.max-query-parallelism`
+  - `querier.max-concurrent`
+  - `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
+
+  Instrumentation (traces) also scales with the number of sharded queries, so it's suggested to account for the increased volume there as well.
+
 - `-querier.align-querier-with-step`
 
   If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results.
diff --git a/go.mod b/go.mod
index 755d4d040d1..e376e7a3142 100644
--- a/go.mod
+++ b/go.mod
@@ -79,6 +79,7 @@ require (
 	google.golang.org/api v0.11.0
 	google.golang.org/grpc v1.25.1
 	gopkg.in/yaml.v2 v2.2.2
+	sigs.k8s.io/yaml v1.1.0
 )
 
 // Override since git.apache.org is down. The docs say to fetch from github.
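Review note: the rewrite described in the new `docs/arguments.md` section is, conceptually, a per-shard fan-out of the inner aggregation followed by an outer re-aggregation. Below is a minimal, self-contained sketch of that expansion; it is not the actual frontend code (which rewrites the PromQL AST via the `astmapper` package), and the function name, shard count, and matchers are illustrative only.

```go
package main

import (
	"fmt"
	"strings"
)

// shardedSumRate builds the sharded form of
//   sum by (<groupBy>) (rate(<metric>{<matchers>}[<window>]))
// producing one leg per shard constrained by a __cortex_shard__ matcher and
// re-aggregating the legs with an outer sum, as in the docs example.
func shardedSumRate(groupBy, metric, matchers, window string, shards int) string {
	legs := make([]string, 0, shards)
	for i := 0; i < shards; i++ {
		legs = append(legs, fmt.Sprintf(
			`  sum by (%s) (rate(%s{%s,__cortex_shard__="%dof%d"}[%s]))`,
			groupBy, metric, matchers, i, shards, window))
	}
	return fmt.Sprintf("sum by (%s) (\n%s\n)", groupBy, strings.Join(legs, " or\n"))
}

func main() {
	// Reproduces the abridged example: sum by (foo) (rate(bar{baz="blip"}[1m])) with 16 shards.
	fmt.Println(shardedSumRate("foo", "bar", `baz="blip"`, "1m", 16))
}
```

Each generated leg can be dispatched to a downstream querier independently, which is why the concurrency limits listed above become more important when sharding is enabled.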
diff --git a/pkg/chunk/chunk_store.go b/pkg/chunk/chunk_store.go
index da7e6eec604..9ab01275b53 100644
--- a/pkg/chunk/chunk_store.go
+++ b/pkg/chunk/chunk_store.go
@@ -436,6 +436,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
 }
 
 func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
+	log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
+	defer log.Span.Finish()
+
 	var lock sync.Mutex
 	var entries []IndexEntry
 	err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
@@ -459,6 +462,9 @@ func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery
 }
 
 func (c *store) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) {
+	log, ctx := spanlogger.New(ctx, "store.parseIndexEntries")
+	defer log.Span.Finish()
+
 	result := make([]string, 0, len(entries))
 	for _, entry := range entries {
 		chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value)
diff --git a/pkg/chunk/chunk_store_test.go b/pkg/chunk/chunk_store_test.go
index 80632775a26..f4be5ab5f4a 100644
--- a/pkg/chunk/chunk_store_test.go
+++ b/pkg/chunk/chunk_store_test.go
@@ -77,6 +77,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
 		tbmConfig TableManagerConfig
 		schemaCfg = DefaultSchemaConfig("", schemaName, 0)
 	)
+	err := schemaCfg.Validate()
+	require.NoError(t, err)
 	flagext.DefaultValues(&tbmConfig)
 	storage := NewMockStorage()
 	tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil)
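Review note: the spans added to `lookupEntriesByQueries` and `parseIndexEntries` follow the existing spanlogger pattern: `spanlogger.New` starts a child span named after the call site and returns a context carrying it, and the deferred `Finish` closes the span when the lookup returns. A rough stand-alone sketch of the same pattern using opentracing-go directly; the operation name and log fields below are illustrative, not taken from the codebase.

```go
package main

import (
	"context"

	opentracing "github.com/opentracing/opentracing-go"
)

// lookupWithSpan shows the pattern applied in the diff: derive a child span
// from the caller's context, pass the new context to downstream calls so they
// join the same trace, and finish the span on return.
func lookupWithSpan(ctx context.Context) error {
	span, ctx := opentracing.StartSpanFromContext(ctx, "store.lookupEntriesByQueries")
	defer span.Finish()

	span.LogKV("msg", "querying index pages")
	// ... perform the index lookup using ctx so downstream calls share the trace ...
	_ = ctx
	return nil
}

func main() {
	_ = lookupWithSpan(context.Background())
}
```

Each sharded sub-query walks these paths independently, which is one reason trace volume grows when sharding is enabled.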
diff --git a/pkg/chunk/chunk_store_utils.go b/pkg/chunk/chunk_store_utils.go
index 856af86be18..f5511293696 100644
--- a/pkg/chunk/chunk_store_utils.go
+++ b/pkg/chunk/chunk_store_utils.go
@@ -10,6 +10,7 @@ import (
 	"github.com/prometheus/prometheus/promql"
 
 	"github.com/cortexproject/cortex/pkg/chunk/cache"
+	"github.com/cortexproject/cortex/pkg/querier/astmapper"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/spanlogger"
 )
@@ -146,13 +147,13 @@ func (c *Fetcher) worker() {
 
 // FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be
 // lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
 func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
-	log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks")
+	log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks")
 	defer log.Span.Finish()
 
 	// Now fetch the actual chunk data from Memcache / S3
 	cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys)
 
-	fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs)
+	fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs)
 	if err != nil {
 		level.Warn(log).Log("msg", "error fetching from cache", "err", err)
 	}
@@ -199,12 +200,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {
 
 // ProcessCacheResponse decodes the chunks coming back from the cache, separating
 // hits and misses.
-func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
+func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
 	var (
 		requests  = make([]decodeRequest, 0, len(keys))
 		responses = make(chan decodeResponse)
 		missing   []Chunk
 	)
+	log, ctx := spanlogger.New(ctx, "Fetcher.processCacheResponse")
+	defer log.Span.Finish()
 
 	i, j := 0, 0
 	for i < len(chunks) && j < len(keys) {
@@ -229,6 +232,7 @@
 	for ; i < len(chunks); i++ {
 		missing = append(missing, chunks[i])
 	}
+	level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))
 
 	go func() {
 		for _, request := range requests {
@@ -252,3 +256,10 @@
 	}
 	return found, missing, err
 }
+
+func injectShardLabels(chunks []Chunk, shard astmapper.ShardAnnotation) {
+	for i, chunk := range chunks {
+		chunk.Metric = append(chunk.Metric, shard.Label())
+		chunks[i] = chunk
+	}
+}
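Review note: `injectShardLabels`, added at the end of the file above, appends the shard's label to each chunk's metric, so series fetched for a sharded sub-query keep the `__cortex_shard__` label they were selected with. Below is a minimal sketch of the same idea using plain Prometheus labels; the label name comes from the docs example, and `ShardAnnotation.Label()` is assumed to yield an equivalent name/value pair.

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"
)

// withShardLabel returns a copy of the series labels with the shard label
// appended (appended at the end, mirroring the diff, rather than re-sorted).
func withShardLabel(metric labels.Labels, shard, of int) labels.Labels {
	out := make(labels.Labels, 0, len(metric)+1)
	out = append(out, metric...)
	out = append(out, labels.Label{
		Name:  "__cortex_shard__",
		Value: fmt.Sprintf("%dof%d", shard, of),
	})
	return out
}

func main() {
	m := labels.FromStrings("__name__", "bar", "baz", "blip")
	fmt.Println(withShardLabel(m, 0, 16))
}
```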
diff --git a/pkg/chunk/schema.go b/pkg/chunk/schema.go
index ce69abf8ed7..111373c5a0d 100644
--- a/pkg/chunk/schema.go
+++ b/pkg/chunk/schema.go
@@ -7,7 +7,11 @@
 import (
 	"fmt"
 	"strings"
+	"strconv"
+	jsoniter "github.com/json-iterator/go"
+
+	"github.com/cortexproject/cortex/pkg/querier/astmapper"
 
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
 )
@@ -46,6 +50,7 @@ type Schema interface {
 	GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error)
 	GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error)
 	GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
+	FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery
 
 	// If the query resulted in series IDs, use this method to find chunks.
 	GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
@@ -114,7 +119,7 @@ func (s schema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userI
 	key := strings.Join([]string{
 		bucket.tableName,
 		bucket.hashKey,
-		string(labelsSeriesID(labels)),
+		string(LabelsSeriesID(labels)),
 	},
 		"-",
 	)
@@ -216,6 +221,10 @@ func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string,
 	return result, nil
 }
 
+func (s schema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
+	return s.entries.FilterReadQueries(queries, shard)
+}
+
 type entries interface {
 	GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
 	GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
@@ -226,13 +235,23 @@ type entries interface {
 	GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
 	GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
 	GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
+	FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery
+}
+
+// noops is a placeholder which can be embedded to provide default implementations
+type noops struct{}
+
+func (n noops) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery {
+	return queries
 }
 
 // original entries:
 // - hash key: <userid>:<bucket>:<metric name>
 // - range key:
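Review note: the new `noops` type uses Go struct embedding to give every `entries` implementation a pass-through `FilterReadQueries`, so only shard-aware schema versions have to override it. Below is a stripped-down sketch of that embedding pattern; all types and the shard-matching rule are invented purely for illustration.

```go
package main

import "fmt"

type indexQuery struct{ rangePrefix string }

type entries interface {
	FilterReadQueries(queries []indexQuery, shard *string) []indexQuery
}

// defaults plays the role of noops: a pass-through that embedders get for free.
type defaults struct{}

func (defaults) FilterReadQueries(queries []indexQuery, _ *string) []indexQuery {
	return queries
}

// legacyEntries (pre-sharding schema) embeds defaults and satisfies the interface unchanged.
type legacyEntries struct{ defaults }

// shardedEntries overrides the default to drop queries outside the requested shard.
type shardedEntries struct{ defaults }

func (shardedEntries) FilterReadQueries(queries []indexQuery, shard *string) []indexQuery {
	if shard == nil {
		return queries
	}
	out := make([]indexQuery, 0, len(queries))
	for _, q := range queries {
		if q.rangePrefix == *shard { // stand-in for the real shard check on index range keys
			out = append(out, q)
		}
	}
	return out
}

func main() {
	qs := []indexQuery{{"0of16"}, {"1of16"}}
	shard := "0of16"
	var legacy entries = legacyEntries{}
	var sharded entries = shardedEntries{}
	fmt.Println(len(legacy.FilterReadQueries(qs, &shard)), len(sharded.FilterReadQueries(qs, &shard)))
}
```

The apparent intent is to keep older (pre-v10) schemas compiling unchanged while letting shard-aware schemas drop index queries that cannot belong to the requested shard.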