Skip to content

Commit 43db2a7

Browse files
committed
Ingester Metadata APIs should honour QueryIngestersWithin
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 061b348 commit 43db2a7

File tree

8 files changed

+81
-41
lines changed

8 files changed

+81
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
* [ENHANCEMENT] Querier: limit series query to only ingesters if `start` param is not specified. #4976
66
* [ENHANCEMENT] Query-frontend/scheduler: add a new limit `frontend.max-outstanding-requests-per-tenant` for configuring queue size per tenant. Started deprecating two flags `-query-scheduler.max-outstanding-requests-per-tenant` and `-querier.max-outstanding-requests-per-tenant`, and change their value default to 0. Now if both the old flag and new flag are specified, the old flag's queue size will be picked. #5005
77
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
8+
* [ENHANCEMENT] Ingester: The metadata APIs should honour `querier.query-ingesters-within` when `querier.query-store-for-labels-enabled` is true. #5027
89
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
910
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
1011
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` allowing to configure the size of the in-memory queue used before flushing chunks to the disk . #5000

pkg/cortex/modules.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,8 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
393393
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
394394
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
395395
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
396+
t.Cfg.Ingester.QueryStoreForLabels = t.Cfg.Querier.QueryStoreForLabels
397+
t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin
396398
t.tsdbIngesterConfig()
397399

398400
t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, prometheus.DefaultRegisterer, util_log.Logger)

pkg/ingester/ingester.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,10 @@ type Config struct {
106106
DistributorShardingStrategy string `yaml:"-"`
107107
DistributorShardByAllLabels bool `yaml:"-"`
108108

109+
// Injected at runtime and read from querier config.
110+
QueryStoreForLabels bool `yaml:"-"`
111+
QueryIngestersWithin time.Duration `yaml:"-"`
112+
109113
DefaultLimits InstanceLimits `yaml:"instance_limits"`
110114
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`
111115

@@ -1303,7 +1307,7 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque
13031307
return &client.LabelValuesResponse{}, nil
13041308
}
13051309

1306-
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db)
1310+
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
13071311
if err != nil {
13081312
return nil, err
13091313
}
@@ -1364,7 +1368,7 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
13641368
return &client.LabelNamesResponse{}, nil
13651369
}
13661370

1367-
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db)
1371+
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
13681372
if err != nil {
13691373
return nil, err
13701374
}
@@ -1432,7 +1436,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr
14321436
return nil, err
14331437
}
14341438

1435-
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db)
1439+
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
14361440
if err != nil {
14371441
return nil, err
14381442
}
@@ -2564,7 +2568,13 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {
25642568
}
25652569

25662570
// metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
2567-
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB) (mint, maxt int64, err error) {
2571+
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryStoreForLabels bool, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
2572+
if queryIngestersWithin > 0 && queryStoreForLabels {
2573+
// If the feature for querying metadata from store-gateway is enabled,
2574+
// then we don't want to manipulate the mint and maxt.
2575+
return
2576+
}
2577+
25682578
// Ingesters are run with limited retention and we don't support querying the store-gateway for labels yet.
25692579
// This means if someone loads a dashboard that is outside the range of the ingester, and we only return the
25702580
// data for the timerange requested (which will be empty), the dashboards will break. To fix this we should

pkg/ingester/ingester_test.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,10 +1733,11 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
17331733
}
17341734

