Skip to content

Commit c167f4a

Browse files
committed
Refactor how partial data info is passed + apply to series and label methods as well
Signed-off-by: Justin Jung <[email protected]>
1 parent 8d50cea commit c167f4a

10 files changed

+142
-107
lines changed

pkg/distributor/distributor.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,8 +1177,8 @@ func getErrorStatus(err error) string {
11771177
}
11781178

11791179
// ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
1180-
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
1181-
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
1180+
func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, partialDataEnabled bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) {
1181+
return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, partialDataEnabled, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
11821182
client, err := d.ingesterPool.GetClientFor(ing.Addr)
11831183
if err != nil {
11841184
return nil, err
@@ -1228,9 +1228,9 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
12281228
}
12291229

12301230
// LabelValuesForLabelName returns all the label values that are associated with a given label name.
1231-
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1232-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1233-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1231+
func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, label model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
1232+
return d.LabelValuesForLabelNameCommon(ctx, from, to, label, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1233+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12341234
resp, err := client.LabelValues(ctx, req)
12351235
if err != nil {
12361236
return nil, err
@@ -1241,9 +1241,9 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode
12411241
}
12421242

12431243
// LabelValuesForLabelNameStream returns all the label values that are associated with a given label name.
1244-
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1245-
return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1246-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1244+
func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, label model.LabelName, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
1245+
return d.LabelValuesForLabelNameCommon(ctx, from, to, label, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) {
1246+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
12471247
stream, err := client.LabelValuesStream(ctx, req)
12481248
if err != nil {
12491249
return nil, err
@@ -1307,9 +1307,9 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
13071307
return r, nil
13081308
}
13091309

1310-
func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1310+
func (d *Distributor) LabelNamesStream(ctx context.Context, from model.Time, to model.Time, hints *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
13111311
return d.LabelNamesCommon(ctx, from, to, hints, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1312-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1312+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13131313
stream, err := client.LabelNamesStream(ctx, req)
13141314
if err != nil {
13151315
return nil, err
@@ -1333,9 +1333,9 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time,
13331333
}
13341334

13351335
// LabelNames returns all the label names.
1336-
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint *storage.LabelHints, matchers ...*labels.Matcher) ([]string, error) {
1336+
func (d *Distributor) LabelNames(ctx context.Context, from model.Time, to model.Time, hint *storage.LabelHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]string, error) {
13371337
return d.LabelNamesCommon(ctx, from, to, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) {
1338-
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1338+
return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, partialDataEnabled, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13391339
resp, err := client.LabelNames(ctx, req)
13401340
if err != nil {
13411341
return nil, err
@@ -1346,9 +1346,9 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time, hint
13461346
}
13471347

13481348
// MetricsForLabelMatchers gets the metrics that match said matchers
1349-
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
1349+
func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
13501350
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1351-
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1351+
_, err := d.ForReplicationSet(ctx, rs, false, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13521352
resp, err := client.MetricsForLabelMatchers(ctx, req)
13531353
if err != nil {
13541354
return nil, err
@@ -1375,9 +1375,9 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through
13751375
}, matchers...)
13761376
}
13771377

1378-
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, matchers ...*labels.Matcher) ([]model.Metric, error) {
1378+
func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, hint *storage.SelectHints, partialDataEnabled bool, matchers ...*labels.Matcher) ([]model.Metric, error) {
13791379
return d.metricsForLabelMatchersCommon(ctx, from, through, hint, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error {
1380-
_, err := d.ForReplicationSet(ctx, rs, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1380+
_, err := d.ForReplicationSet(ctx, rs, false, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
13811381
stream, err := client.MetricsForLabelMatchersStream(ctx, req)
13821382
if err != nil {
13831383
return nil, err
@@ -1453,7 +1453,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad
14531453

14541454
req := &ingester_client.MetricsMetadataRequest{}
14551455
// TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled.
1456-
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1456+
resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
14571457
return client.MetricsMetadata(ctx, req)
14581458
})
14591459
if err != nil {
@@ -1495,7 +1495,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*ingester.UserStats, error
14951495
replicationSet.MaxErrors = 0
14961496

14971497
req := &ingester_client.UserStatsRequest{}
1498-
resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
1498+
resps, err := d.ForReplicationSet(ctx, replicationSet, false, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) {
14991499
return client.UserStats(ctx, req)
15001500
})
15011501
if err != nil {

pkg/distributor/distributor_test.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1316,7 +1316,7 @@ func TestDistributor_PushQuery(t *testing.T) {
13161316
assert.Nil(t, err)
13171317

13181318
var response model.Matrix
1319-
series, err := ds[0].QueryStream(ctx, 0, 10, tc.matchers...)
1319+
series, err := ds[0].QueryStream(ctx, 0, 10, false, tc.matchers...)
13201320
assert.Equal(t, tc.expectedError, err)
13211321

13221322
if series == nil {
@@ -1378,7 +1378,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
13781378

13791379
// Since the number of series (and thus chunks) is equal to the limit (but doesn't
13801380
// exceed it), we expect a query running on all series to succeed.
1381-
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1381+
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
13821382
require.NoError(t, err)
13831383
assert.Len(t, queryRes.Chunkseries, initialSeries)
13841384

@@ -1396,7 +1396,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunksPerQueryLimitIsReac
13961396

13971397
// Since the number of series (and thus chunks) is exceeding to the limit, we expect
13981398
// a query running on all series to fail.
1399-
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1399+
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
14001400
require.Error(t, err)
14011401
assert.Contains(t, err.Error(), "the query hit the max number of chunks limit")
14021402
}
@@ -1440,7 +1440,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
14401440

14411441
// Since the number of series is equal to the limit (but doesn't
14421442
// exceed it), we expect a query running on all series to succeed.
1443-
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1443+
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
14441444
require.NoError(t, err)
14451445
assert.Len(t, queryRes.Chunkseries, initialSeries)
14461446

@@ -1456,7 +1456,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxSeriesPerQueryLimitIsReac
14561456

14571457
// Since the number of series is exceeding the limit, we expect
14581458
// a query running on all series to fail.
1459-
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1459+
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
14601460
require.Error(t, err)
14611461
assert.Contains(t, err.Error(), "max number of series limit")
14621462
}
@@ -1494,7 +1494,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
14941494
writeRes, err := ds[0].Push(ctx, writeReq)
14951495
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
14961496
assert.Nil(t, err)
1497-
chunkSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1497+
chunkSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
14981498
require.NoError(t, err)
14991499

15001500
// Use the resulting chunks size to calculate the limit as (series to add + our test series) * the response chunk size.
@@ -1516,7 +1516,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
15161516

15171517
// Since the number of chunk bytes is equal to the limit (but doesn't
15181518
// exceed it), we expect a query running on all series to succeed.
1519-
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1519+
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
15201520
require.NoError(t, err)
15211521
assert.Len(t, queryRes.Chunkseries, seriesToAdd)
15221522

@@ -1532,7 +1532,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxChunkBytesPerQueryLimitIs
15321532

15331533
// Since the aggregated chunk size is exceeding the limit, we expect
15341534
// a query running on all series to fail.
1535-
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1535+
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
15361536
require.Error(t, err)
15371537
assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxChunkBytesHit, maxBytesLimit)))
15381538
}
@@ -1571,7 +1571,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsR
15711571
writeRes, err := ds[0].Push(ctx, writeReq)
15721572
assert.Equal(t, &cortexpb.WriteResponse{}, writeRes)
15731573
assert.Nil(t, err)
1574-
dataSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1574+
dataSizeResponse, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
15751575
require.NoError(t, err)
15761576

15771577
// Use the resulting chunks size to calculate the limit as (series to add + our test series) * the response chunk size.
@@ -1593,7 +1593,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsR
15931593

15941594
// Since the number of chunk bytes is equal to the limit (but doesn't
15951595
// exceed it), we expect a query running on all series to succeed.
1596-
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1596+
queryRes, err := ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
15971597
require.NoError(t, err)
15981598
assert.Len(t, queryRes.Chunkseries, seriesToAdd)
15991599

@@ -1609,7 +1609,7 @@ func TestDistributor_QueryStream_ShouldReturnErrorIfMaxDataBytesPerQueryLimitIsR
16091609

16101610
// Since the aggregated chunk size is exceeding the limit, we expect
16111611
// a query running on all series to fail.
1612-
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, allSeriesMatchers...)
1612+
_, err = ds[0].QueryStream(ctx, math.MinInt32, math.MaxInt32, false, allSeriesMatchers...)
16131613
require.Error(t, err)
16141614
assert.Equal(t, err, validation.LimitError(fmt.Sprintf(limiter.ErrMaxDataBytesHit, maxBytesLimit)))
16151615
}
@@ -2065,7 +2065,7 @@ func BenchmarkDistributor_GetLabelsValues(b *testing.B) {
20652065
b.ResetTimer()
20662066
b.ReportAllocs()
20672067
for i := 0; i < b.N; i++ {
2068-
_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__", nil)
2068+
_, err := ds[0].LabelValuesForLabelName(ctx, model.Time(time.Now().UnixMilli()), model.Time(time.Now().UnixMilli()), "__name__", nil, false)
20692069
require.NoError(b, err)
20702070
}
20712071
})
@@ -2397,7 +2397,7 @@ func TestSlowQueries(t *testing.T) {
23972397
shardByAllLabels: shardByAllLabels,
23982398
})
23992399

2400-
_, err := ds[0].QueryStream(ctx, 0, 10, nameMatcher)
2400+
_, err := ds[0].QueryStream(ctx, 0, 10, false, nameMatcher)
24012401
assert.Equal(t, expectedErr, err)
24022402
})
24032403
}
@@ -2431,7 +2431,7 @@ func TestDistributor_MetricsForLabelMatchers_SingleSlowIngester(t *testing.T) {
24312431
}
24322432

