Skip to content

Ingester: Metadata APIs should honour QueryIngestersWithin when QueryStoreForLabels is enabled #5027

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* [ENHANCEMENT] Query-tee: Add `/api/v1/query_exemplars` API endpoint support. #5010
* [ENHANCEMENT] Query Frontend/Query Scheduler: Increase upper bound to 60s for queue duration histogram metric. #5029
* [ENHANCEMENT] Query Frontend: Log Vertical sharding information when `query_stats_enabled` is enabled. #5037
* [ENHANCEMENT] Ingester: The metadata APIs now honour `querier.query-ingesters-within` when `querier.query-store-for-labels-enabled` is true. #5027
* [FEATURE] Querier/Query Frontend: support Prometheus /api/v1/status/buildinfo API. #4978
* [FEATURE] Ingester: Add active series to all_user_stats page. #4972
* [FEATURE] Ingester: Added `-blocks-storage.tsdb.head-chunks-write-queue-size` to configure the size of the in-memory queue used before flushing chunks to disk. #5000
Expand Down
5 changes: 3 additions & 2 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,9 @@ querier:
# CLI flag: -querier.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]

# Query long-term store for series, label values and label names APIs. Works
# only with blocks engine.
# Deprecated (Querying long-term store for labels will be always enabled in
# the future.): Query long-term store for series, label values and label names
# APIs.
# CLI flag: -querier.query-store-for-labels-enabled
[query_store_for_labels_enabled: <boolean> | default = false]

Expand Down
4 changes: 2 additions & 2 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ The `querier_config` configures the Cortex querier.
# CLI flag: -querier.query-ingesters-within
[query_ingesters_within: <duration> | default = 0s]

# Query long-term store for series, label values and label names APIs. Works
# only with blocks engine.
# Deprecated (Querying long-term store for labels will be always enabled in the
# future.): Query long-term store for series, label values and label names APIs.
# CLI flag: -querier.query-store-for-labels-enabled
[query_store_for_labels_enabled: <boolean> | default = false]

Expand Down
2 changes: 2 additions & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,8 @@ func (t *Cortex) initIngesterService() (serv services.Service, err error) {
t.Cfg.Ingester.DistributorShardingStrategy = t.Cfg.Distributor.ShardingStrategy
t.Cfg.Ingester.DistributorShardByAllLabels = t.Cfg.Distributor.ShardByAllLabels
t.Cfg.Ingester.InstanceLimitsFn = ingesterInstanceLimits(t.RuntimeConfig)
t.Cfg.Ingester.QueryStoreForLabels = t.Cfg.Querier.QueryStoreForLabels
t.Cfg.Ingester.QueryIngestersWithin = t.Cfg.Querier.QueryIngestersWithin
t.tsdbIngesterConfig()

t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Overrides, prometheus.DefaultRegisterer, util_log.Logger)
Expand Down
18 changes: 14 additions & 4 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type Config struct {
DistributorShardingStrategy string `yaml:"-"`
DistributorShardByAllLabels bool `yaml:"-"`

// Injected at runtime and read from querier config.
QueryStoreForLabels bool `yaml:"-"`
QueryIngestersWithin time.Duration `yaml:"-"`

DefaultLimits InstanceLimits `yaml:"instance_limits"`
InstanceLimitsFn func() *InstanceLimits `yaml:"-"`

Expand Down Expand Up @@ -1303,7 +1307,7 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque
return &client.LabelValuesResponse{}, nil
}

mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db)
mint, maxt, err := metadataQueryRange(startTimestampMs, endTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1364,7 +1368,7 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest
return &client.LabelNamesResponse{}, nil
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db)
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1432,7 +1436,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr
return nil, err
}

mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db)
mint, maxt, err := metadataQueryRange(req.StartTimestampMs, req.EndTimestampMs, db, i.cfg.QueryStoreForLabels, i.cfg.QueryIngestersWithin)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2564,7 +2568,13 @@ func (i *Ingester) flushHandler(w http.ResponseWriter, r *http.Request) {
}

