[WIP] Overriding chunks querier. #6293

Closed
31 changes: 21 additions & 10 deletions pkg/ingester/ingester.go
@@ -137,6 +137,8 @@ type Config struct {
AdminLimitMessage string `yaml:"admin_limit_message"`

LabelsStringInterningEnabled bool `yaml:"labels_string_interning_enabled"`

PostingsCache cortex_tsdb.PostingsCacheConfig `yaml:"postings_cache"`
}
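
With the yaml tag above, the new block surfaces in the ingester config file as postings_cache; its sub-keys (max_bytes, max_items, ttl, enabled) come from the PostingsCacheConfig struct added in pkg/storage/tsdb/cached_chunks_querier.go further down.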

// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -163,6 +165,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors")

f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")

f.Int64Var(&cfg.PostingsCache.MaxBytes, "ingester.postings-cache.max-bytes", 10*1024*1024, "Max bytes for postings cache")
f.IntVar(&cfg.PostingsCache.MaxItems, "ingester.postings-cache.max-items", 10000, "Max items for postings cache")
f.DurationVar(&cfg.PostingsCache.Ttl, "ingester.postings-cache.ttl", 10*time.Minute, "TTL for postings cache")
f.BoolVar(&cfg.PostingsCache.Enabled, "ingester.postings-cache.enabled", false, "Whether the postings cache is enabled or not")
}
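
Note that this WIP registers the new flags but leaves Validate below untouched. A minimal sketch of the kind of consistency check that could back them, living in the pkg/storage/tsdb package where PostingsCacheConfig is defined (the helper is hypothetical, not part of this diff):

```go
package tsdb

import "errors"

// validatePostingsCacheConfig is a hypothetical helper, not part of this
// diff: it rejects self-contradictory settings when the cache is enabled.
func validatePostingsCacheConfig(cfg PostingsCacheConfig) error {
	if !cfg.Enabled {
		return nil
	}
	if cfg.MaxBytes <= 0 || cfg.MaxItems <= 0 {
		return errors.New("postings cache: max-bytes and max-items must be positive")
	}
	if cfg.Ttl <= 0 {
		return errors.New("postings cache: ttl must be positive")
	}
	return nil
}
```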

func (cfg *Config) Validate() error {
@@ -319,10 +327,6 @@ func (u *userTSDB) Appender(ctx context.Context) storage.Appender {
return u.db.Appender(ctx)
}

func (u *userTSDB) Querier(mint, maxt int64) (storage.Querier, error) {
return u.db.Querier(mint, maxt)
}

func (u *userTSDB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
return u.db.ChunkQuerier(mint, maxt)
}
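
With the sample-based Querier method removed, userTSDB only exposes chunk queriers, and the read paths below are rewritten accordingly. For callers that still need samples, Prometheus provides storage.NewSeriesSetFromChunkSeriesSet; a small illustrative sketch of the adaptation (the helper itself is not code from this PR):

```go
package example

import (
	"context"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
)

// labelsFromChunkQuerier selects through a storage.ChunkQuerier and converts
// the chunk-based result back into a sample-based SeriesSet to collect the
// series labels.
func labelsFromChunkQuerier(ctx context.Context, q storage.ChunkQuerier, ms ...*labels.Matcher) ([]labels.Labels, error) {
	chunkSet := q.Select(ctx, false, nil, ms...)
	set := storage.NewSeriesSetFromChunkSeriesSet(chunkSet)
	var out []labels.Labels
	for set.Next() {
		out = append(out, set.At().Labels())
	}
	return out, set.Err()
}
```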
@@ -1531,7 +1535,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
return nil, cleanup, err
}

q, err := db.Querier(mint, maxt)
q, err := db.ChunkQuerier(mint, maxt)
if err != nil {
return nil, cleanup, err
}
@@ -1621,7 +1625,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
return nil, cleanup, err
}

q, err := db.Querier(mint, maxt)
q, err := db.ChunkQuerier(mint, maxt)
if err != nil {
return nil, cleanup, err
}
@@ -1710,7 +1714,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
return nil, cleanup, err
}

q, err := db.Querier(mint, maxt)
q, err := db.ChunkQuerier(mint, maxt)
if err != nil {
return nil, cleanup, err
}
@@ -1721,8 +1725,8 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien

// Run a query for each matchers set and collect all the results.
var (
sets []storage.SeriesSet
mergedSet storage.SeriesSet
sets []storage.ChunkSeriesSet
mergedSet storage.ChunkSeriesSet
)

hints := &storage.SelectHints{
@@ -1741,7 +1745,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
seriesSet := q.Select(ctx, true, hints, matchers...)
sets = append(sets, seriesSet)
}
mergedSet = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge)
mergedSet = storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge))
} else {
mergedSet = q.Select(ctx, false, hints, matchersSet[0]...)
}
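
Each per-matcher-set Select in the loop above passes sortSeries=true, so the inputs to storage.NewMergeChunkSeriesSet arrive sorted by labels, as the merge requires; storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge) is the chunk-level counterpart of the previous storage.ChainedSeriesMerge, compacting chunks of identical series as they are merged.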
@@ -2204,6 +2208,13 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax,
EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks.
EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms,
BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
if i.cfg.PostingsCache.Enabled {
return cortex_tsdb.NewCachedBlockChunkQuerier(i.cfg.PostingsCache, b, mint, maxt)
}

return tsdb.NewBlockChunkQuerier(b, mint, maxt)
},
}, nil)
if err != nil {
return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir)
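
BlockChunkQuerierFunc is the upstream TSDB option that lets the embedding application override how the per-block chunk querier is constructed. Here it switches to the new cached querier when -ingester.postings-cache.enabled is set, and falls back to the stock tsdb.NewBlockChunkQuerier otherwise.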
154 changes: 154 additions & 0 deletions pkg/storage/tsdb/cached_chunks_querier.go
@@ -0,0 +1,154 @@
package tsdb

import (
"context"
"errors"
"fmt"
"time"

"github.com/oklog/ulid"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
prom_tsdb "github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/tombstones"
"github.com/prometheus/prometheus/util/annotations"
)

// PostingsCacheConfig holds the settings behind the new
// ingester.postings-cache.* flags.
type PostingsCacheConfig struct {
MaxBytes int64 `yaml:"max_bytes"`
MaxItems int `yaml:"max_items"`
Ttl time.Duration `yaml:"ttl"`
Enabled bool `yaml:"enabled"`
}
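
The config above is plumbed into NewCachedBlockChunkQuerier, but this WIP does not yet contain the cache itself. Below is a minimal sketch of a TTL- and size-bounded postings cache that these settings could drive; every name in it is hypothetical:

```go
package tsdb

import (
	"sync"
	"time"

	"github.com/prometheus/prometheus/storage"
)

// postingsCacheEntry and postingsCache are hypothetical, not part of this PR.
type postingsCacheEntry struct {
	refs      []storage.SeriesRef
	expiresAt time.Time
	size      int64
}

type postingsCache struct {
	mtx sync.Mutex
	cfg PostingsCacheConfig

	items map[string]postingsCacheEntry
	bytes int64
}

func newPostingsCache(cfg PostingsCacheConfig) *postingsCache {
	return &postingsCache{cfg: cfg, items: map[string]postingsCacheEntry{}}
}

// get returns the cached series refs for key, dropping expired entries.
func (c *postingsCache) get(key string) ([]storage.SeriesRef, bool) {
	c.mtx.Lock()
	defer c.mtx.Unlock()
	e, ok := c.items[key]
	if !ok {
		return nil, false
	}
	if time.Now().After(e.expiresAt) {
		delete(c.items, key)
		c.bytes -= e.size
		return nil, false
	}
	return e.refs, true
}

// put stores refs under key, refusing (rather than evicting) once either the
// item or byte budget from the config would be exceeded.
func (c *postingsCache) put(key string, refs []storage.SeriesRef) {
	size := int64(len(refs)) * 8 // storage.SeriesRef is a uint64
	c.mtx.Lock()
	defer c.mtx.Unlock()
	if len(c.items) >= c.cfg.MaxItems || c.bytes+size > c.cfg.MaxBytes {
		return
	}
	c.items[key] = postingsCacheEntry{refs: refs, expiresAt: time.Now().Add(c.cfg.Ttl), size: size}
	c.bytes += size
}
```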

// blockBaseQuerier is a copy of Prometheus' unexported blockBaseQuerier,
// duplicated here because the upstream type cannot be reused from this package.
type blockBaseQuerier struct {
blockID ulid.ULID
index prom_tsdb.IndexReader
chunks prom_tsdb.ChunkReader
tombstones tombstones.Reader

closed bool

mint, maxt int64
}

func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) {
indexr, err := b.Index()
if err != nil {
return nil, fmt.Errorf("open index reader: %w", err)
}
chunkr, err := b.Chunks()
if err != nil {
indexr.Close()
return nil, fmt.Errorf("open chunk reader: %w", err)
}
tombsr, err := b.Tombstones()
if err != nil {
indexr.Close()
chunkr.Close()
return nil, fmt.Errorf("open tombstone reader: %w", err)
}

if tombsr == nil {
tombsr = tombstones.NewMemTombstones()
}
return &blockBaseQuerier{
blockID: b.Meta().ULID,
mint: mint,
maxt: maxt,
index: indexr,
chunks: chunkr,
tombstones: tombsr,
}, nil
}

func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
res, err := q.index.SortedLabelValues(ctx, name, matchers...)
return res, nil, err
}

func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
res, err := q.index.LabelNames(ctx, matchers...)
return res, nil, err
}

func (q *blockBaseQuerier) Close() error {
if q.closed {
return errors.New("block querier already closed")
}

errs := tsdb_errors.NewMulti(
q.index.Close(),
q.chunks.Close(),
q.tombstones.Close(),
)
q.closed = true
return errs.Err()
}

// cachedBlockChunkQuerier mirrors Prometheus' block chunk querier on top of
// the local blockBaseQuerier so that the Select path can be customized.
type cachedBlockChunkQuerier struct {
*blockBaseQuerier
}

// NewCachedBlockChunkQuerier returns a storage.ChunkQuerier over the given
// block. The cache config is accepted here but not yet consulted in this WIP.
func NewCachedBlockChunkQuerier(cfg PostingsCacheConfig, b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
q, err := newBlockBaseQuerier(b, mint, maxt)
if err != nil {
return nil, err
}
return &cachedBlockChunkQuerier{blockBaseQuerier: q}, nil
}

func (q *cachedBlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet {
return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt)
}

func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher,
blockID ulid.ULID, index prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64,
) storage.ChunkSeriesSet {
disableTrimming := false
sharded := hints != nil && hints.ShardCount > 0

if hints != nil {
mint = hints.Start
maxt = hints.End
disableTrimming = hints.DisableTrimming
}
p, err := prom_tsdb.PostingsForMatchers(ctx, index, ms...)
if err != nil {
return storage.ErrChunkSeriesSet(err)
}
if sharded {
p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount)
}
if sortSeries {
p = index.SortedPostings(p)
}

if hints != nil && hints.Func == "series" {
// When you're only looking up metadata (for example series API), you don't need to load any chunks.
return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, NewNopChunkReader(), tombstones, p, mint, maxt, disableTrimming)
}

return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming)
}
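
Despite the file name, Select here still resolves matchers through prom_tsdb.PostingsForMatchers unconditionally, so nothing is cached yet. A hedged sketch of where a lookup could slot in, reusing the hypothetical postingsCache sketched above (the key scheme and helper names are assumptions, not code from this change):

```go
package tsdb

import (
	"context"
	"strings"

	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/model/labels"
	prom_tsdb "github.com/prometheus/prometheus/tsdb"
	"github.com/prometheus/prometheus/tsdb/index"
)

// postingsWithCache is hypothetical: it resolves matchers to postings via the
// cache, falling back to PostingsForMatchers and storing the expanded refs.
func postingsWithCache(ctx context.Context, c *postingsCache, blockID ulid.ULID, ir prom_tsdb.IndexReader, ms ...*labels.Matcher) (index.Postings, error) {
	key := blockID.String() + "|" + matchersKey(ms)
	if refs, ok := c.get(key); ok {
		return index.NewListPostings(refs), nil
	}
	p, err := prom_tsdb.PostingsForMatchers(ctx, ir, ms...)
	if err != nil {
		return nil, err
	}
	refs, err := index.ExpandPostings(p)
	if err != nil {
		return nil, err
	}
	c.put(key, refs)
	return index.NewListPostings(refs), nil
}

// matchersKey is a hypothetical, order-sensitive key for a matcher set.
func matchersKey(ms []*labels.Matcher) string {
	parts := make([]string, 0, len(ms))
	for _, m := range ms {
		parts = append(parts, m.String())
	}
	return strings.Join(parts, ",")
}
```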

type nopChunkReader struct {
emptyChunk chunkenc.Chunk
}

// NewNopChunkReader returns a ChunkReader that hands back the same empty XOR
// chunk for every request, so series can be enumerated without reading chunk
// data.
func NewNopChunkReader() prom_tsdb.ChunkReader {
return nopChunkReader{
emptyChunk: chunkenc.NewXORChunk(),
}
}

func (cr nopChunkReader) ChunkOrIterable(chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) {
return cr.emptyChunk, nil, nil
}

func (cr nopChunkReader) Close() error { return nil }