17351735
tests := map[string]struct {
1736-
from int64
1737-
to int64
1738-
matchers []*client.LabelMatchers
1739-
expected []*cortexpb.Metric
1736+
from int64
1737+
to int64
1738+
matchers []*client.LabelMatchers
1739+
expected []*cortexpb.Metric
1740+
queryStoreForLabels bool
17401741
}{
17411742
"should return an empty response if no metric match": {
17421743
from: math.MinInt64,
@@ -1794,6 +1795,17 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
17941795
{Labels: cortexpb.FromLabelsToLabelAdapters(fixtures[1].lbls)},
17951796
},
17961797
},
1798+
"should filter metrics by time range if queryStoreForLabels is enabled": {
1799+
from: 100,
1800+
to: 1000,
1801+
matchers: []*client.LabelMatchers{{
1802+
Matchers: []*client.LabelMatcher{
1803+
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
1804+
},
1805+
}},
1806+
expected: []*cortexpb.Metric{},
1807+
queryStoreForLabels: true,
1808+
},
17971809
"should not return duplicated metrics on overlapping matchers": {
17981810
from: math.MinInt64,
17991811
to: math.MaxInt64,
@@ -1860,7 +1872,7 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
18601872
EndTimestampMs: testData.to,
18611873
MatchersSet: testData.matchers,
18621874
}
1863-
1875+
i.cfg.QueryStoreForLabels = testData.queryStoreForLabels
18641876
res, err := i.MetricsForLabelMatchers(ctx, req)
18651877
require.NoError(t, err)
18661878
assert.ElementsMatch(t, testData.expected, res.Metric)

pkg/querier/distributor_queryable.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,14 @@ type Distributor interface {
3737
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
3838
}
3939

40-
func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter {
40+
func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, queryStoreForLabels bool) QueryableWithFilter {
4141
return distributorQueryable{
4242
distributor: distributor,
4343
streaming: streaming,
4444
streamingMetdata: streamingMetdata,
4545
iteratorFn: iteratorFn,
4646
queryIngestersWithin: queryIngestersWithin,
47+
queryStoreForLabels: queryStoreForLabels,
4748
}
4849
}
4950

@@ -53,6 +54,7 @@ type distributorQueryable struct {
5354
streamingMetdata bool
5455
iteratorFn chunkIteratorFunc
5556
queryIngestersWithin time.Duration
57+
queryStoreForLabels bool
5658
}
5759

5860
func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
@@ -65,6 +67,7 @@ func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (st
6567
streamingMetadata: d.streamingMetdata,
6668
chunkIterFn: d.iteratorFn,
6769
queryIngestersWithin: d.queryIngestersWithin,
70+
queryStoreForLabels: d.queryStoreForLabels,
6871
}, nil
6972
}
7073

@@ -81,6 +84,7 @@ type distributorQuerier struct {
8184
streamingMetadata bool
8285
chunkIterFn chunkIteratorFunc
8386
queryIngestersWithin time.Duration
87+
queryStoreForLabels bool
8488
}
8589

8690
// Select implements storage.Querier interface.
@@ -94,12 +98,32 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
9498
minT, maxT = sp.Start, sp.End
9599
}
96100

101+
// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
102+
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
103+
// optimization is particularly important for the blocks storage where the blocks retention in the
104+
// ingesters could be way higher than queryIngestersWithin.
97105
// If the querier receives a 'series' query, it means only metadata is needed.
98-
// For this specific case we shouldn't apply the queryIngestersWithin
99-
// time range manipulation, otherwise we'll end up returning no series at all for
106+
// For the specific case where queryStoreForLabels is disabled
107+
// we shouldn't apply this time range manipulation.
108+
// Otherwise we'll end up returning no series at all for
100109
// older time ranges (while in Cortex we do ignore the start/end and always return
101110
// series in ingesters).
102-
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
111+
if q.queryIngestersWithin > 0 && !(sp != nil && sp.Func == "series" && !q.queryStoreForLabels) {
112+
now := time.Now()
113+
origMinT := minT
114+
minT = math.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin)))
115+
116+
if origMinT != minT {
117+
level.Debug(log).Log("msg", "the min time of the query to ingesters has been manipulated", "original", origMinT, "updated", minT)
118+
}
119+
120+
if minT > maxT {
121+
level.Debug(log).Log("msg", "empty query time range after min time manipulation")
122+
return storage.EmptySeriesSet()
123+
}
124+
}
125+
126+
// In the recent versions of Prometheus, we pass in the hint but with Func set to "series".
103127
// See: https://github.com/prometheus/prometheus/pull/8050
104128
if sp != nil && sp.Func == "series" {
105129
var (
@@ -119,25 +143,6 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
119143
return series.MetricsToSeriesSet(sortSeries, ms)
120144
}
121145

122-
// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
123-
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
124-
// optimization is particularly important for the blocks storage where the blocks retention in the
125-
// ingesters could be way higher than queryIngestersWithin.
126-
if q.queryIngestersWithin > 0 {
127-
now := time.Now()
128-
origMinT := minT
129-
minT = math.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin)))
130-
131-
if origMinT != minT {
132-
level.Debug(log).Log("msg", "the min time of the query to ingesters has been manipulated", "original", origMinT, "updated", minT)
133-
}
134-
135-
if minT > maxT {
136-
level.Debug(log).Log("msg", "empty query time range after min time manipulation")
137-
return storage.EmptySeriesSet()
138-
}
139-
}
140-
141146
if q.streaming {
142147
return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
143148
}

