
Commit dbf9266

Distributing sum queries (#1878)
* querier.sum-shards
* addresses pr comments
* instruments frontend sharding, splitby
* LabelsSeriesID unexported again
* removes unnecessary codec interface in astmapping
* simplifies VectorSquasher as we never use matrices
* combines queryrange series & value files
* removes noops struct embedding strategy in schema, provides noop impls on all schemas instead
* NewSubtreeFolder no longer can return an error as it inlines the jsonCodec
* account for QueryIngestersWithin renaming
* fixes rebase import collision
* fixes rebase conflicts
* marks absent as non parallelizable
* upstream promql compatibility changes
* addresses pr comments
* import collisions
* linting - fixes goimports -local requirement
* fixes merge conflicts
* addresses pr comments
* stylistic changes
* s/downstream/sharded/
* s/sum_shards/parallelise_shardable_queries/
* query-audit docs
* notes sharded parallelizations are only supported by chunk store
* doc suggestions

All commits signed off by: Owen Diehl <[email protected]>
1 parent c63a4f2 commit dbf9266


59 files changed: +5043 −172 lines

CHANGELOG.md

Lines changed: 10 additions & 0 deletions

````diff
@@ -2,6 +2,16 @@

 ## master / unreleased

+* [FEATURE] Fan out parallelizable queries to backend queriers concurrently. #1878
+  * `querier.parallelise-shardable-queries` (bool)
+  * Requires a shard-compatible schema (v10+)
+  * This causes the number of traces to increase accordingly.
+  * The query-frontend now requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume. The schema config is only required when this option is enabled.
+  * It's also advised to increase downstream concurrency controls:
+    * `querier.max-outstanding-requests-per-tenant`
+    * `querier.max-query-parallelism`
+    * `querier.max-concurrent`
+    * `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
 * [CHANGE] The frontend http server will now send 502 in case of deadline exceeded and 499 if the user requested cancellation. #2156
 * [CHANGE] Config file changed to remove top level `config_store` field in favor of a nested `configdb` field. #2125
 * [CHANGE] Removed unnecessary `frontend.cache-split-interval` in favor of `querier.split-queries-by-interval` both to reduce configuration complexity and guarantee alignment of these two configs. Starting from now, `-querier.cache-results` may only be enabled in conjunction with `-querier.split-queries-by-interval` (previously the cache interval default was `24h` so if you want to preserve the same behaviour you should set `-querier.split-queries-by-interval=24h`). #2040
````

docs/configuration/arguments.md

Lines changed: 24 additions & 0 deletions

````diff
@@ -68,6 +68,30 @@ The ingester query API was improved over time, but defaults to the old behaviour

 ## Query Frontend

+- `-querier.parallelise-shardable-queries`
+
+  If set to true, will cause the query frontend to mutate incoming queries when possible by turning `sum` operations into sharded `sum` operations. This requires a shard-compatible schema (v10+). An abridged example:
+  `sum by (foo) (rate(bar{baz="blip"}[1m]))` ->
+  ```
+  sum by (foo) (
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="0of16"}[1m])) or
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="1of16"}[1m])) or
+    ...
+    sum by (foo) (rate(bar{baz="blip",__cortex_shard__="15of16"}[1m]))
+  )
+  ```
+  When enabled, the query-frontend requires a schema config to determine how/when to shard queries, either from a file or from flags (i.e. by the `config-yaml` CLI flag). This is the same schema config the queriers consume.
+  It's also advised to increase downstream concurrency controls to account for more queries of smaller sizes:
+
+  - `querier.max-outstanding-requests-per-tenant`
+  - `querier.max-query-parallelism`
+  - `querier.max-concurrent`
+  - `server.grpc-max-concurrent-streams` (for both query-frontends and queriers)
+
+  Furthermore, both querier and query-frontend components require the `querier.query-ingesters-within` parameter to know when to start sharding requests (ingester queries are not sharded). It's recommended to align this with `ingester.max-chunk-age`.
+
+  Instrumentation (traces) also scales with the number of sharded queries, and it's suggested to account for increased throughput there as well (for instance via `JAEGER_REPORTER_MAX_QUEUE_SIZE`).

 - `-querier.align-querier-with-step`

   If set to true, will cause the query frontend to mutate incoming queries and align their start and end parameters to the step parameter of the query. This improves the cacheability of the query results.
````

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions

````diff
@@ -664,6 +664,11 @@ results_cache:
 # error is returned.
 # CLI flag: -querier.max-retries-per-request
 [max_retries: <int> | default = 5]
+
+# Perform query parallelisations based on storage sharding configuration and
+# query ASTs. This feature is supported only by the chunks storage engine.
+# CLI flag: -querier.parallelise-shardable-queries
+[parallelise_shardable_queries: <boolean> | default = false]
 ```

 ## `ruler_config`
````

docs/operations/query-auditor.md

Lines changed: 140 additions & 0 deletions

---
title: "Query Auditor (tool)"
linkTitle: "Query Auditor (tool)"
weight: 2
slug: query-auditor
---

The query auditor is a tool bundled in the Cortex repository, but **not** included in Docker images -- it must be built from source. It's primarily useful for those _developing_ Cortex, but can be helpful to operators as well in certain scenarios (backend migrations come to mind).

## How it works

The `query-audit` tool performs a set of queries against two backends that expose the Prometheus read API. This is generally the `query-frontend` component of two Cortex deployments. It then compares the responses to determine the average difference for each query. It does this by:

- Ensuring the resulting label sets match.
- For each label set, ensuring it contains the same number of samples as its pair from the other backend.
- For each sample, calculating its difference against its pair from the other backend/label set.
- Calculating the average diff per query from the above diffs.
### Limitations

It currently only supports queries with `Matrix` response types.

### Use cases

- Correctness testing when working on the read path.
- Comparing results from different backends.

### Example Configuration

```yaml
control:
  host: http://localhost:8080/api/prom
  headers:
    "X-Scope-OrgID": 1234

test:
  host: http://localhost:8081/api/prom
  headers:
    "X-Scope-OrgID": 1234

queries:
  - query: 'sum(rate(container_cpu_usage_seconds_total[5m]))'
    start: 2019-11-25T00:00:00Z
    end: 2019-11-28T00:00:00Z
    step_size: 15m
  - query: 'sum(rate(container_cpu_usage_seconds_total[5m])) by (container_name)'
    start: 2019-11-25T00:00:00Z
    end: 2019-11-28T00:00:00Z
    step_size: 15m
  - query: 'sum(rate(container_cpu_usage_seconds_total[5m])) without (container_name)'
    start: 2019-11-25T00:00:00Z
    end: 2019-11-26T00:00:00Z
    step_size: 15m
  - query: 'histogram_quantile(0.9, sum(rate(cortex_cache_value_size_bytes_bucket[5m])) by (le, job))'
    start: 2019-11-25T00:00:00Z
    end: 2019-11-25T06:00:00Z
    step_size: 15m
  # two shardable legs
  - query: 'sum without (instance, job) (rate(cortex_query_frontend_queue_length[5m])) or sum by (job) (rate(cortex_query_frontend_queue_length[5m]))'
    start: 2019-11-25T00:00:00Z
    end: 2019-11-25T06:00:00Z
    step_size: 15m
  # one shardable leg
  - query: 'sum without (instance, job) (rate(cortex_cache_request_duration_seconds_count[5m])) or rate(cortex_cache_request_duration_seconds_count[5m])'
    start: 2019-11-25T00:00:00Z
    end: 2019-11-25T06:00:00Z
    step_size: 15m
```
### Example Output

Under ideal circumstances, you'll see output like the following:

```
$ go run ./tools/query-audit/ -f config.yaml

0.000000% avg diff for:
  query: sum(rate(container_cpu_usage_seconds_total[5m]))
  series: 1
  samples: 289
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-28 00:00:00 +0000 UTC
  step: 15m0s

0.000000% avg diff for:
  query: sum(rate(container_cpu_usage_seconds_total[5m])) by (container_name)
  series: 95
  samples: 25877
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-28 00:00:00 +0000 UTC
  step: 15m0s

0.000000% avg diff for:
  query: sum(rate(container_cpu_usage_seconds_total[5m])) without (container_name)
  series: 4308
  samples: 374989
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-26 00:00:00 +0000 UTC
  step: 15m0s

0.000000% avg diff for:
  query: histogram_quantile(0.9, sum(rate(cortex_cache_value_size_bytes_bucket[5m])) by (le, job))
  series: 13
  samples: 325
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-25 06:00:00 +0000 UTC
  step: 15m0s

0.000000% avg diff for:
  query: sum without (instance, job) (rate(cortex_query_frontend_queue_length[5m])) or sum by (job) (rate(cortex_query_frontend_queue_length[5m]))
  series: 21
  samples: 525
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-25 06:00:00 +0000 UTC
  step: 15m0s

0.000000% avg diff for:
  query: sum without (instance, job) (rate(cortex_cache_request_duration_seconds_count[5m])) or rate(cortex_cache_request_duration_seconds_count[5m])
  series: 942
  samples: 23550
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-25 06:00:00 +0000 UTC
  step: 15m0s

0.000000% avg diff for:
  query: sum by (namespace) (predict_linear(container_cpu_usage_seconds_total[5m], 10))
  series: 16
  samples: 400
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-25 06:00:00 +0000 UTC
  step: 15m0s

0.000000% avg diff for:
  query: sum by (namespace) (avg_over_time((rate(container_cpu_usage_seconds_total[5m]))[10m:]) > 1)
  series: 4
  samples: 52
  start: 2019-11-25 00:00:00 +0000 UTC
  end: 2019-11-25 01:00:00 +0000 UTC
  step: 5m0s
```

go.mod

Lines changed: 1 addition & 0 deletions

````diff
@@ -72,6 +72,7 @@ require (
 	google.golang.org/api v0.14.0
 	google.golang.org/grpc v1.25.1
 	gopkg.in/yaml.v2 v2.2.5
+	sigs.k8s.io/yaml v1.1.0
 )

 replace github.com/Azure/azure-sdk-for-go => github.com/Azure/azure-sdk-for-go v36.2.0+incompatible
````

pkg/chunk/chunk_store.go

Lines changed: 3 additions & 0 deletions

````diff
@@ -429,6 +429,9 @@ func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, fro
 }

 func (c *store) lookupEntriesByQueries(ctx context.Context, queries []IndexQuery) ([]IndexEntry, error) {
+	log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
+	defer log.Span.Finish()
+
 	var lock sync.Mutex
 	var entries []IndexEntry
 	err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
````

pkg/chunk/chunk_store_test.go

Lines changed: 2 additions & 0 deletions

````diff
@@ -77,6 +77,8 @@ func newTestChunkStoreConfig(t require.TestingT, schemaName string, storeCfg Sto
 		tbmConfig TableManagerConfig
 		schemaCfg = DefaultSchemaConfig("", schemaName, 0)
 	)
+	err := schemaCfg.Validate()
+	require.NoError(t, err)
 	flagext.DefaultValues(&tbmConfig)
 	storage := NewMockStorage()
 	tableManager, err := NewTableManager(tbmConfig, schemaCfg, maxChunkAge, storage, nil)
````

pkg/chunk/chunk_store_utils.go

Lines changed: 6 additions & 3 deletions

````diff
@@ -146,13 +146,13 @@ func (c *Fetcher) worker() {
 // FetchChunks fetches a set of chunks from cache and store. Note that the keys passed in must be
 // lexicographically sorted, while the returned chunks are not in the same order as the passed in chunks.
 func (c *Fetcher) FetchChunks(ctx context.Context, chunks []Chunk, keys []string) ([]Chunk, error) {
-	log, ctx := spanlogger.New(ctx, "ChunkStore.fetchChunks")
+	log, ctx := spanlogger.New(ctx, "ChunkStore.FetchChunks")
 	defer log.Span.Finish()

 	// Now fetch the actual chunk data from Memcache / S3
 	cacheHits, cacheBufs, _ := c.cache.Fetch(ctx, keys)

-	fromCache, missing, err := c.processCacheResponse(chunks, cacheHits, cacheBufs)
+	fromCache, missing, err := c.processCacheResponse(ctx, chunks, cacheHits, cacheBufs)
 	if err != nil {
 		level.Warn(log).Log("msg", "error fetching from cache", "err", err)
 	}
@@ -199,12 +199,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {

 // ProcessCacheResponse decodes the chunks coming back from the cache, separating
 // hits and misses.
-func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
+func (c *Fetcher) processCacheResponse(ctx context.Context, chunks []Chunk, keys []string, bufs [][]byte) ([]Chunk, []Chunk, error) {
 	var (
 		requests  = make([]decodeRequest, 0, len(keys))
 		responses = make(chan decodeResponse)
 		missing   []Chunk
 	)
+	log, _ := spanlogger.New(ctx, "Fetcher.processCacheResponse")
+	defer log.Span.Finish()

 	i, j := 0, 0
 	for i < len(chunks) && j < len(keys) {
@@ -229,6 +231,7 @@ func (c *Fetcher) processCacheResponse(chunks []Chunk, keys []string, bufs [][]b
 	for ; i < len(chunks); i++ {
 		missing = append(missing, chunks[i])
 	}
+	level.Debug(log).Log("chunks", len(chunks), "decodeRequests", len(requests), "missing", len(missing))

 	go func() {
 		for _, request := range requests {
````
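The hit/miss separation that `processCacheResponse` instruments above relies on both the chunks and the cache keys being lexicographically sorted, so a single two-pointer pass can pair them up. A simplified, standalone sketch of that walk, using plain strings in place of `Chunk` values (`splitHitsMisses` is hypothetical, not part of the Cortex API):

```go
package main

import "fmt"

// splitHitsMisses sketches the two-pointer walk in processCacheResponse:
// chunkKeys (all wanted chunks) and cacheKeys (keys found in cache) are both
// lexicographically sorted; keys present in both are hits, and the remaining
// chunkKeys are misses to be fetched from the store.
func splitHitsMisses(chunkKeys, cacheKeys []string) (hits, misses []string) {
	i, j := 0, 0
	for i < len(chunkKeys) && j < len(cacheKeys) {
		switch {
		case chunkKeys[i] == cacheKeys[j]:
			hits = append(hits, chunkKeys[i]) // found in cache
			i++
			j++
		case chunkKeys[i] < cacheKeys[j]:
			misses = append(misses, chunkKeys[i]) // not in cache
			i++
		default:
			j++ // cache key with no matching chunk; skip it
		}
	}
	// Any remaining chunks were not found in the cache.
	misses = append(misses, chunkKeys[i:]...)
	return hits, misses
}

func main() {
	hits, misses := splitHitsMisses(
		[]string{"a", "b", "c", "d"},
		[]string{"b", "d"},
	)
	fmt.Println(hits, misses) // [b d] [a c]
}
```

The sorted-input precondition is exactly why `FetchChunks` documents that its keys must be lexicographically sorted: it makes the separation linear in the number of chunks rather than quadratic.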