24332433
for i := 0; i < 50; i++ {
2434-
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test"))
2434+
_, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, false, mustNewMatcher(labels.MatchEqual, model.MetricNameLabel, "test"))
24352435
require.NoError(t, err)
24362436
}
24372437
}
@@ -2600,7 +2600,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
26002600
}
26012601

26022602
{
2603-
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, testData.matchers...)
2603+
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, false, testData.matchers...)
26042604

26052605
if testData.expectedErr != nil {
26062606
assert.ErrorIs(t, err, testData.expectedErr)
@@ -2618,7 +2618,7 @@ func TestDistributor_MetricsForLabelMatchers(t *testing.T) {
26182618
}
26192619

26202620
{
2621-
metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, nil, testData.matchers...)
2621+
metrics, err := ds[0].MetricsForLabelMatchersStream(ctx, now, now, nil, false, testData.matchers...)
26222622
if testData.expectedErr != nil {
26232623
assert.ErrorIs(t, err, testData.expectedErr)
26242624
return
@@ -2705,7 +2705,7 @@ func BenchmarkDistributor_MetricsForLabelMatchers(b *testing.B) {
27052705

27062706
for n := 0; n < b.N; n++ {
27072707
now := model.Now()
2708-
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, testData.matchers...)
2708+
metrics, err := ds[0].MetricsForLabelMatchers(ctx, now, now, nil, false, testData.matchers...)
27092709

27102710
if testData.expectedErr != nil {
27112711
assert.EqualError(b, err, testData.expectedErr.Error())

0 commit comments

Comments
 (0)