Skip to content

Commit 01dc235

Browse files
authored
Using sortSeries from Select to decide whether to sort the response on DistributorQueryable (#4869)
* respecting sortSeries Signed-off-by: Alan Protasio <[email protected]> * pass down gs Signed-off-by: Alan Protasio <[email protected]> * We should pass the flag on querier only if we are not merging different queryables Signed-off-by: Alan Protasio <[email protected]> * Improving test Signed-off-by: Alan Protasio <[email protected]> * Addressing comments Signed-off-by: Alan Protasio <[email protected]> * Tests with query streaming off Signed-off-by: Alan Protasio <[email protected]> Signed-off-by: Alan Protasio <[email protected]>
1 parent afbcd21 commit 01dc235

File tree

12 files changed

+227
-52
lines changed

12 files changed

+227
-52
lines changed

pkg/querier/distributor_queryable.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ type distributorQuerier struct {
8585

8686
// Select implements storage.Querier interface.
8787
// The bool passed is ignored because the series is always sorted.
88-
func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
88+
func (q *distributorQuerier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
8989
log, ctx := spanlogger.New(q.ctx, "distributorQuerier.Select")
9090
defer log.Span.Finish()
9191

@@ -116,7 +116,7 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
116116
if err != nil {
117117
return storage.ErrSeriesSet(err)
118118
}
119-
return series.MetricsToSeriesSet(ms)
119+
return series.MetricsToSeriesSet(sortSeries, ms)
120120
}
121121

122122
// If queryIngestersWithin is enabled, we do manipulate the query mint to query samples up until
@@ -139,7 +139,7 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
139139
}
140140

141141
if q.streaming {
142-
return q.streamingSelect(ctx, minT, maxT, matchers)
142+
return q.streamingSelect(ctx, sortSeries, minT, maxT, matchers)
143143
}
144144

145145
matrix, err := q.distributor.Query(ctx, model.Time(minT), model.Time(maxT), matchers...)
@@ -148,18 +148,20 @@ func (q *distributorQuerier) Select(_ bool, sp *storage.SelectHints, matchers ..
148148
}
149149

150150
// Using MatrixToSeriesSet (and in turn NewConcreteSeriesSet), sorts the series.
151-
return series.MatrixToSeriesSet(matrix)
151+
return series.MatrixToSeriesSet(sortSeries, matrix)
152152
}
153153

154-
func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
154+
func (q *distributorQuerier) streamingSelect(ctx context.Context, sortSeries bool, minT, maxT int64, matchers []*labels.Matcher) storage.SeriesSet {
155155
results, err := q.distributor.QueryStream(ctx, model.Time(minT), model.Time(maxT), matchers...)
156156
if err != nil {
157157
return storage.ErrSeriesSet(err)
158158
}
159159

160+
// we should sort the series if we need to merge them even if sortSeries is not required by the querier
161+
sortSeries = sortSeries || (len(results.Timeseries) > 0 && len(results.Chunkseries) > 0)
160162
sets := []storage.SeriesSet(nil)
161163
if len(results.Timeseries) > 0 {
162-
sets = append(sets, newTimeSeriesSeriesSet(results.Timeseries))
164+
sets = append(sets, newTimeSeriesSeriesSet(sortSeries, results.Timeseries))
163165
}
164166

165167
serieses := make([]storage.Series, 0, len(results.Chunkseries))
@@ -170,7 +172,6 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int
170172
}
171173

172174
ls := cortexpb.FromLabelAdaptersToLabels(result.Labels)
173-
sort.Sort(ls)
174175

