Skip to content

Commit 93151d7

Browse files
author
Thor
committed
querier: move chunk stores MinChunkAge to QueryStoreAfter
Signed-off-by: Thor <[email protected]>
1 parent c7c88cc commit 93151d7

File tree

8 files changed

+197
-67
lines changed

8 files changed

+197
-67
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
## master / unreleased
44

55
* [CHANGE] Removed remaining support for using denormalised tokens in the ring. If you're still running ingesters with denormalised tokens (Cortex 0.4 or earlier, with `-ingester.normalise-tokens=false`), such ingesters will now be completely invisible to distributors and need to be either switched to Cortex 0.6.0 or later, or be configured to use normalised tokens. #2034
6+
* [CHANGE] Moved `--store.min-chunk-age` to the Querier config as `--querier.query-store-after`, allowing the store to be skipped during query time if the metrics wouldn't be found. The YAML config option `ingestermaxquerylookback` has been renamed to `query_ingesters_within` to match its CLI flag. #1893
7+
* `--store.min-chunk-age` has been removed
8+
* `--querier.query-store-after` has been added in it's place.
69
* [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023
710
* [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026
811
* [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex_<component>_thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027

docs/configuration/config-file-reference.md

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,12 @@ The `querier_config` configures the Cortex querier.
508508
# Maximum lookback beyond which queries are not sent to ingester. 0 means all
509509
# queries are sent to ingester.
510510
# CLI flag: -querier.query-ingesters-within
511-
[ingestermaxquerylookback: <duration> | default = 0s]
511+
[query_ingesters_within: <duration> | default = 0s]
512+
513+
# The time after which a metric should only be queried from storage and not just
514+
# ingesters. 0 means all queries are sent to store.
515+
# CLI flag: -querier.query-store-after
516+
[query_store_after: <duration> | default = 0s]
512517
513518
# The default evaluation interval or step size for subqueries.
514519
# CLI flag: -querier.default-evaluation-interval
@@ -1517,10 +1522,6 @@ write_dedupe_cache_config:
15171522
# The CLI flags prefix for this block config is: store.index-cache-write
15181523
[fifocache: <fifo_cache_config>]
15191524
1520-
# Minimum time between chunk update and being saved to the store.
1521-
# CLI flag: -store.min-chunk-age
1522-
[min_chunk_age: <duration> | default = 0s]
1523-
15241525
# Cache index entries older than this period. 0 to disable.
15251526
# CLI flag: -store.cache-lookups-older-than
15261527
[cache_lookups_older_than: <duration> | default = 0s]

pkg/chunk/chunk_store.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ type StoreConfig struct {
4545
ChunkCacheConfig cache.Config `yaml:"chunk_cache_config,omitempty"`
4646
WriteDedupeCacheConfig cache.Config `yaml:"write_dedupe_cache_config,omitempty"`
4747

48-
MinChunkAge time.Duration `yaml:"min_chunk_age,omitempty"`
4948
CacheLookupsOlderThan time.Duration `yaml:"cache_lookups_older_than,omitempty"`
5049

5150
// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
@@ -61,7 +60,6 @@ func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
6160
f.BoolVar(&cfg.chunkCacheStubs, "store.chunk-cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.")
6261
cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. ", f)
6362

64-
f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
6563
f.DurationVar(&cfg.CacheLookupsOlderThan, "store.cache-lookups-older-than", 0, "Cache index entries older than this period. 0 to disable.")
6664
f.DurationVar(&cfg.MaxLookBackPeriod, "store.max-look-back-period", 0, "Limit how long back data can be queried")
6765

@@ -269,11 +267,6 @@ func (c *store) validateQueryTimeRange(ctx context.Context, userID string, from
269267
return true, nil
270268
}
271269

272-
if from.After(now.Add(-c.cfg.MinChunkAge)) {
273-
// no data relevant to this query will have arrived at the store yet
274-
return true, nil
275-
}
276-
277270
if c.cfg.MaxLookBackPeriod != 0 {
278271
oldestStartTime := model.Now().Add(-c.cfg.MaxLookBackPeriod)
279272
if oldestStartTime.After(*from) {

pkg/cortex/cortex.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@ func (c *Config) Validate() error {
144144
if err := c.Distributor.Validate(); err != nil {
145145
return errors.Wrap(err, "invalid distributor config")
146146
}
147+
if err := c.Querier.Validate(); err != nil {
148+
return errors.Wrap(err, "invalid querier config")
149+
}
147150
return nil
148151
}
149152

pkg/querier/querier.go

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package querier
22

33
import (
44
"context"
5+
"errors"
56
"flag"
67
"time"
78

@@ -19,13 +20,16 @@ import (
1920

2021
// Config contains the configuration require to create a querier
2122
type Config struct {
22-
MaxConcurrent int
23-
Timeout time.Duration
24-
Iterators bool
25-
BatchIterators bool
26-
IngesterStreaming bool
27-
MaxSamples int
28-
IngesterMaxQueryLookback time.Duration
23+
MaxConcurrent int
24+
Timeout time.Duration
25+
Iterators bool
26+
BatchIterators bool
27+
IngesterStreaming bool
28+
MaxSamples int
29+
QueryIngestersWithin time.Duration `yaml:"query_ingesters_within"`
30+
31+
// QueryStoreAfter the time after which queries should also be sent to the store and not just ingesters.
32+
QueryStoreAfter time.Duration `yaml:"query_store_after"`
2933

3034
// The default evaluation interval for the promql engine.
3135
// Needs to be configured for subqueries to work as it is the default
@@ -36,6 +40,10 @@ type Config struct {
3640
metricsRegisterer prometheus.Registerer `yaml:"-"`
3741
}
3842

43+
var (
44+
errBadLookbackConfigs = errors.New("bad settings, query_store_after >= query_ingesters_within which can result in queries not being sent")
45+
)
46+
3947
// RegisterFlags adds the flags required to config this to the given FlagSet.
4048
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4149
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
@@ -47,11 +55,25 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4755
f.BoolVar(&cfg.BatchIterators, "querier.batch-iterators", false, "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag.")
4856
f.BoolVar(&cfg.IngesterStreaming, "querier.ingester-streaming", false, "Use streaming RPCs to query ingester.")
4957
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
50-
f.DurationVar(&cfg.IngesterMaxQueryLookback, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
58+
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
5159
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
60+
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should only be queried from storage and not just ingesters. 0 means all queries are sent to store.")
5261
cfg.metricsRegisterer = prometheus.DefaultRegisterer
5362
}
5463

64+
// Validate the config
65+
func (cfg *Config) Validate() error {
66+
67+
// Ensure the config wont create a situation where no queriers are returned.
68+
if cfg.QueryIngestersWithin != 0 && cfg.QueryStoreAfter != 0 {
69+
if cfg.QueryStoreAfter >= cfg.QueryIngestersWithin {
70+
return errBadLookbackConfigs
71+
}
72+
}
73+
74+
return nil
75+
}
76+
5577
// ChunkStore is the read-interface to the Chunk Store. Made an interface here
5678
// to reduce package coupling.
5779
type ChunkStore interface {
@@ -70,11 +92,11 @@ func New(cfg Config, distributor Distributor, chunkStore ChunkStore) (storage.Qu
7092
var queryable storage.Queryable
7193
if cfg.IngesterStreaming {
7294
dq := newIngesterStreamingQueryable(distributor, iteratorFunc)
73-
queryable = newUnifiedChunkQueryable(dq, chunkStore, distributor, iteratorFunc, cfg.IngesterMaxQueryLookback)
95+
queryable = newUnifiedChunkQueryable(dq, chunkStore, distributor, iteratorFunc, cfg)
7496
} else {
7597
cq := newChunkStoreQueryable(chunkStore, iteratorFunc)
7698
dq := newDistributorQueryable(distributor)
77-
queryable = NewQueryable(dq, cq, distributor, cfg.IngesterMaxQueryLookback)
99+
queryable = NewQueryable(dq, cq, distributor, cfg)
78100
}
79101

80102
lazyQueryable := storage.QueryableFunc(func(ctx context.Context, mint int64, maxt int64) (storage.Querier, error) {
@@ -97,30 +119,35 @@ func New(cfg Config, distributor Distributor, chunkStore ChunkStore) (storage.Qu
97119
}
98120

99121
// NewQueryable creates a new Queryable for cortex.
100-
func NewQueryable(dq, cq storage.Queryable, distributor Distributor, ingesterMaxQueryLookback time.Duration) storage.Queryable {
122+
func NewQueryable(dq, cq storage.Queryable, distributor Distributor, cfg Config) storage.Queryable {
101123
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
102-
cqr, err := cq.Querier(ctx, mint, maxt)
103-
if err != nil {
104-
return nil, err
105-
}
106-
107124
q := querier{
108-
queriers: []storage.Querier{cqr},
109125
distributor: distributor,
110126
ctx: ctx,
111127
mint: mint,
112128
maxt: maxt,
113129
}
114130

115-
// Include ingester only if maxt is within ingesterMaxQueryLookback w.r.t. current time.
116-
if ingesterMaxQueryLookback == 0 || maxt >= time.Now().Add(-ingesterMaxQueryLookback).UnixNano()/1e6 {
131+
// Include ingester only if maxt is within queryIngestersWithin w.r.t. current time.
132+
now := model.Now()
133+
if cfg.QueryIngestersWithin == 0 || maxt >= int64(now.Add(-cfg.QueryIngestersWithin)) {
117134
dqr, err := dq.Querier(ctx, mint, maxt)
118135
if err != nil {
119136
return nil, err
120137
}
121138
q.queriers = append(q.queriers, dqr)
122139
}
123140

141+
// Include store only if mint is within QueryStoreAfter w.r.t current time.
142+
if cfg.QueryStoreAfter == 0 || mint <= int64(now.Add(-cfg.QueryStoreAfter)) {
143+
cqr, err := cq.Querier(ctx, mint, maxt)
144+
if err != nil {
145+
return nil, err
146+
}
147+
148+
q.queriers = append(q.queriers, cqr)
149+
}
150+
124151
return q, nil
125152
})
126153
}

pkg/querier/querier_test.go

Lines changed: 129 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"strconv"
7+
"sync"
78
"testing"
89
"time"
910

@@ -149,45 +150,45 @@ func TestQuerier(t *testing.T) {
149150

150151
func TestNoHistoricalQueryToIngester(t *testing.T) {
151152
testCases := []struct {
152-
name string
153-
mint, maxt time.Time
154-
hitIngester bool
155-
ingesterMaxQueryLookback time.Duration
153+
name string
154+
mint, maxt time.Time
155+
hitIngester bool
156+
queryIngestersWithin time.Duration
156157
}{
157158
{
158-
name: "hit-test1",
159-
mint: time.Now().Add(-5 * time.Hour),
160-
maxt: time.Now().Add(1 * time.Hour),
161-
hitIngester: true,
162-
ingesterMaxQueryLookback: 1 * time.Hour,
159+
name: "hit-test1",
160+
mint: time.Now().Add(-5 * time.Hour),
161+
maxt: time.Now().Add(1 * time.Hour),
162+
hitIngester: true,
163+
queryIngestersWithin: 1 * time.Hour,
163164
},
164165
{
165-
name: "hit-test2",
166-
mint: time.Now().Add(-5 * time.Hour),
167-
maxt: time.Now().Add(-59 * time.Minute),
168-
hitIngester: true,
169-
ingesterMaxQueryLookback: 1 * time.Hour,
166+
name: "hit-test2",
167+
mint: time.Now().Add(-5 * time.Hour),
168+
maxt: time.Now().Add(-59 * time.Minute),
169+
hitIngester: true,
170+
queryIngestersWithin: 1 * time.Hour,
170171
},
171172
{ // Skipping ingester is disabled.
172-
name: "hit-test2",
173-
mint: time.Now().Add(-5 * time.Hour),
174-
maxt: time.Now().Add(-50 * time.Minute),
175-
hitIngester: true,
176-
ingesterMaxQueryLookback: 0,
173+
name: "hit-test2",
174+
mint: time.Now().Add(-5 * time.Hour),
175+
maxt: time.Now().Add(-50 * time.Minute),
176+
hitIngester: true,
177+
queryIngestersWithin: 0,
177178
},
178179
{
179-
name: "dont-hit-test1",
180-
mint: time.Now().Add(-5 * time.Hour),
181-
maxt: time.Now().Add(-100 * time.Minute),
182-
hitIngester: false,
183-
ingesterMaxQueryLookback: 1 * time.Hour,
180+
name: "dont-hit-test1",
181+
mint: time.Now().Add(-5 * time.Hour),
182+
maxt: time.Now().Add(-100 * time.Minute),
183+
hitIngester: false,
184+
queryIngestersWithin: 1 * time.Hour,
184185
},
185186
{
186-
name: "dont-hit-test2",
187-
mint: time.Now().Add(-5 * time.Hour),
188-
maxt: time.Now().Add(-61 * time.Minute),
189-
hitIngester: false,
190-
ingesterMaxQueryLookback: 1 * time.Hour,
187+
name: "dont-hit-test2",
188+
mint: time.Now().Add(-5 * time.Hour),
189+
maxt: time.Now().Add(-61 * time.Minute),
190+
hitIngester: false,
191+
queryIngestersWithin: 1 * time.Hour,
191192
},
192193
}
193194

@@ -201,7 +202,7 @@ func TestNoHistoricalQueryToIngester(t *testing.T) {
201202
for _, ingesterStreaming := range []bool{true, false} {
202203
cfg.IngesterStreaming = ingesterStreaming
203204
for _, c := range testCases {
204-
cfg.IngesterMaxQueryLookback = c.ingesterMaxQueryLookback
205+
cfg.QueryIngestersWithin = c.queryIngestersWithin
205206
t.Run(fmt.Sprintf("IngesterStreaming=%t,test=%s", cfg.IngesterStreaming, c.name), func(t *testing.T) {
206207
chunkStore, _ := makeMockChunkStore(t, 24, encodings[0].e)
207208
distributor := &errDistributor{}
@@ -300,3 +301,101 @@ func (m *errDistributor) LabelNames(context.Context) ([]string, error) {
300301
func (m *errDistributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) {
301302
return nil, errDistributorError
302303
}
304+
305+
type emptyChunkStore struct {
306+
sync.Mutex
307+
called bool
308+
}
309+
310+
func (c *emptyChunkStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
311+
c.Lock()
312+
defer c.Unlock()
313+
c.called = true
314+
return nil, nil
315+
}
316+
317+
func (c *emptyChunkStore) IsCalled() bool {
318+
c.Lock()
319+
defer c.Unlock()
320+
return c.called
321+
}
322+
323+
func TestShortTermQueryToLTS(t *testing.T) {
324+
testCases := []struct {
325+
name string
326+
mint, maxt time.Time
327+
hitIngester bool
328+
hitLTS bool
329+
queryIngestersWithin time.Duration
330+
queryStoreAfter time.Duration
331+
}{
332+
{
333+
name: "hit only ingester",
334+
mint: time.Now().Add(-5 * time.Minute),
335+
maxt: time.Now(),
336+
hitIngester: true,
337+
hitLTS: false,
338+
queryIngestersWithin: 1 * time.Hour,
339+
queryStoreAfter: time.Hour,
340+
},
341+
{
342+
name: "hit both",
343+
mint: time.Now().Add(-5 * time.Hour),
344+
maxt: time.Now(),
345+
hitIngester: true,
346+
hitLTS: true,
347+
queryIngestersWithin: 1 * time.Hour,
348+
queryStoreAfter: time.Hour,
349+
},
350+
{
351+
name: "hit only storage",
352+
mint: time.Now().Add(-5 * time.Hour),
353+
maxt: time.Now().Add(-2 * time.Hour),
354+
hitIngester: false,
355+
hitLTS: true,
356+
queryIngestersWithin: 1 * time.Hour,
357+
queryStoreAfter: 0,
358+
},
359+
}
360+
361+
engine := promql.NewEngine(promql.EngineOpts{
362+
Logger: util.Logger,
363+
MaxConcurrent: 10,
364+
MaxSamples: 1e6,
365+
Timeout: 1 * time.Minute,
366+
})
367+
cfg := Config{}
368+
for _, ingesterStreaming := range []bool{true, false} {
369+
cfg.IngesterStreaming = ingesterStreaming
370+
for _, c := range testCases {
371+
cfg.QueryIngestersWithin = c.queryIngestersWithin
372+
cfg.QueryStoreAfter = c.queryStoreAfter
373+
t.Run(fmt.Sprintf("IngesterStreaming=%t,test=%s", cfg.IngesterStreaming, c.name), func(t *testing.T) {
374+
chunkStore := &emptyChunkStore{}
375+
distributor := &errDistributor{}
376+
377+
queryable, _ := New(cfg, distributor, chunkStore)
378+
query, err := engine.NewRangeQuery(queryable, "dummy", c.mint, c.maxt, 1*time.Minute)
379+
require.NoError(t, err)
380+
381+
ctx := user.InjectOrgID(context.Background(), "0")
382+
r := query.Exec(ctx)
383+
_, err = r.Matrix()
384+
385+
if c.hitIngester {
386+
// If the ingester was hit, the distributor always returns errDistributorError.
387+
require.Error(t, err)
388+
require.Equal(t, errDistributorError.Error(), err.Error())
389+
} else {
390+
// If the ingester was hit, there would have been an error from errDistributor.
391+
require.NoError(t, err)
392+
}
393+
394+
// Verify if the test did/did not hit the LTS
395+
time.Sleep(30 * time.Millisecond) // NOTE: Since this is a lazy querier there is a race condition between the response and chunk store being called
396+
require.Equal(t, c.hitLTS, chunkStore.IsCalled())
397+
})
398+
}
399+
}
400+
401+
}

0 commit comments

Comments
 (0)