Skip to content

Commit 22245aa

Browse files
Implement metadata API limit in Ingester (#6128)
Co-authored-by: Alan Protasio <[email protected]>
1 parent a046044 commit 22245aa

12 files changed

+419
-186
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* [ENHANCEMENT] Compactor: Split cleaner cycle for active and deleted tenants. #6112
4545
* [ENHANCEMENT] Compactor: Introduce cleaner visit marker. #6113
4646
* [ENHANCEMENT] Query Frontend: Add cortex_query_samples_total metric. #6142
47+
* [ENHANCEMENT] Ingester: Implement metadata API limit. #6128
4748
* [BUGFIX] Configsdb: Fix endline issue in db password. #5920
4849
* [BUGFIX] Ingester: Fix `user` and `type` labels for the `cortex_ingester_tsdb_head_samples_appended_total` TSDB metric. #5952
4950
* [BUGFIX] Querier: Enforce max query length check for `/api/v1/series` API even though `ignoreMaxQueryLength` is set to true. #6018

pkg/distributor/distributor.go

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/prometheus/prometheus/model/labels"
2222
"github.com/prometheus/prometheus/model/relabel"
2323
"github.com/prometheus/prometheus/scrape"
24+
"github.com/prometheus/prometheus/storage"
2425
"github.com/weaveworks/common/httpgrpc"
2526
"github.com/weaveworks/common/instrument"
2627
"github.com/weaveworks/common/user"
@@ -671,7 +672,7 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
671672
return nil, err
672673
}
673674
// If there wasn't an error but removeReplica is false that means we didn't find both HA labels.
674-
if !removeReplica {
675+
if !removeReplica { // False, Nil
675676
d.nonHASamples.WithLabelValues(userID).Add(float64(numFloatSamples + numHistogramSamples))
676677
}
677678
}
@@ -1021,7 +1022,7 @@ func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring
10211022
})
10221023
}
10231024

1024-
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
1025+
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
10251026
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelValues", opentracing.Tags{
10261027
"name": labelName,
10271028
"start": from.Unix(),
@@ -1032,8 +1033,8 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
10321033
if err != nil {
10331034
return nil, err
10341035
}
1035-
1036-
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, matchers)
1036+
limit := getLimitFromLabelHints(hints)
1037+
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, limit, matchers)
10371038
if err != nil {
10381039
return nil, err
10391040
}
@@ -1053,13 +1054,16 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
10531054
if err != nil {
10541055
return nil, err
10551056
}
1057+
if limit > 0 && len(r) > limit {
1058+
r = r[:limit]
1059+
}
10561060
span.SetTag("result_length", len(r))
10571061
return r, nil
10581062
}
10591063

10601064
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
1061-
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
1062-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1065+
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1066+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
10631067
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
10641068
resp, err := client.LabelValues(ctx, req)
10651069
if err != nil {
@@ -1071,8 +1075,8 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
10711075
}
10721076

10731077
// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
1074-
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
1075-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1078+
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1079+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
10761080
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
10771081
stream, err := client.LabelValuesStream(ctx, req)
10781082
if err != nil {
@@ -1096,7 +1100,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
10961100
}, matchers...)
10971101
}
10981102

1099-
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
1103+
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, hints *storage.LabelHints, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
11001104
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
11011105
"start": from.Unix(),
11021106
"end": to.Unix(),
@@ -1107,9 +1111,11 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
11071111
return nil, err
11081112
}
11091113

1114+
limit := getLimitFromLabelHints(hints)
11101115
req := &ingester_client.LabelNamesRequest{
11111116
StartTimestampMs: int64(from),
11121117
EndTimestampMs: int64(to),
1118+
Limit: int64(limit),
11131119
}
11141120
resps, err := f(ctx, replicationSet, req)
11151121
if err != nil {
@@ -1126,13 +1132,17 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
11261132
if err != nil {
11271133
return nil, err
11281134
}
1135+
if limit > 0 && len(r) > limit {
1136+
r = r[:limit]
1137+
}
1138+
11291139
span.SetTag("result_length", len(r))
11301140

11311141
return r, nil
11321142
}
11331143

1134-
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
1135-
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1144+
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints) ([]string, error) {
1145+
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
11361146
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
11371147
stream, err := client.LabelNamesStream(ctx, req)
11381148
if err != nil {
@@ -1157,8 +1167,8 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time)
11571167
}
11581168

11591169
// LabelNames returns all the label names.
1160-
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
1161-
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1170+
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints) ([]string, error) {
1171+
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
11621172
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
11631173
resp, err := client.LabelNames(ctx, req)
11641174
if err != nil {
@@ -1170,8 +1180,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
11701180
}
11711181

11721182
// MetricsForLabelMatchers gets the metrics that match said matchers
1173-
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
1174-
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1183+
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
1184+
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
11751185
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
11761186
resp, err := client.MetricsForLabelMatchers(ctx, req)
11771187
if err != nil {
@@ -1199,8 +1209,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
11991209
}, matchers...)
12001210
}
12011211

1202-
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
1203-
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1212+
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
1213+
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
12041214
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12051215
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
12061216
if err != nil {
@@ -1239,14 +1249,14 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
12391249
}, matchers...)
12401250
}
12411251

