Skip to content

Commit 0342eac

Browse files
committed
Overriding chunksQuerier
1 parent 8a95aed commit 0342eac

File tree

2 files changed

+175
-10
lines changed

2 files changed

+175
-10
lines changed

pkg/ingester/ingester.go

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ type Config struct {
137137
AdminLimitMessage string `yaml:"admin_limit_message"`
138138

139139
LabelsStringInterningEnabled bool `yaml:"labels_string_interning_enabled"`
140+
141+
PostingsCache cortex_tsdb.PostingsCacheConfig `yaml:"postings_cache"`
140142
}
141143

142144
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -163,6 +165,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
163165
f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors")
164166

165167
f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")
168+
169+
f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")
170+
f.Int64Var(&cfg.PostingsCache.MaxBytes, "ingester.postings-cache.max-bytes", 10*1024*1024, "Max bytes for postings cache")
171+
f.IntVar(&cfg.PostingsCache.MaxItems, "ingester.postings-cache.max-items", 10000, "Max items for postings cache")
172+
f.DurationVar(&cfg.PostingsCache.Ttl, "ingester.postings-cache.ttl", 10*time.Minute, "TTL for postings cache")
173+
f.BoolVar(&cfg.PostingsCache.Enabled, "ingester.postings-cache.enabled", false, "Whether the postings cache is enabled or not")
166174
}
167175

168176
func (cfg *Config) Validate() error {
@@ -319,10 +327,6 @@ func (u *userTSDB) Appender(ctx context.Context) storage.Appender {
319327
return u.db.Appender(ctx)
320328
}
321329

322-
func (u *userTSDB) Querier(mint, maxt int64) (storage.Querier, error) {
323-
return u.db.Querier(mint, maxt)
324-
}
325-
326330
func (u *userTSDB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
327331
return u.db.ChunkQuerier(mint, maxt)
328332
}
@@ -1531,7 +1535,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
15311535
return nil, cleanup, err
15321536
}
15331537

1534-
q, err := db.Querier(mint, maxt)
1538+
q, err := db.ChunkQuerier(mint, maxt)
15351539
if err != nil {
15361540
return nil, cleanup, err
15371541
}
@@ -1621,7 +1625,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
16211625
return nil, cleanup, err
16221626
}
16231627

1624-
q, err := db.Querier(mint, maxt)
1628+
q, err := db.ChunkQuerier(mint, maxt)
16251629
if err != nil {
16261630
return nil, cleanup, err
16271631
}
@@ -1710,7 +1714,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
17101714
return nil, cleanup, err
17111715
}
17121716

1713-
q, err := db.Querier(mint, maxt)
1717+
q, err := db.ChunkQuerier(mint, maxt)
17141718
if err != nil {
17151719
return nil, cleanup, err
17161720
}
@@ -1721,8 +1725,8 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
17211725

17221726
// Run a query for each matchers set and collect all the results.
17231727
var (
1724-
sets []storage.SeriesSet
1725-
mergedSet storage.SeriesSet
1728+
sets []storage.ChunkSeriesSet
1729+
mergedSet storage.ChunkSeriesSet
17261730
)
17271731

