Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions pkg/querier/distributor_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type distributorQuerier struct {

// Select implements storage.Querier interface.
// sortSeries is forwarded to the series-set constructors, so the result is only sorted when the caller asks for it.
func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select")
defer log.Span.Finish()

Expand Down Expand Up @@ -116,7 +116,7 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
if err != nil {
return storage.ErrSeriesSet(err)
}
return series.MetricsToSeriesSet(ms)
return series.MetricsToSeriesSet(sortSeries, ms)
}

// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
Expand All @@ -139,7 +139,7 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
}

if q.streaming {
return q.streamingSelect(ctx, minT, maxT, matchers)
return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
}

matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...)
Expand All @@ -148,18 +148,20 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
}

// Using MatrixToSeriesSet (and in turn NewConcreteSeriesSet), sorts the series.
return series.MatrixToSeriesSet(matrix)
return series.MatrixToSeriesSet(sortSeries, matrix)
}

func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
results, err := q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
if err != nil {
return storage.ErrSeriesSet(err)
}

// we should sort the series if we need to merge them even if sortSeries is not required by the querier
sortSeries = sortSeries || (len(results.Timeseries) > 0 && len(results.Chunkseries) > 0)
sets := []storage.SeriesSet(nil)
if len(results.Timeseries) > 0 {
sets = append(sets, newTimeSeriesSeriesSet(results.Timeseries))
sets = append(sets, newTimeSeriesSeriesSet(sortSeries, results.Timeseries))
}

serieses := make([]storage.Series, 0, len(results.Chunkseries))
Expand All @@ -170,7 +172,6 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int
}

ls := cortexpb.FromLabelAdaptersToLabels(result.Labels)
sort.Sort(ls)

chunks, err := chunkcompat.FromChunks(ls, result.Chunks)
if err != nil {
Expand All @@ -187,7 +188,7 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int
}

if len(serieses) > 0 {
sets = append(sets, series.NewConcreteSeriesSet(serieses))
sets = append(sets, series.NewConcreteSeriesSet(sortSeries || len(sets) > 0, serieses))
}

if len(sets) == 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/duplicates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func dedupeSorted(samples []cortexpb.Sample) []cortexpb.Sample {
}

func runPromQLAndGetJSONResult(t *testing.T, query string, ts cortexpb.TimeSeries, step time.Duration) string {
tq := &testQueryable{ts: newTimeSeriesSeriesSet([]cortexpb.TimeSeries{ts})}
tq := &testQueryable{ts: newTimeSeriesSeriesSet(true, []cortexpb.TimeSeries{ts})}

engine := promql.NewEngine(promql.EngineOpts{
Logger: log.NewNopLogger(),
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ type querier struct {

// Select implements storage.Querier interface.
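// sortSeries is honoured when a single underlying querier is used; when several queriers are queried, each one is always asked for sorted series so the results can be merged.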
func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
func (q querier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
log, ctx := spanlogger.New(q.ctx, "querier.Select")
defer log.Span.Finish()

Expand Down Expand Up @@ -347,7 +347,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
}

if len(q.queriers) == 1 {
seriesSet := q.queriers[0].Select(true, sp, matchers...)
seriesSet := q.queriers[0].Select(sortSeries, sp, matchers...)

if tombstones.Len() != 0 {
seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime})
Expand All @@ -359,6 +359,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
sets := make(chan storage.SeriesSet, len(q.queriers))
for _, querier := range q.queriers {
go func(querier storage.Querier) {
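// When querying multiple backends we have to merge their results, and the merge relies on
// each result being sorted. We cannot assume every backend sorts by default, so always
// request sorted series here.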
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is a bit misleading. I think what you meant is: "if we need to query multiple backends, then we need to merge the results, which depends on the result from each querier being sorted. We are not sure whether each result is sorted by default, so it's safer to force sorting to ensure the merge works properly."

sets <- querier.Select(true, sp, matchers...)
}(querier)
}
Expand Down Expand Up @@ -655,5 +656,5 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkI
})
}

return seriesset.NewConcreteSeriesSet(series)
return seriesset.NewConcreteSeriesSet(true, series)
}
208 changes: 188 additions & 20 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,28 @@ const (
samplesPerChunk = chunkLength / sampleRate
)

// wrappedQuerier is a test double that embeds a storage.Querier and keeps a
// log of the arguments passed to every Select call before delegating to the
// embedded querier.
type wrappedQuerier struct {
	storage.Querier

	// selectCallsArgs holds one entry per Select call, in call order. Each
	// entry is {sortSeries, hints, matchers}.
	selectCallsArgs [][]interface{}
}

// Select records its arguments and then forwards the call, unchanged, to the
// embedded querier.
func (q *wrappedQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
	recorded := []interface{}{sortSeries, hints, matchers}
	q.selectCallsArgs = append(q.selectCallsArgs, recorded)

	return q.Querier.Select(sortSeries, hints, matchers...)
}

// wrappedSampleAndChunkQueryable is a test double that embeds a
// QueryableWithFilter and remembers every querier it hands out, wrapped in a
// wrappedQuerier so that the Select arguments can be inspected afterwards.
type wrappedSampleAndChunkQueryable struct {
	QueryableWithFilter

	// queriers holds the wrappers returned by Querier, in creation order.
	queriers []*wrappedQuerier
}

// Querier asks the embedded queryable for a querier and returns it wrapped in
// a wrappedQuerier, which is also appended to q.queriers.
//
// If the embedded queryable fails, the error is returned with a nil querier
// and nothing is recorded, so q.queriers never contains a wrapper around a
// querier that could not be created.
func (q *wrappedSampleAndChunkQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
	querier, err := q.QueryableWithFilter.Querier(ctx, mint, maxt)
	if err != nil {
		// Return a literal nil so callers never see a non-nil interface
		// holding an unusable wrapper alongside the error.
		return nil, err
	}

	wQuerier := &wrappedQuerier{Querier: querier}
	q.queriers = append(q.queriers, wQuerier)
	return wQuerier, nil
}

type query struct {
query string
labels labels.Labels
Expand Down Expand Up @@ -132,14 +154,159 @@ var (
}
)

// TestShouldSortSeriesIfQueryingMultipleQueryables runs a range query for "foo"
// through a Queryable built from a distributor queryable (streaming on or off)
// and a TSDB-backed store queryable, and checks the sortSeries flag that each
// in-use queryable receives on its first Select call: true when both queryables
// are in use for the query time range, false when only one of them is.
func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
	start := time.Now().Add(-2 * time.Hour)
	end := time.Now()
	ctx := user.InjectOrgID(context.Background(), "0")
	var cfg Config
	flagext.DefaultValues(&cfg)
	overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
	require.NoError(t, err)
	const chunks = 1

	labelsSets := []labels.Labels{
		{
			{Name: model.MetricNameLabel, Value: "foo"},
			{Name: "order", Value: "1"},
		},
		{
			{Name: model.MetricNameLabel, Value: "foo"},
			{Name: "order", Value: "2"},
		},
	}

	db, samples := mockTSDB(t, labelsSets, model.Time(start.Unix()*1000), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))

	samplePairs := make([]model.SamplePair, 0, len(samples))
	for _, s := range samples {
		samplePairs = append(samplePairs, model.SamplePair{Timestamp: model.Time(s.TimestampMs), Value: model.SampleValue(s.Value)})
	}

	distributor := &MockDistributor{}

	// The distributor deliberately answers with the series in reverse label
	// order ("order"="2" before "order"="1").
	unorderedResponse := client.QueryStreamResponse{
		Timeseries: []cortexpb.TimeSeries{
			{
				Labels: []cortexpb.LabelAdapter{
					{Name: model.MetricNameLabel, Value: "foo"},
					{Name: "order", Value: "2"},
				},
				Samples: samples,
			},
			{
				Labels: []cortexpb.LabelAdapter{
					{Name: model.MetricNameLabel, Value: "foo"},
					{Name: "order", Value: "1"},
				},
				Samples: samples,
			},
		},
	}

	// Same unordered data, in the matrix shape returned by the non-streaming Query path.
	unorderedResponseMatrix := model.Matrix{
		{
			Metric: util.LabelsToMetric(cortexpb.FromLabelAdaptersToLabels(unorderedResponse.Timeseries[0].Labels)),
			Values: samplePairs,
		},
		{
			Metric: util.LabelsToMetric(cortexpb.FromLabelAdaptersToLabels(unorderedResponse.Timeseries[1].Labels)),
			Values: samplePairs,
		},
	}

	distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil)
	distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(unorderedResponseMatrix, nil)
	distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
	distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)

	tCases := []struct {
		name                 string
		distributorQueryable QueryableWithFilter
		storeQueriables      []QueryableWithFilter
		sorted               bool
	}{
		{
			name:                 "should sort if querying 2 queryables",
			distributorQueryable: distributorQueryableStreaming,
			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
			sorted:               true,
		},
		{
			name:                 "should not sort if querying only ingesters",
			distributorQueryable: distributorQueryableStreaming,
			storeQueriables:      []QueryableWithFilter{UseBeforeTimestampQueryable(db, start.Add(-1*time.Hour))},
			sorted:               false,
		},
		{
			name:                 "should not sort if querying only stores",
			distributorQueryable: UseBeforeTimestampQueryable(distributorQueryableStreaming, start.Add(-1*time.Hour)),
			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
			sorted:               false,
		},
		{
			name:                 "should sort if querying 2 queryables with streaming off",
			distributorQueryable: distributorQueryable,
			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
			sorted:               true,
		},
		{
			name:                 "should not sort if querying only ingesters with streaming off",
			distributorQueryable: distributorQueryable,
			storeQueriables:      []QueryableWithFilter{UseBeforeTimestampQueryable(db, start.Add(-1*time.Hour))},
			sorted:               false,
		},
		{
			name:                 "should not sort if querying only stores with streaming off",
			distributorQueryable: UseBeforeTimestampQueryable(distributorQueryable, start.Add(-1*time.Hour)),
			storeQueriables:      []QueryableWithFilter{UseAlwaysQueryable(db)},
			sorted:               false,
		},
	}

	for _, tc := range tCases {
		t.Run(tc.name, func(t *testing.T) {
			wDistributorQueriable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: tc.distributorQueryable}
			wQueriables := make([]QueryableWithFilter, 0, len(tc.storeQueriables))
			for _, queriable := range tc.storeQueriables {
				wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queriable})
			}
			queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides, purger.NewNoopTombstonesLoader())
			queryTracker := promql.NewActiveQueryTracker(t.TempDir(), 10, log.NewNopLogger())

			engine := promql.NewEngine(promql.EngineOpts{
				Logger:             log.NewNopLogger(),
				ActiveQueryTracker: queryTracker,
				MaxSamples:         1e6,
				Timeout:            1 * time.Minute,
			})

			// Check the construction error before using the query; otherwise a
			// failure here would surface as a nil dereference in Exec.
			query, err := engine.NewRangeQuery(queryable, nil, "foo", start, end, 1*time.Minute)
			require.NoError(t, err)

			r := query.Exec(ctx)
			require.NoError(t, r.Err)

			matrix, ok := r.Value.(promql.Matrix)
			require.True(t, ok, "expected a matrix result, got %T", r.Value)
			require.Equal(t, 2, matrix.Len())

			// Build a fresh slice rather than appending to wQueriables, so the
			// store queryables slice is never aliased or modified.
			allQueryables := make([]QueryableWithFilter, 0, len(wQueriables)+1)
			allQueryables = append(allQueryables, wQueriables...)
			allQueryables = append(allQueryables, wDistributorQueriable)

			for _, candidate := range allQueryables {
				wQueryable := candidate.(*wrappedSampleAndChunkQueryable)
				if !wQueryable.UseQueryable(time.Now(), start.Unix()*1000, end.Unix()*1000) {
					continue
				}

				// Fail with a readable message instead of an index-out-of-range
				// panic if the queryable was never asked for a querier or the
				// querier never received a Select call.
				require.NotEmpty(t, wQueryable.queriers)
				require.NotEmpty(t, wQueryable.queriers[0].selectCallsArgs)
				require.Equal(t, tc.sorted, wQueryable.queriers[0].selectCallsArgs[0][0])
			}
		})
	}
}

func TestQuerier(t *testing.T) {
var cfg Config
flagext.DefaultValues(&cfg)

const chunks = 24

// Generate TSDB head with the same samples as makeMockChunkStore.
db := mockTSDB(t, model.Time(0), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))
lset := labels.Labels{
{Name: model.MetricNameLabel, Value: "foo"},
}
db, _ := mockTSDB(t, []labels.Labels{lset}, model.Time(0), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))

for _, query := range queries {
for _, encoding := range encodings {
Expand All @@ -165,7 +332,7 @@ func TestQuerier(t *testing.T) {
}
}

func mockTSDB(t *testing.T, mint model.Time, samples int, step, chunkOffset time.Duration, samplesPerChunk int) storage.Queryable {
func mockTSDB(t *testing.T, labels []labels.Labels, mint model.Time, samples int, step, chunkOffset time.Duration, samplesPerChunk int) (storage.Queryable, []cortexpb.Sample) {
opts := tsdb.DefaultHeadOptions()
opts.ChunkDirRoot = t.TempDir()
// We use TSDB head only. By using full TSDB DB, and appending samples to it, closing it would cause unnecessary HEAD compaction, which slows down the test.
Expand All @@ -176,32 +343,33 @@ func mockTSDB(t *testing.T, mint model.Time, samples int, step, chunkOffset time
})

app := head.Appender(context.Background())
rSamples := []cortexpb.Sample{}

for _, lset := range labels {
cnt := 0
chunkStartTs := mint
ts := chunkStartTs
for i := 0; i < samples; i++ {
_, err := app.Append(0, lset, int64(ts), float64(ts))
rSamples = append(rSamples, cortexpb.Sample{TimestampMs: int64(ts), Value: float64(ts)})
require.NoError(t, err)
cnt++

l := labels.Labels{
{Name: model.MetricNameLabel, Value: "foo"},
}

cnt := 0
chunkStartTs := mint
ts := chunkStartTs
for i := 0; i < samples; i++ {
_, err := app.Append(0, l, int64(ts), float64(ts))
require.NoError(t, err)
cnt++

ts = ts.Add(step)
ts = ts.Add(step)

if cnt%samplesPerChunk == 0 {
// Simulate next chunk, restart timestamp.
chunkStartTs = chunkStartTs.Add(chunkOffset)
ts = chunkStartTs
if cnt%samplesPerChunk == 0 {
// Simulate next chunk, restart timestamp.
chunkStartTs = chunkStartTs.Add(chunkOffset)
ts = chunkStartTs
}
}
}

require.NoError(t, app.Commit())

return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return tsdb.NewBlockQuerier(head, mint, maxt)
})
}), rSamples
}

func TestNoHistoricalQueryToIngester(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/remote_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ type mockQuerier struct {
matrix model.Matrix
}

func (m mockQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
func (m mockQuerier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
if sp == nil {
panic(fmt.Errorf("select params must be set"))
}
return series.MatrixToSeriesSet(m.matrix)
return series.MatrixToSeriesSet(sortSeries, m.matrix)
}

func (m mockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {
Expand Down
Loading