pkg/querier/distributor_queryable_test.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func TestDistributorQuerier(t *testing.T) {
4646
},
4747
nil)
4848

49-
queryable := newDistributorQueryable(d, false, false, nil, 0)
49+
queryable := newDistributorQueryable(d, false, false, nil, 0, false)
5050
querier, err := queryable.Querier(context.Background(), mint, maxt)
5151
require.NoError(t, err)
5252

@@ -70,6 +70,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
7070

7171
tests := map[string]struct {
7272
querySeries bool
73+
queryStoreForLabels bool
7374
queryIngestersWithin time.Duration
7475
queryMinT int64
7576
queryMaxT int64
@@ -112,6 +113,15 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
112113
expectedMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
113114
expectedMaxT: util.TimeToMillis(now.Add(-90 * time.Minute)),
114115
},
116+
"should manipulate query time range if queryIngestersWithin is enabled and queryStoreForLabels is enabled": {
117+
querySeries: true,
118+
queryStoreForLabels: true,
119+
queryIngestersWithin: time.Hour,
120+
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
121+
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
122+
expectedMinT: util.TimeToMillis(now.Add(-60 * time.Minute)),
123+
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
124+
},
115125
}
116126

117127
for _, streamingEnabled := range []bool{false, true} {
@@ -124,7 +134,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
124134
distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)
125135

126136
ctx := user.InjectOrgID(context.Background(), "test")
127-
queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin)
137+
queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin, testData.queryStoreForLabels)
128138
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
129139
require.NoError(t, err)
130140

@@ -161,7 +171,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
161171

162172
func TestDistributorQueryableFilter(t *testing.T) {
163173
d := &MockDistributor{}
164-
dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour)
174+
dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour, true)
165175

166176
now := time.Now()
167177

@@ -209,7 +219,7 @@ func TestIngesterStreaming(t *testing.T) {
209219
nil)
210220

211221
ctx := user.InjectOrgID(context.Background(), "0")
212-
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0)
222+
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
213223
querier, err := queryable.Querier(ctx, mint, maxt)
214224
require.NoError(t, err)
215225

@@ -285,7 +295,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) {
285295
nil)
286296

287297
ctx := user.InjectOrgID(context.Background(), "0")
288-
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0)
298+
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
289299
querier, err := queryable.Querier(ctx, mint, maxt)
290300
require.NoError(t, err)
291301

@@ -336,7 +346,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
336346
d.On("MetricsForLabelMatchersStream", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
337347
Return(metrics, nil)
338348

339-
queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0)
349+
queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0, true)
340350
querier, err := queryable.Querier(context.Background(), mint, maxt)
341351
require.NoError(t, err)
342352

pkg/querier/querier.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
146146
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, v1.QueryEngine) {
147147
iteratorFunc := getChunksIteratorFunction(cfg)
148148

149-
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin)
149+
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
150150

151151
ns := make([]QueryableWithFilter, len(stores))
152152
for ix, s := range stores {

pkg/querier/querier_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,8 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
213213

214214
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil)
215215
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(unorderedResponseMatrix, nil)
216-
distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
217-
distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
216+
distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
217+
distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
218218

219219
tCases := []struct {
220220
name string

0 commit comments

Comments
 (0)