Skip to content

Commit 2e836c6

Browse files
committed
Implement metadata API limit in Ingester
1 parent 4e7dcfd commit 2e836c6

File tree

6 files changed

+291
-129
lines changed

6 files changed

+291
-129
lines changed

pkg/distributor/distributor.go

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1021,7 +1021,7 @@ func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring
10211021
})
10221022
}
10231023

1024-
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
1024+
func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, to model.Time, labelName model.LabelName, limit int, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error), matchers ...*labels.Matcher) ([]string, error) {
10251025
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelValues", opentracing.Tags{
10261026
"name": labelName,
10271027
"start": from.Unix(),
@@ -1033,7 +1033,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
10331033
return nil, err
10341034
}
10351035

1036-
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, matchers)
1036+
req, err := ingester_client.ToLabelValuesRequest(labelName, from, to, limit, matchers)
10371037
if err != nil {
10381038
return nil, err
10391039
}
@@ -1058,8 +1058,8 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
10581058
}
10591059

10601060
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
1061-
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
1062-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1061+
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, limit int, matchers ...*labels.Matcher) ([]string, error) {
1062+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
10631063
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
10641064
resp, err := client.LabelValues(ctx, req)
10651065
if err != nil {
@@ -1071,8 +1071,8 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
10711071
}
10721072

10731073
// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
1074-
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) {
1075-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1074+
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, limit int, matchers ...*labels.Matcher) ([]string, error) {
1075+
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
10761076
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
10771077
stream, err := client.LabelValuesStream(ctx, req)
10781078
if err != nil {
@@ -1096,7 +1096,7 @@ func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, t
10961096
}, matchers...)
10971097
}
10981098

1099-
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
1099+
func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, limit int, f func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error)) ([]string, error) {
11001100
span, ctx := opentracing.StartSpanFromContext(ctx, "Distributor.LabelNames", opentracing.Tags{
11011101
"start": from.Unix(),
11021102
"end": to.Unix(),
@@ -1110,6 +1110,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
11101110
req := &ingester_client.LabelNamesRequest{
11111111
StartTimestampMs: int64(from),
11121112
EndTimestampMs: int64(to),
1113+
Limit: int64(limit),
11131114
}
11141115
resps, err := f(ctx, replicationSet, req)
11151116
if err != nil {
@@ -1131,8 +1132,8 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
11311132
return r, nil
11321133
}
11331134

1134-
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) {
1135-
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1135+
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, limit int) ([]string, error) {
1136+
return d.LabelNamesCommon(ctx, from, to, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
11361137
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
11371138
stream, err := client.LabelNamesStream(ctx, req)
11381139
if err != nil {
@@ -1157,8 +1158,8 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time)
11571158
}
11581159

11591160
// LabelNames returns all the label names.
1160-
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) {
1161-
return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1161+
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, limit int) ([]string, error) {
1162+
return d.LabelNamesCommon(ctx, from, to, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
11621163
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
11631164
resp, err := client.LabelNames(ctx, req)
11641165
if err != nil {
@@ -1170,8 +1171,8 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st
11701171
}
11711172

11721173
// MetricsForLabelMatchers gets the metrics that match said matchers
1173-
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
1174-
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1174+
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, limit int, matchers ...*labels.Matcher) ([]model.Metric, error) {
1175+
return d.metricsForLabelMatchersCommon(ctx, from, through, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
11751176
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
11761177
resp, err := client.MetricsForLabelMatchers(ctx, req)
11771178
if err != nil {
@@ -1199,8 +1200,8 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
11991200
}, matchers...)
12001201
}
12011202

1202-
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]model.Metric, error) {
1203-
return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1203+
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, limit int, matchers ...*labels.Matcher) ([]model.Metric, error) {
1204+
return d.metricsForLabelMatchersCommon(ctx, from, through, limit, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
12041205
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12051206
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
12061207
if err != nil {
@@ -1239,14 +1240,14 @@ func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, t
12391240
}, matchers...)
12401241
}
12411242

1242-
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
1243+
func (d *Distributor) metricsForLabelMatchersCommon(ctx context.Context, from, through model.Time, limit int, f func(context.Context, ring.ReplicationSet, *ingester_client.MetricsForLabelMatchersRequest, *map[model.Fingerprint]model.Metric, *sync.Mutex, *limiter.QueryLimiter) error, matchers ...*labels.Matcher) ([]model.Metric, error) {
12431244
replicationSet, err := d.GetIngestersForMetadata(ctx)
12441245
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
12451246
if err != nil {
12461247
return nil, err
12471248
}
12481249

1249-
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, matchers)
1250+
req, err := ingester_client.ToMetricsForLabelMatchersRequest(from, through, limit, matchers)
12501251
if err != nil {
12511252
return nil, err
12521253
}

pkg/ingester/client/compat.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func ToQueryResponse(matrix model.Matrix) *QueryResponse {
114114
}
115115

116116
// ToMetricsForLabelMatchersRequest builds a MetricsForLabelMatchersRequest proto
117-
func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
117+
func ToMetricsForLabelMatchersRequest(from, to model.Time, limit int, matchers []*labels.Matcher) (*MetricsForLabelMatchersRequest, error) {
118118
ms, err := toLabelMatchers(matchers)
119119
if err != nil {
120120
return nil, err
@@ -124,6 +124,7 @@ func ToMetricsForLabelMatchersRequest(from, to model.Time, matchers []*labels.Ma
124124
StartTimestampMs: int64(from),
125125
EndTimestampMs: int64(to),
126126
MatchersSet: []*LabelMatchers{{Matchers: ms}},
127+
Limit: int64(limit),
127128
}, nil
128129
}
129130

@@ -174,22 +175,22 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
174175
}
175176

176177
// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
177-
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, [][]*labels.Matcher, error) {
178+
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
178179
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
179180
for _, matchers := range req.MatchersSet {
180181
matchers, err := FromLabelMatchers(matchers.Matchers)
181182
if err != nil {
182-
return 0, 0, nil, err
183+
return 0, 0, 0, nil, err
183184
}
184185
matchersSet = append(matchersSet, matchers)
185186
}
186187
from := model.Time(req.StartTimestampMs)
187188
to := model.Time(req.EndTimestampMs)
188-
return from, to, matchersSet, nil
189+
return from, to, int(req.Limit), matchersSet, nil
189190
}
190191

191192
// ToLabelValuesRequest builds a LabelValuesRequest proto
192-
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
193+
func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit int, matchers []*labels.Matcher) (*LabelValuesRequest, error) {
193194
ms, err := toLabelMatchers(matchers)
194195
if err != nil {
195196
return nil, err
@@ -200,22 +201,23 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, matche
200201
StartTimestampMs: int64(from),
201202
EndTimestampMs: int64(to),
202203
Matchers: &LabelMatchers{Matchers: ms},
204+
Limit: int64(limit),
203205
}, nil
204206
}
205207

206208
// FromLabelValuesRequest unpacks a LabelValuesRequest proto
207-
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, []*labels.Matcher, error) {
209+
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
208210
var err error
209211
var matchers []*labels.Matcher
210212

211213
if req.Matchers != nil {
212214
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
213215
if err != nil {
214-
return "", 0, 0, nil, err
216+
return "", 0, 0, 0, nil, err
215217
}
216218
}
217219

218-
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, matchers, nil
220+
return req.LabelName, req.StartTimestampMs, req.EndTimestampMs, int(req.Limit), matchers, nil
219221
}
220222

221223
func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {

0 commit comments

Comments
 (0)