17281732
hints := &storage.SelectHints{
@@ -1741,7 +1745,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
17411745
seriesSet := q.Select(ctx, true, hints, matchers...)
17421746
sets = append(sets, seriesSet)
17431747
}
1744-
mergedSet = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
1748+
mergedSet = storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge))
17451749
} else {
17461750
mergedSet = q.Select(ctx, false, hints, matchersSet[0]...)
17471751
}
@@ -2204,6 +2208,13 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
22042208
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
22052209
EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
22062210
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
2211+
BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
2212+
if i.cfg.PostingsCache.Enabled {
2213+
return cortex_tsdb.NewCachedBlockChunkQuerier(i.cfg.PostingsCache, b, mint, maxt)
2214+
}
2215+
2216+
return tsdb.NewBlockChunkQuerier(b, mint, maxt)
2217+
},
22072218
}, nil)
22082219
if err != nil {
22092220
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package tsdb
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"time"
8+
9+
"github.com/oklog/ulid"
10+
"github.com/prometheus/prometheus/model/labels"
11+
"github.com/prometheus/prometheus/storage"
12+
prom_tsdb "github.com/prometheus/prometheus/tsdb"
13+
"github.com/prometheus/prometheus/tsdb/chunkenc"
14+
"github.com/prometheus/prometheus/tsdb/chunks"
15+
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
16+
"github.com/prometheus/prometheus/tsdb/tombstones"
17+
"github.com/prometheus/prometheus/util/annotations"
18+
)
19+
20+
// PostingsCacheConfig configures the TSDB postings cache consumed by
// NewCachedBlockChunkQuerier.
type PostingsCacheConfig struct {
	// MaxBytes caps the total size of the cache, in bytes.
	MaxBytes int64 `yaml:"max_bytes"`
	// MaxItems caps the number of entries held by the cache.
	MaxItems int `yaml:"max_items"`
	// Ttl is how long a cached entry remains usable.
	// NOTE(review): idiomatic Go would spell this TTL, but renaming would
	// break external references such as cfg.PostingsCache.Ttl in flag setup.
	Ttl time.Duration `yaml:"ttl"`
	// Enabled toggles the postings cache on or off.
	Enabled bool `yaml:"enabled"`
}
26+
27+
type blockBaseQuerier struct {
28+
blockID ulid.ULID
29+
index prom_tsdb.IndexReader
30+
chunks prom_tsdb.ChunkReader
31+
tombstones tombstones.Reader
32+
33+
closed bool
34+
35+
mint, maxt int64
36+
}
37+
38+
func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
39+
indexr, err := b.Index()
40+
if err != nil {
41+
return nil, fmt.Errorf("open index reader: %w", err)
42+
}
43+
chunkr, err := b.Chunks()
44+
if err != nil {
45+
indexr.Close()
46+
return nil, fmt.Errorf("open chunk reader: %w", err)
47+
}
48+
tombsr, err := b.Tombstones()
49+
if err != nil {
50+
indexr.Close()
51+
chunkr.Close()
52+
return nil, fmt.Errorf("open tombstone reader: %w", err)
53+
}
54+
55+
if tombsr == nil {
56+
tombsr = tombstones.NewMemTombstones()
57+
}
58+
return &blockBaseQuerier{
59+
blockID: b.Meta().ULID,
60+
mint: mint,
61+
maxt: maxt,
62+
index: indexr,
63+
chunks: chunkr,
64+
tombstones: tombsr,
65+
}, nil
66+
}
67+
68+
func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
69+
res, err := q.index.SortedLabelValues(ctx, name, matchers...)
70+
return res, nil, err
71+
}
72+
73+
func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
74+
res, err := q.index.LabelNames(ctx, matchers...)
75+
return res, nil, err
76+
}
77+
78+
func (q *blockBaseQuerier) Close() error {
79+
if q.closed {
80+
return errors.New("block querier already closed")
81+
}
82+
83+
errs := tsdb_errors.NewMulti(
84+
q.index.Close(),
85+
q.chunks.Close(),
86+
q.tombstones.Close(),
87+
)
88+
q.closed = true
89+
return errs.Err()
90+
}
91+
92+
type cachedBlockChunkQuerier struct {
93+
*blockBaseQuerier
94+
}
95+
96+
func NewCachedBlockChunkQuerier(cfg PostingsCacheConfig, b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
97+
q, err := newBlockBaseQuerier(b, mint, maxt)
98+
if err != nil {
99+
return nil, err
100+
}
101+
return &cachedBlockChunkQuerier{blockBaseQuerier: q}, nil
102+
}
103+
104+
func (q *cachedBlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
105+
return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
106+
}
107+
108+
func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
109+
blockID ulid.ULID, index prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
110+
) storage.ChunkSeriesSet {
111+
disableTrimming := false
112+
sharded := hints != nil && hints.ShardCount > 0
113+
114+
if hints != nil {
115+
mint = hints.Start
116+
maxt = hints.End
117+
disableTrimming = hints.DisableTrimming
118+
}
119+
p, err := prom_tsdb.PostingsForMatchers(ctx, index, ms...)
120+
if err != nil {
121+
return storage.ErrChunkSeriesSet(err)
122+
}
123+
if sharded {
124+
p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
125+
}
126+
if sortSeries {
127+
p = index.SortedPostings(p)
128+
}
129+
130+
if hints != nil {
131+
if hints.Func == "series" {
132+
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
133+
return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, NewNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
134+
}
135+
}
136+
137+
return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming)
138+
}
139+
140+
type nopChunkReader struct {
141+
emptyChunk chunkenc.Chunk
142+
}
143+
144+
func NewNopChunkReader() prom_tsdb.ChunkReader {
145+
return nopChunkReader{
146+
emptyChunk: chunkenc.NewXORChunk(),
147+
}
148+
}
149+
150+
func (cr nopChunkReader) ChunkOrIterable(chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
151+
return cr.emptyChunk, nil, nil
152+
}
153+
154+
func (cr nopChunkReader) Close() error { return nil }

0 commit comments

Comments
 (0)