1242-
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
1252+
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, hints *storage.SelectHints, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
12431253
replicationSet, err := d.GetIngestersForMetadata(ctx)
12441254
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
12451255
if err != nil {
12461256
return nil, err
12471257
}
12481258

1249-
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
1259+
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, getLimitFromSelectHints(hints), matchers)
12501260
if err != nil {
12511261
return nil, err
12521262
}
@@ -1438,3 +1448,17 @@ func findHALabels(replicaLabel, clusterLabel string, labels []cortexpb.LabelAdap
14381448

14391449
return cluster, replica
14401450
}
1451+
1452+
func getLimitFromLabelHints(hints *storage.LabelHints) int {
1453+
if hints != nil {
1454+
return hints.Limit
1455+
}
1456+
return 0
1457+
}
1458+
1459+
func getLimitFromSelectHints(hints *storage.SelectHints) int {
1460+
if hints != nil {
1461+
return hints.Limit
1462+
}
1463+
return 0
1464+
}

pkg/distributor/distributor_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1904,7 +1904,7 @@ func BenchmarkDistributor_GetLabelsValues(b *testing.B) {
19041904
b.ResetTimer()
19051905
b.ReportAllocs()
19061906
for i := 0; i < b.N; i++ {
1907-
_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__")
1907+
_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__", nil)
19081908
require.NoError(b, err)
19091909
}
19101910
})
@@ -2270,7 +2270,7 @@ func TestDistributor_MetricsForLabelMatchers_SingleSlowIngester(t *testing.T) {
22702270
}
22712271

22722272
for i := 0; i < 50; i++ {
2273-
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test"))
2273+
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test"))
22742274
require.NoError(t, err)
22752275
}
22762276
}
@@ -2439,7 +2439,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
24392439
}
24402440

24412441
{
2442-
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
2442+
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, testData.matchers...)
24432443

24442444
if testData.expectedErr != nil {
24452445
assert.ErrorIs(t, err, testData.expectedErr)
@@ -2457,7 +2457,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
24572457
}
24582458

24592459
{
2460-
metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, testData.matchers...)
2460+
metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, nil, testData.matchers...)
24612461
if testData.expectedErr != nil {
24622462
assert.ErrorIs(t, err, testData.expectedErr)
24632463
return
@@ -2544,7 +2544,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
25442544

25452545
for n := 0; n < b.N; n++ {
25462546
now := model.Now()
2547-
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, testData.matchers...)
2547+
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, testData.matchers...)
25482548

25492549
if testData.expectedErr != nil {
25502550
assert.EqualError(b, err, testData.expectedErr.Error())
@@ -3197,7 +3197,7 @@ func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *c
31973197
return nil, errFail
31983198
}
31993199

3200-
_, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
3200+
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
32013201
if err != nil {
32023202
return nil, err
32033203
}
@@ -3229,7 +3229,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
32293229
return nil, errFail
32303230
}
32313231

3232-
_, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
3232+
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
32333233
if err != nil {
32343234
return nil, err
32353235
}

pkg/ingester/client/compat.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func ToQueryResponse(matrix model.Matrix) *QueryResponse {
114114
}
115115

116116
// ToMetricsForLabelMatchersRequest builds a MetricsForLabelMatchersRequest proto
117-
func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
117+
func ToMetricsForLabelMatchersRequest(from, to model.Time, limit int, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
118118
ms, err := toLabelMatchers(matchers)
119119
if err != nil {
120120
return nil, err
@@ -124,6 +124,7 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma
124124
StartTimestampMs: int64(from),
125125
EndTimestampMs: int64(to),
126126
MatchersSet: []*LabelMatchers{{Matchers: ms}},
127+
Limit: int64(limit),
127128
}, nil
128129
}
129130

@@ -174,22 +175,22 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
174175
}
175176

176177
// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
177-
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) {
178+
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
178179
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
179180
for _, matchers := range req.MatchersSet {
180181
matchers, err := FromLabelMatchers(matchers.Matchers)
181182
if err != nil {
182-
return 0, 0, nil, err
183+
return 0, 0, 0, nil, err
183184
}
184185
matchersSet = append(matchersSet, matchers)
185186
}
186187
from := model.Time(req.StartTimestampMs)
187188
to := model.Time(req.EndTimestampMs)
188-
return from, to, matchersSet, nil
189+
return from, to, int(req.Limit), matchersSet, nil
189190
}
190191

191192
// ToLabelValuesRequest builds a LabelValuesRequest proto
192-
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
193+
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit int, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
193194
ms, err := toLabelMatchers(matchers)
194195
if err != nil {
195196
return nil, err
@@ -200,22 +201,23 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matche
200201
StartTimestampMs: int64(from),
201202
EndTimestampMs: int64(to),
202203
Matchers: &LabelMatchers{Matchers: ms},
204+
Limit: int64(limit),
203205
}, nil
204206
}
205207

206208
// FromLabelValuesRequest unpacks a LabelValuesRequest proto
207-
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*labels.Matcher, error) {
209+
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
208210
var err error
209211
var matchers []*labels.Matcher
210212

211213
if req.Matchers != nil {
212214
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
213215
if err != nil {
214-
return "", 0, 0, nil, err
216+
return "", 0, 0, 0, nil, err
215217
}
216218
}
217219

218-
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, matchers, nil
220+
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
219221
}
220222

221223
func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {

0 commit comments

Comments
 (0)