175176
chunks, err := chunkcompat.FromChunks(ls, result.Chunks)
176177
if err != nil {
@@ -187,7 +188,7 @@ func (q *distributorQuerier) streamingSelect(ctx context.Context, minT, maxT int
187188
}
188189

189190
if len(serieses) > 0 {
190-
sets = append(sets, series.NewConcreteSeriesSet(serieses))
191+
sets = append(sets, series.NewConcreteSeriesSet(sortSeries || len(sets) > 0, serieses))
191192
}
192193

193194
if len(sets) == 0 {

pkg/querier/duplicates_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func dedupeSorted(samples []cortexpb.Sample) []cortexpb.Sample {
8585
}
8686

8787
func runPromQLAndGetJSONResult(t *testing.T, query string, ts cortexpb.TimeSeries, step time.Duration) string {
88-
tq := &testQueryable{ts: newTimeSeriesSeriesSet([]cortexpb.TimeSeries{ts})}
88+
tq := &testQueryable{ts: newTimeSeriesSeriesSet(true, []cortexpb.TimeSeries{ts})}
8989

9090
engine := promql.NewEngine(promql.EngineOpts{
9191
Logger: log.NewNopLogger(),

pkg/querier/querier.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ type querier struct {
288288

289289
// Select implements storage.Querier interface.
290290
// The bool passed is ignored because the series is always sorted.
291-
func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
291+
func (q querier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
292292
log, ctx := spanlogger.New(q.ctx, "querier.Select")
293293
defer log.Span.Finish()
294294

@@ -347,7 +347,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
347347
}
348348

349349
if len(q.queriers) == 1 {
350-
seriesSet := q.queriers[0].Select(true, sp, matchers...)
350+
seriesSet := q.queriers[0].Select(sortSeries, sp, matchers...)
351351

352352
if tombstones.Len() != 0 {
353353
seriesSet = series.NewDeletedSeriesSet(seriesSet, tombstones, model.Interval{Start: startTime, End: endTime})
@@ -359,6 +359,7 @@ func (q querier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Mat
359359
sets := make(chan storage.SeriesSet, len(q.queriers))
360360
for _, querier := range q.queriers {
361361
go func(querier storage.Querier) {
362+
// We should always select sorted here as we will need to merge the series
362363
sets <- querier.Select(true, sp, matchers...)
363364
}(querier)
364365
}
@@ -655,5 +656,5 @@ func partitionChunks(chunks []chunk.Chunk, mint, maxt int64, iteratorFunc chunkI
655656
})
656657
}
657658

658-
return seriesset.NewConcreteSeriesSet(series)
659+
return seriesset.NewConcreteSeriesSet(true, series)
659660
}

pkg/querier/querier_test.go

Lines changed: 188 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,28 @@ const (
4646
samplesPerChunk = chunkLength / sampleRate
4747
)
4848

49+
type wrappedQuerier struct {
50+
storage.Querier
51+
selectCallsArgs [][]interface{}
52+
}
53+
54+
func (q *wrappedQuerier) Select(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
55+
q.selectCallsArgs = append(q.selectCallsArgs, []interface{}{sortSeries, hints, matchers})
56+
return q.Querier.Select(sortSeries, hints, matchers...)
57+
}
58+
59+
type wrappedSampleAndChunkQueryable struct {
60+
QueryableWithFilter
61+
queriers []*wrappedQuerier
62+
}
63+
64+
func (q *wrappedSampleAndChunkQueryable) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
65+
querier, err := q.QueryableWithFilter.Querier(ctx, mint, maxt)
66+
wQuerier := &wrappedQuerier{Querier: querier}
67+
q.queriers = append(q.queriers, wQuerier)
68+
return wQuerier, err
69+
}
70+
4971
type query struct {
5072
query string
5173
labels labels.Labels
@@ -132,14 +154,159 @@ var (
132154
}
133155
)
134156

157+
func TestShouldSortSeriesIfQueryingMultipleQueryables(t *testing.T) {
158+
start := time.Now().Add(-2 * time.Hour)
159+
end := time.Now()
160+
ctx := user.InjectOrgID(context.Background(), "0")
161+
var cfg Config
162+
flagext.DefaultValues(&cfg)
163+
overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
164+
const chunks = 1
165+
require.NoError(t, err)
166+
167+
labelsSets := []labels.Labels{
168+
{
169+
{Name: model.MetricNameLabel, Value: "foo"},
170+
{Name: "order", Value: "1"},
171+
},
172+
{
173+
{Name: model.MetricNameLabel, Value: "foo"},
174+
{Name: "order", Value: "2"},
175+
},
176+
}
177+
178+
db, samples := mockTSDB(t, labelsSets, model.Time(start.Unix()*1000), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))
179+
samplePairs := []model.SamplePair{}
180+
181+
for _, s := range samples {
182+
samplePairs = append(samplePairs, model.SamplePair{Timestamp: model.Time(s.TimestampMs), Value: model.SampleValue(s.Value)})
183+
}
184+
185+
distributor := &MockDistributor{}
186+
187+
unorderedResponse := client.QueryStreamResponse{
188+
Timeseries: []cortexpb.TimeSeries{
189+
{
190+
Labels: []cortexpb.LabelAdapter{
191+
{Name: model.MetricNameLabel, Value: "foo"},
192+
{Name: "order", Value: "2"},
193+
},
194+
Samples: samples,
195+
},
196+
{
197+
Labels: []cortexpb.LabelAdapter{
198+
{Name: model.MetricNameLabel, Value: "foo"},
199+
{Name: "order", Value: "1"},
200+
},
201+
Samples: samples,
202+
},
203+
},
204+
}
205+
206+
unorderedResponseMatrix := model.Matrix{
207+
{
208+
Metric: util.LabelsToMetric(cortexpb.FromLabelAdaptersToLabels(unorderedResponse.Timeseries[0].Labels)),
209+
Values: samplePairs,
210+
},
211+
{
212+
Metric: util.LabelsToMetric(cortexpb.FromLabelAdaptersToLabels(unorderedResponse.Timeseries[1].Labels)),
213+
Values: samplePairs,
214+
},
215+
}
216+
217+
distributor.On("QueryStream", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&unorderedResponse, nil)
218+
distributor.On("Query", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(unorderedResponseMatrix, nil)
219+
distributorQueryableStreaming := newDistributorQueryable(distributor, true, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
220+
distributorQueryable := newDistributorQueryable(distributor, false, cfg.IngesterMetadataStreaming, batch.NewChunkMergeIterator, cfg.QueryIngestersWithin)
221+
222+
tCases := []struct {
223+
name string
224+
distributorQueryable QueryableWithFilter
225+
storeQueriables []QueryableWithFilter
226+
sorted bool
227+
}{
228+
{
229+
name: "should sort if querying 2 queryables",
230+
distributorQueryable: distributorQueryableStreaming,
231+
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(db)},
232+
sorted: true,
233+
},
234+
{
235+
name: "should not sort if querying only ingesters",
236+
distributorQueryable: distributorQueryableStreaming,
237+
storeQueriables: []QueryableWithFilter{UseBeforeTimestampQueryable(db, start.Add(-1*time.Hour))},
238+
sorted: false,
239+
},
240+
{
241+
name: "should not sort if querying only stores",
242+
distributorQueryable: UseBeforeTimestampQueryable(distributorQueryableStreaming, start.Add(-1*time.Hour)),
243+
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(db)},
244+
sorted: false,
245+
},
246+
{
247+
name: "should sort if querying 2 queryables with streaming off",
248+
distributorQueryable: distributorQueryable,
249+
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(db)},
250+
sorted: true,
251+
},
252+
{
253+
name: "should not sort if querying only ingesters with streaming off",
254+
distributorQueryable: distributorQueryable,
255+
storeQueriables: []QueryableWithFilter{UseBeforeTimestampQueryable(db, start.Add(-1*time.Hour))},
256+
sorted: false,
257+
},
258+
{
259+
name: "should not sort if querying only stores with streaming off",
260+
distributorQueryable: UseBeforeTimestampQueryable(distributorQueryable, start.Add(-1*time.Hour)),
261+
storeQueriables: []QueryableWithFilter{UseAlwaysQueryable(db)},
262+
sorted: false,
263+
},
264+
}
265+
266+
for _, tc := range tCases {
267+
t.Run(tc.name, func(t *testing.T) {
268+
wDistributorQueriable := &wrappedSampleAndChunkQueryable{QueryableWithFilter: tc.distributorQueryable}
269+
var wQueriables []QueryableWithFilter
270+
for _, queriable := range tc.storeQueriables {
271+
wQueriables = append(wQueriables, &wrappedSampleAndChunkQueryable{QueryableWithFilter: queriable})
272+
}
273+
queryable := NewQueryable(wDistributorQueriable, wQueriables, batch.NewChunkMergeIterator, cfg, overrides, purger.NewNoopTombstonesLoader())
274+
queryTracker := promql.NewActiveQueryTracker(t.TempDir(), 10, log.NewNopLogger())
275+
276+
engine := promql.NewEngine(promql.EngineOpts{
277+
Logger: log.NewNopLogger(),
278+
ActiveQueryTracker: queryTracker,
279+
MaxSamples: 1e6,
280+
Timeout: 1 * time.Minute,
281+
})
282+
283+
query, err := engine.NewRangeQuery(queryable, nil, "foo", start, end, 1*time.Minute)
284+
r := query.Exec(ctx)
285+
286+
require.NoError(t, err)
287+
require.Equal(t, 2, r.Value.(promql.Matrix).Len())
288+
289+
for _, queryable := range append(wQueriables, wDistributorQueriable) {
290+
var wQueryable = queryable.(*wrappedSampleAndChunkQueryable)
291+
if wQueryable.UseQueryable(time.Now(), start.Unix()*1000, end.Unix()*1000) {
292+
require.Equal(t, tc.sorted, wQueryable.queriers[0].selectCallsArgs[0][0])
293+
}
294+
}
295+
})
296+
}
297+
}
298+
135299
func TestQuerier(t *testing.T) {
136300
var cfg Config
137301
flagext.DefaultValues(&cfg)
138302

139303
const chunks = 24
140304

141305
// Generate TSDB head with the same samples as makeMockChunkStore.
142-
db := mockTSDB(t, model.Time(0), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))
306+
lset := labels.Labels{
307+
{Name: model.MetricNameLabel, Value: "foo"},
308+
}
309+
db, _ := mockTSDB(t, []labels.Labels{lset}, model.Time(0), int(chunks*samplesPerChunk), sampleRate, chunkOffset, int(samplesPerChunk))
143310

144311
for _, query := range queries {
145312
for _, encoding := range encodings {
@@ -165,7 +332,7 @@ func TestQuerier(t *testing.T) {
165332
}
166333
}
167334

168-
func mockTSDB(t *testing.T, mint model.Time, samples int, step, chunkOffset time.Duration, samplesPerChunk int) storage.Queryable {
335+
func mockTSDB(t *testing.T, labels []labels.Labels, mint model.Time, samples int, step, chunkOffset time.Duration, samplesPerChunk int) (storage.Queryable, []cortexpb.Sample) {
169336
opts := tsdb.DefaultHeadOptions()
170337
opts.ChunkDirRoot = t.TempDir()
171338
// We use TSDB head only. By using full TSDB DB, and appending samples to it, closing it would cause unnecessary HEAD compaction, which slows down the test.
@@ -176,32 +343,33 @@ func mockTSDB(t *testing.T, mint model.Time, samples int, step, chunkOffset time
176343
})
177344

178345
app := head.Appender(context.Background())
346+
rSamples := []cortexpb.Sample{}
347+
348+
for _, lset := range labels {
349+
cnt := 0
350+
chunkStartTs := mint
351+
ts := chunkStartTs
352+
for i := 0; i < samples; i++ {
353+
_, err := app.Append(0, lset, int64(ts), float64(ts))
354+
rSamples = append(rSamples, cortexpb.Sample{TimestampMs: int64(ts), Value: float64(ts)})
355+
require.NoError(t, err)
356+
cnt++
179357

180-
l := labels.Labels{
181-
{Name: model.MetricNameLabel, Value: "foo"},
182-
}
183-
184-
cnt := 0
185-
chunkStartTs := mint
186-
ts := chunkStartTs
187-
for i := 0; i < samples; i++ {
188-
_, err := app.Append(0, l, int64(ts), float64(ts))
189-
require.NoError(t, err)
190-
cnt++
191-
192-
ts = ts.Add(step)
358+
ts = ts.Add(step)
193359

194-
if cnt%samplesPerChunk == 0 {
195-
// Simulate next chunk, restart timestamp.
196-
chunkStartTs = chunkStartTs.Add(chunkOffset)
197-
ts = chunkStartTs
360+
if cnt%samplesPerChunk == 0 {
361+
// Simulate next chunk, restart timestamp.
362+
chunkStartTs = chunkStartTs.Add(chunkOffset)
363+
ts = chunkStartTs
364+
}
198365
}
199366
}
200367

201368
require.NoError(t, app.Commit())
369+
202370
return storage.QueryableFunc(func(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
203371
return tsdb.NewBlockQuerier(head, mint, maxt)
204-
})
372+
}), rSamples
205373
}
206374

207375
func TestNoHistoricalQueryToIngester(t *testing.T) {

pkg/querier/remote_read_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,11 @@ type mockQuerier struct {
9090
matrix model.Matrix
9191
}
9292

93-
func (m mockQuerier) Select(_ bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
93+
func (m mockQuerier) Select(sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
9494
if sp == nil {
9595
panic(fmt.Errorf("select params must be set"))
9696
}
97-
return series.MatrixToSeriesSet(m.matrix)
97+
return series.MatrixToSeriesSet(sortSeries, m.matrix)
9898
}
9999

100100
func (m mockQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]string, storage.Warnings, error) {

0 commit comments

Comments
 (0)