Skip to content

Commit 88994ff

Browse files
committed
Fixing Query Store After on parquet -> we need to change the hints to we filter out the right chunks
Signed-off-by: alanprot <[email protected]>
1 parent dc697f1 commit 88994ff

File tree

2 files changed

+72
-28
lines changed

2 files changed

+72
-28
lines changed

pkg/querier/parquet_queryable.go

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
356356
return result, rAnnotations, nil
357357
}
358358

359-
func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
359+
func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool, h *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
360360
userID, err := tenant.TenantID(ctx)
361361
if err != nil {
362362
storage.ErrSeriesSet(err)
@@ -366,68 +366,99 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
366366
uLogger := util_log.WithUserID(userID, q.logger)
367367
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")
368368

369-
return q.blocksStoreQuerier.Select(ctx, sortSeries, hints, matchers...)
369+
return q.blocksStoreQuerier.Select(ctx, sortSeries, h, matchers...)
370370
}
371371

372-
mint, maxt, limit := q.minT, q.maxT, 0
372+
hints := storage.SelectHints{
373+
Start: q.minT,
374+
End: q.maxT,
375+
}
373376

374-
if hints != nil {
377+
mint, maxt, limit := q.minT, q.maxT, 0
378+
if h != nil {
379+
// let copy the hints here as we wanna potentially modify it
380+
hints = *h
375381
mint, maxt, limit = hints.Start, hints.End, hints.Limit
376382
}
377383

384+
maxt = q.adjustMaxT(maxt)
385+
hints.End = maxt
386+
387+
if maxt < mint {
388+
return nil
389+
}
390+
378391
remaining, parquet, err := q.getBlocks(ctx, mint, maxt)
379392
if err != nil {
380393
return storage.ErrSeriesSet(err)
381394
}
382395

383-
serieSets := []storage.SeriesSet{}
384-
385396
// Lets sort the series to merge
386397
if len(parquet) > 0 && len(remaining) > 0 {
387398
sortSeries = true
388399
}
389400

401+
promises := make([]chan storage.SeriesSet, 0, 2)
402+
390403
if len(parquet) > 0 {
391-
serieSets = append(serieSets, q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, hints, matchers...))
404+
p := make(chan storage.SeriesSet, 1)
405+
promises = append(promises, p)
406+
go func() {
407+
p <- q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, &hints, matchers...)
408+
}()
392409
}
393410

394411
if len(remaining) > 0 {
395-
serieSets = append(serieSets, q.blocksStoreQuerier.Select(InjectBlocksIntoContext(ctx, remaining...), sortSeries, hints, matchers...))
412+
p := make(chan storage.SeriesSet, 1)
413+
promises = append(promises, p)
414+
go func() {
415+
p <- q.blocksStoreQuerier.Select(InjectBlocksIntoContext(ctx, remaining...), sortSeries, &hints, matchers...)
416+
}()
396417
}
397418

398-
if len(serieSets) == 1 {
399-
return serieSets[0]
419+
if len(promises) == 1 {
420+
return <-promises[0]
400421
}
401422

402-
return storage.NewMergeSeriesSet(serieSets, limit, storage.ChainedSeriesMerge)
403-
}
423+
seriesSets := make([]storage.SeriesSet, len(promises))
424+
for i, promise := range promises {
425+
seriesSets[i] = <-promise
426+
}
404427

405-
func (q *parquetQuerierWithFallback) Close() error {
406-
mErr := multierror.MultiError{}
407-
mErr.Add(q.parquetQuerier.Close())
408-
mErr.Add(q.blocksStoreQuerier.Close())
409-
return mErr.Err()
428+
return storage.NewMergeSeriesSet(seriesSets, limit, storage.ChainedSeriesMerge)
410429
}
411430