// metadataQueryRange returns the best range to query for metadata queries based on the timerange in the ingester.
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB) (mint, maxt int64, err error) {
func metadataQueryRange(queryStart, queryEnd int64, db *userTSDB, queryStoreForLabels bool, queryIngestersWithin time.Duration) (mint, maxt int64, err error) {
if queryIngestersWithin > 0 && queryStoreForLabels {
// If the feature for querying metadata from store-gateway is enabled,
// then we don't want to manipulate the mint and maxt.
return
}

// Ingesters are run with limited retention and we don't support querying the store-gateway for labels yet.
// This means if someone loads a dashboard that is outside the range of the ingester, and we only return the
// data for the timerange requested (which will be empty), the dashboards will break. To fix this we should
Expand Down
25 changes: 20 additions & 5 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1733,10 +1733,12 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
}

tests := map[string]struct {
from int64
to int64
matchers []*client.LabelMatchers
expected []*cortexpb.Metric
from int64
to int64
matchers []*client.LabelMatchers
expected []*cortexpb.Metric
queryStoreForLabels bool
queryIngestersWithin time.Duration
}{
"should return an empty response if no metric match": {
from: math.MinInt64,
Expand Down Expand Up @@ -1794,6 +1796,18 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
{Labels: cortexpb.FromLabelsToLabelAdapters(fixtures[1].lbls)},
},
},
"should filter metrics by time range if queryStoreForLabels and queryIngestersWithin is enabled": {
from: 100,
to: 1000,
matchers: []*client.LabelMatchers{{
Matchers: []*client.LabelMatcher{
{Type: client.EQUAL, Name: model.MetricNameLabel, Value: "test_1"},
},
}},
expected: []*cortexpb.Metric{},
queryStoreForLabels: true,
queryIngestersWithin: time.Hour,
},
"should not return duplicated metrics on overlapping matchers": {
from: math.MinInt64,
to: math.MaxInt64,
Expand Down Expand Up @@ -1860,7 +1874,8 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
EndTimestampMs: testData.to,
MatchersSet: testData.matchers,
}

i.cfg.QueryStoreForLabels = testData.queryStoreForLabels
i.cfg.QueryIngestersWithin = testData.queryIngestersWithin
res, err := i.MetricsForLabelMatchers(ctx, req)
require.NoError(t, err)
assert.ElementsMatch(t, testData.expected, res.Metric)
Expand Down
53 changes: 30 additions & 23 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@ type Distributor interface {
MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error)
}

func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration) QueryableWithFilter {
func newDistributorQueryable(distributor Distributor, streaming bool, streamingMetdata bool, iteratorFn chunkIteratorFunc, queryIngestersWithin time.Duration, queryStoreForLabels bool) QueryableWithFilter {
return distributorQueryable{
distributor: distributor,
streaming: streaming,
streamingMetdata: streamingMetdata,
iteratorFn: iteratorFn,
queryIngestersWithin: queryIngestersWithin,
queryStoreForLabels: queryStoreForLabels,
}
}

Expand All @@ -53,6 +54,7 @@ type distributorQueryable struct {
streamingMetdata bool
iteratorFn chunkIteratorFunc
queryIngestersWithin time.Duration
queryStoreForLabels bool
}

func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
Expand All @@ -65,6 +67,7 @@ func (d distributorQueryable) Querier(ctx context.Context, mint, maxt int64) (st
streamingMetadata: d.streamingMetdata,
chunkIterFn: d.iteratorFn,
queryIngestersWithin: d.queryIngestersWithin,
queryStoreForLabels: d.queryStoreForLabels,
}, nil
}

Expand All @@ -81,6 +84,7 @@ type distributorQuerier struct {
streamingMetadata bool
chunkIterFn chunkIteratorFunc
queryIngestersWithin time.Duration
queryStoreForLabels bool
}

// Select implements storage.Querier interface.
Expand All @@ -95,35 +99,18 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
}

// If the querier receives a 'series' query, it means only metadata is needed.
// For this specific case we shouldn't apply the queryIngestersWithin
// time range manipulation, otherwise we'll end up returning no series at all for
// For the specific case where queryStoreForLabels is disabled
// we shouldn't apply the queryIngestersWithin time range manipulation.
// Otherwise we'll end up returning no series at all for
// older time ranges (while in Cortex we do ignore the start/end and always return
// series in ingesters).
// Also, in the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if sp != nil && sp.Func == "series" {
var (
ms []metric.Metric
err error
)

if q.streamingMetadata {
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
} else {
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
}

if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(sortSeries, ms)
}
shouldNotQueryStoreForMetadata := (sp != nil && sp.Func == "series" && !q.queryStoreForLabels)

// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
// now - queryIngestersWithin, because older time ranges are covered by the storage. This
// optimization is particularly important for the blocks storage where the blocks retention in the
// ingesters could be way higher than queryIngestersWithin.
if q.queryIngestersWithin > 0 {
if q.queryIngestersWithin > 0 && !shouldNotQueryStoreForMetadata {
now := time.Now()
origMinT := minT
minT = math.Max64(minT, util.TimeToMillis(now.Add(-q.queryIngestersWithin)))
Expand All @@ -138,6 +125,26 @@ func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, ma
}
}

// In the recent versions of Prometheus, we pass in the hint but with Func set to "series".
// See: https://github.com/prometheus/prometheus/pull/8050
if sp != nil && sp.Func == "series" {
var (
ms []metric.Metric
err error
)

if q.streamingMetadata {
ms, err = q.distributor.MetricsForLabelMatchersStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
} else {
ms, err = q.distributor.MetricsForLabelMatchers(ctx, model.Time(minT), model.Time(maxT), matchers...)
}

if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(sortSeries, ms)
}

