Skip to content

Commit 251409b

Browse files
committed
Overriding chunksQuerier
1 parent 8a95aed commit 251409b

File tree

2 files changed

+157
-10
lines changed

2 files changed

+157
-10
lines changed

pkg/ingester/ingester.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -319,10 +319,6 @@ func (u *userTSDB) Appender(ctx context.Context) storage.Appender {
319319
return u.db.Appender(ctx)
320320
}
321321

322-
func (u *userTSDB) Querier(mint, maxt int64) (storage.Querier, error) {
323-
return u.db.Querier(mint, maxt)
324-
}
325-
326322
// ChunkQuerier opens a chunk querier over the user's TSDB for the
// [mint, maxt] time range, delegating directly to the underlying db.
func (u *userTSDB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
	return u.db.ChunkQuerier(mint, maxt)
}
@@ -1531,7 +1527,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
15311527
return nil, cleanup, err
15321528
}
15331529

1534-
q, err := db.Querier(mint, maxt)
1530+
q, err := db.ChunkQuerier(mint, maxt)
15351531
if err != nil {
15361532
return nil, cleanup, err
15371533
}
@@ -1621,7 +1617,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
16211617
return nil, cleanup, err
16221618
}
16231619

1624-
q, err := db.Querier(mint, maxt)
1620+
q, err := db.ChunkQuerier(mint, maxt)
16251621
if err != nil {
16261622
return nil, cleanup, err
16271623
}
@@ -1710,7 +1706,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
17101706
return nil, cleanup, err
17111707
}
17121708

1713-
q, err := db.Querier(mint, maxt)
1709+
q, err := db.ChunkQuerier(mint, maxt)
17141710
if err != nil {
17151711
return nil, cleanup, err
17161712
}
@@ -1721,8 +1717,8 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
17211717

17221718
// Run a query for each matchers set and collect all the results.
17231719
var (
1724-
sets []storage.SeriesSet
1725-
mergedSet storage.SeriesSet
1720+
sets []storage.ChunkSeriesSet
1721+
mergedSet storage.ChunkSeriesSet
17261722
)
17271723

17281724
hints := &storage.SelectHints{
@@ -1741,7 +1737,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
17411737
seriesSet := q.Select(ctx, true, hints, matchers...)
17421738
sets = append(sets, seriesSet)
17431739
}
1744-
mergedSet = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
1740+
mergedSet = storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge))
17451741
} else {
17461742
mergedSet = q.Select(ctx, false, hints, matchersSet[0]...)
17471743
}
@@ -2204,6 +2200,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
22042200
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
22052201
EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
22062202
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
2203+
BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
2204+
return cortex_tsdb.NewBlockChunkQuerier(b, mint, maxt)
2205+
},
22072206
}, nil)
22082207
if err != nil {
22092208
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)

pkg/storage/tsdb/querier.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package tsdb
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/oklog/ulid"
8+
"github.com/prometheus/prometheus/tsdb/chunkenc"
9+
"github.com/prometheus/prometheus/tsdb/chunks"
10+
11+
"github.com/prometheus/prometheus/model/labels"
12+
"github.com/prometheus/prometheus/storage"
13+
prom_tsdb "github.com/prometheus/prometheus/tsdb"
14+
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
15+
"github.com/prometheus/prometheus/tsdb/tombstones"
16+
"github.com/prometheus/prometheus/util/annotations"
17+
)
18+
19+
type blockBaseQuerier struct {
20+
blockID ulid.ULID
21+
index prom_tsdb.IndexReader
22+
chunks prom_tsdb.ChunkReader
23+
tombstones tombstones.Reader
24+
25+
closed bool
26+
27+
mint, maxt int64
28+
}
29+
30+
func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
31+
indexr, err := b.Index()
32+
if err != nil {
33+
return nil, fmt.Errorf("open index reader: %w", err)
34+
}
35+
chunkr, err := b.Chunks()
36+
if err != nil {
37+
indexr.Close()
38+
return nil, fmt.Errorf("open chunk reader: %w", err)
39+
}
40+
tombsr, err := b.Tombstones()
41+
if err != nil {
42+
indexr.Close()
43+
chunkr.Close()
44+
return nil, fmt.Errorf("open tombstone reader: %w", err)
45+
}
46+
47+
if tombsr == nil {
48+
tombsr = tombstones.NewMemTombstones()
49+
}
50+
return &blockBaseQuerier{
51+
blockID: b.Meta().ULID,
52+
mint: mint,
53+
maxt: maxt,
54+
index: indexr,
55+
chunks: chunkr,
56+
tombstones: tombsr,
57+
}, nil
58+
}
59+
60+
func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
61+
res, err := q.index.SortedLabelValues(ctx, name, matchers...)
62+
return res, nil, err
63+
}
64+
65+
func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
66+
res, err := q.index.LabelNames(ctx, matchers...)
67+
return res, nil, err
68+
}
69+
70+
func (q *blockBaseQuerier) Close() error {
71+
if q.closed {
72+
return errors.New("block querier already closed")
73+
}
74+
75+
errs := tsdb_errors.NewMulti(
76+
q.index.Close(),
77+
q.chunks.Close(),
78+
q.tombstones.Close(),
79+
)
80+
q.closed = true
81+
return errs.Err()
82+
}
83+
84+
// blockChunkQuerier provides chunk querying access to a single block database.
85+
type blockChunkQuerier struct {
86+
*blockBaseQuerier
87+
}
88+
89+
// NewBlockChunkQuerier returns a chunk querier against the block reader and requested min and max time range.
90+
func NewBlockChunkQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
91+
q, err := newBlockBaseQuerier(b, mint, maxt)
92+
if err != nil {
93+
return nil, err
94+
}
95+
return &blockChunkQuerier{blockBaseQuerier: q}, nil
96+
}
97+
98+
func (q *blockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
99+
return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
100+
}
101+
102+
func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
103+
blockID ulid.ULID, index prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
104+
) storage.ChunkSeriesSet {
105+
disableTrimming := false
106+
sharded := hints != nil && hints.ShardCount > 0
107+
108+
if hints != nil {
109+
mint = hints.Start
110+
maxt = hints.End
111+
disableTrimming = hints.DisableTrimming
112+
}
113+
p, err := prom_tsdb.PostingsForMatchers(ctx, index, ms...)
114+
if err != nil {
115+
return storage.ErrChunkSeriesSet(err)
116+
}
117+
if sharded {
118+
p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
119+
}
120+
if sortSeries {
121+
p = index.SortedPostings(p)
122+
}
123+
124+
if hints != nil {
125+
if hints.Func == "series" {
126+
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
127+
return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, NewNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
128+
}
129+
}
130+
131+
return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming)
132+
}
133+
134+
type nopChunkReader struct {
135+
emptyChunk chunkenc.Chunk
136+
}
137+
138+
func NewNopChunkReader() prom_tsdb.ChunkReader {
139+
return nopChunkReader{
140+
emptyChunk: chunkenc.NewXORChunk(),
141+
}
142+
}
143+
144+
func (cr nopChunkReader) ChunkOrIterable(chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
145+
return cr.emptyChunk, nil, nil
146+
}
147+
148+
// Close is a no-op: the reader holds no resources.
func (cr nopChunkReader) Close() error { return nil }

0 commit comments

Comments
 (0)