412-
func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT int64) ([]*bucketindex.Block, []*bucketindex.Block, error) {
431+
func (q *parquetQuerierWithFallback) adjustMaxT(maxt int64) int64 {
413432
// If queryStoreAfter is enabled, we do manipulate the query maxt to query samples up until
414433
// now - queryStoreAfter, because the most recent time range is covered by ingesters. This
415434
// optimization is particularly important for the blocks storage because can be used to skip
416435
// querying most recent not-compacted-yet blocks from the storage.
417436
if q.queryStoreAfter > 0 {
418437
now := time.Now()
419-
maxT = min(maxT, util.TimeToMillis(now.Add(-q.queryStoreAfter)))
420-
421-
if maxT < minT {
422-
return nil, nil, nil
423-
}
438+
maxt = min(maxt, util.TimeToMillis(now.Add(-q.queryStoreAfter)))
424439
}
440+
return maxt
441+
}
425442

443+
func (q *parquetQuerierWithFallback) Close() error {
444+
mErr := multierror.MultiError{}
445+
mErr.Add(q.parquetQuerier.Close())
446+
mErr.Add(q.blocksStoreQuerier.Close())
447+
return mErr.Err()
448+
}
449+
450+
func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT int64) ([]*bucketindex.Block, []*bucketindex.Block, error) {
426451
userID, err := tenant.TenantID(ctx)
427452
if err != nil {
428453
return nil, nil, err
429454
}
430455

456+
maxT = q.adjustMaxT(maxT)
457+
458+
if maxT < minT {
459+
return nil, nil, nil
460+
}
461+
431462
blocks, _, err := q.finder.GetBlocks(ctx, userID, minT, maxT)
432463
if err != nil {
433464
return nil, nil, err

pkg/querier/parquet_queryable_test.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package querier
33
import (
44
"context"
55
"testing"
6+
"time"
67

78
"github.com/go-kit/log"
89
"github.com/oklog/ulid"
@@ -19,6 +20,7 @@ import (
1920
"github.com/cortexproject/cortex/pkg/querier/series"
2021
"github.com/cortexproject/cortex/pkg/storage/parquet"
2122
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
23+
"github.com/cortexproject/cortex/pkg/util"
2224
"github.com/cortexproject/cortex/pkg/util/flagext"
2325
"github.com/cortexproject/cortex/pkg/util/validation"
2426
)
@@ -27,7 +29,7 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
2729
block1 := ulid.MustNew(1, nil)
2830
block2 := ulid.MustNew(2, nil)
2931
minT := int64(10)
30-
maxT := int64(20)
32+
maxT := util.TimeToMillis(time.Now())
3133

3234
createStore := func() *blocksStoreSetMock {
3335
return &blocksStoreSetMock{mockedResponses: []interface{}{
@@ -124,13 +126,14 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
124126
finder: finder,
125127
blocksStoreQuerier: q,
126128
parquetQuerier: mParquetQuerier,
129+
queryStoreAfter: time.Hour,
127130
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
128131
limits: defaultOverrides(t, 0),
129132
logger: log.NewNopLogger(),
130133
defaultBlockStoreType: parquetBlockStore,
131134
}
132135

133-
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
136+
finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{
134137
&bucketindex.Block{ID: block1},
135138
&bucketindex.Block{ID: block2},
136139
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
@@ -246,30 +249,39 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
246249
}
247250

248251
mParquetQuerier := &mockParquetQuerier{}
252+
queryStoreAfter := time.Hour
249253
pq := &parquetQuerierWithFallback{
250254
minT: minT,
251255
maxT: maxT,
252256
finder: finder,
253257
blocksStoreQuerier: q,
254258
parquetQuerier: mParquetQuerier,
259+
queryStoreAfter: queryStoreAfter,
255260
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
256261
limits: defaultOverrides(t, 0),
257262
logger: log.NewNopLogger(),
258263
defaultBlockStoreType: parquetBlockStore,
259264
}
260265

261-
finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
266+
finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{
262267
&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
263268
&bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
264269
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
265270

266271
t.Run("select", func(t *testing.T) {
267272
stores.Reset()
268273
mParquetQuerier.Reset()
269-
ss := pq.Select(ctx, true, nil, matchers...)
274+
hints := storage.SelectHints{
275+
Start: minT,
276+
End: maxT,
277+
}
278+
ss := pq.Select(ctx, true, &hints, matchers...)
270279
require.NoError(t, ss.Err())
271280
require.Len(t, stores.queriedBlocks, 0)
272281
require.Len(t, mParquetQuerier.queriedBlocks, 2)
282+
require.Equal(t, mParquetQuerier.queriedHints.Start, minT)
283+
queriedDelta := time.Duration(maxT-mParquetQuerier.queriedHints.End) * time.Millisecond
284+
require.InDeltaf(t, queriedDelta.Minutes(), queryStoreAfter.Minutes(), 0.1, "query after not set")
273285
})
274286

275287
t.Run("labelNames", func(t *testing.T) {
@@ -409,13 +421,14 @@ func defaultOverrides(t *testing.T, queryVerticalShardSize int) *validation.Over
409421

410422
type mockParquetQuerier struct {
411423
queriedBlocks []*bucketindex.Block
424+
queriedHints *storage.SelectHints
412425
}
413426

414427
func (m *mockParquetQuerier) Select(ctx context.Context, sortSeries bool, sp *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
415428
if blocks, ok := ExtractBlocksFromContext(ctx); ok {
416429
m.queriedBlocks = append(m.queriedBlocks, blocks...)
417430
}
418-
431+
m.queriedHints = sp
419432
return series.NewConcreteSeriesSet(sortSeries, nil)
420433
}
421434

0 commit comments

Comments
 (0)