if q.streaming {
return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
}
Expand Down
22 changes: 16 additions & 6 deletions pkg/querier/distributor_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestDistributorQuerier(t *testing.T) {
},
nil)

queryable := newDistributorQueryable(d, false, false, nil, 0)
queryable := newDistributorQueryable(d, false, false, nil, 0, false)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)

Expand All @@ -70,6 +70,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)

tests := map[string]struct {
querySeries bool
queryStoreForLabels bool
queryIngestersWithin time.Duration
queryMinT int64
queryMaxT int64
Expand Down Expand Up @@ -112,6 +113,15 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
expectedMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
expectedMaxT: util.TimeToMillis(now.Add(-90 * time.Minute)),
},
"should manipulate query time range if queryIngestersWithin is enabled and queryStoreForLabels is enabled": {
querySeries: true,
queryStoreForLabels: true,
queryIngestersWithin: time.Hour,
queryMinT: util.TimeToMillis(now.Add(-100 * time.Minute)),
queryMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
expectedMinT: util.TimeToMillis(now.Add(-60 * time.Minute)),
expectedMaxT: util.TimeToMillis(now.Add(-30 * time.Minute)),
},
}

for _, streamingEnabled := range []bool{false, true} {
Expand All @@ -124,7 +134,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)
distributor.On("MetricsForLabelMatchersStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]metric.Metric{}, nil)

ctx := user.InjectOrgID(context.Background(), "test")
queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin)
queryable := newDistributorQueryable(distributor, streamingEnabled, streamingEnabled, nil, testData.queryIngestersWithin, testData.queryStoreForLabels)
querier, err := queryable.Querier(ctx, testData.queryMinT, testData.queryMaxT)
require.NoError(t, err)

Expand Down Expand Up @@ -161,7 +171,7 @@ func TestDistributorQuerier_SelectShouldHonorQueryIngestersWithin(t *testing.T)

func TestDistributorQueryableFilter(t *testing.T) {
d := &MockDistributor{}
dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour)
dq := newDistributorQueryable(d, false, false, nil, 1*time.Hour, true)

now := time.Now()

Expand Down Expand Up @@ -209,7 +219,7 @@ func TestIngesterStreaming(t *testing.T) {
nil)

ctx := user.InjectOrgID(context.Background(), "0")
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0)
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
querier, err := queryable.Querier(ctx, mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -285,7 +295,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) {
nil)

ctx := user.InjectOrgID(context.Background(), "0")
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0)
queryable := newDistributorQueryable(d, true, true, mergeChunks, 0, true)
querier, err := queryable.Querier(ctx, mint, maxt)
require.NoError(t, err)

Expand Down Expand Up @@ -336,7 +346,7 @@ func TestDistributorQuerier_LabelNames(t *testing.T) {
d.On("MetricsForLabelMatchersStream", mock.Anything, model.Time(mint), model.Time(maxt), someMatchers).
Return(metrics, nil)

queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0)
queryable := newDistributorQueryable(d, false, streamingEnabled, nil, 0, true)
querier, err := queryable.Querier(context.Background(), mint, maxt)
require.NoError(t, err)

Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IngesterMetadataStreaming, "querier.ingester-metadata-streaming", false, "Use streaming RPCs for metadata APIs from ingester.")
f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Query long-term store for series, label values and label names APIs. Works only with blocks engine.")
f.BoolVar(&cfg.QueryStoreForLabels, "querier.query-store-for-labels-enabled", false, "Deprecated (Querying long-term store for labels will be always enabled in the future.): Query long-term store for series, label values and label names APIs.")
f.BoolVar(&cfg.EnablePerStepStats, "querier.per-step-stats-enabled", false, "Enable returning samples stats per steps in query response.")
f.DurationVar(&cfg.MaxQueryIntoFuture, "querier.max-query-into-future", 10*time.Minute, "Maximum duration into the future you can query. 0 to disable.")
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
Expand Down Expand Up @@ -146,7 +146,7 @@ func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, tombstonesLoader purger.TombstonesLoader, reg prometheus.Registerer, logger log.Logger) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, v1.QueryEngine) {
iteratorFunc := getChunksIteratorFunction(cfg)

distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin)
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterStreaming, cfg.IngesterMetadataStreaming, iteratorFunc, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

ns := make([]QueryableWithFilter, len(stores))
for ix, s := range stores {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {

distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil)
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(unorderedResponseMatrix, nil)
distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)
distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin, cfg.QueryStoreForLabels)

tCases := []struct {
name string
Expand Down