From 0342eacf9d09972d1611fc034989c90ffc69014b Mon Sep 17 00:00:00 2001
From: alanprot
Date: Tue, 29 Oct 2024 09:35:40 -0700
Subject: [PATCH] Overriding chunksQuerier

---
 pkg/ingester/ingester.go                  |  30 +++--
 pkg/storage/tsdb/cached_chunks_querier.go | 154 ++++++++++++++++++++++
 2 files changed, 174 insertions(+), 10 deletions(-)
 create mode 100644 pkg/storage/tsdb/cached_chunks_querier.go

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 2a3187a282..b9749a0cc9 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -137,6 +137,8 @@ type Config struct {
 	AdminLimitMessage string `yaml:"admin_limit_message"`
 
 	LabelsStringInterningEnabled bool `yaml:"labels_string_interning_enabled"`
+
+	PostingsCache cortex_tsdb.PostingsCacheConfig `yaml:"postings_cache"`
 }
 
 // RegisterFlags adds the flags required to config this to the given FlagSet
@@ -163,6 +165,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.StringVar(&cfg.AdminLimitMessage, "ingester.admin-limit-message", "please contact administrator to raise it", "Customize the message contained in limit errors")
 
 	f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")
+
+	f.Int64Var(&cfg.PostingsCache.MaxBytes, "ingester.postings-cache.max-bytes", 10*1024*1024, "Max bytes for postings cache")
+	f.IntVar(&cfg.PostingsCache.MaxItems, "ingester.postings-cache.max-items", 10000, "Max items for postings cache")
+	f.DurationVar(&cfg.PostingsCache.Ttl, "ingester.postings-cache.ttl", 10*time.Minute, "TTL for postings cache")
+	f.BoolVar(&cfg.PostingsCache.Enabled, "ingester.postings-cache.enabled", false, "Whether the postings cache is enabled or not")
 }
 
 func (cfg *Config) Validate() error {
@@ -319,10 +327,6 @@ func (u *userTSDB) Appender(ctx context.Context) storage.Appender {
 	return u.db.Appender(ctx)
 }
 
-func (u *userTSDB) Querier(mint, maxt int64) (storage.Querier, error) {
-	return u.db.Querier(mint, maxt)
-}
-
 func (u *userTSDB) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
 	return u.db.ChunkQuerier(mint, maxt)
 }
@@ -1531,7 +1535,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
 		return nil, cleanup, err
 	}
 
-	q, err := db.Querier(mint, maxt)
+	q, err := db.ChunkQuerier(mint, maxt)
 	if err != nil {
 		return nil, cleanup, err
 	}
@@ -1621,7 +1625,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
 		return nil, cleanup, err
 	}
 
-	q, err := db.Querier(mint, maxt)
+	q, err := db.ChunkQuerier(mint, maxt)
 	if err != nil {
 		return nil, cleanup, err
 	}
@@ -1710,7 +1714,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
 		return nil, cleanup, err
 	}
 
-	q, err := db.Querier(mint, maxt)
+	q, err := db.ChunkQuerier(mint, maxt)
 	if err != nil {
 		return nil, cleanup, err
 	}
@@ -1721,8 +1725,8 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
 
 	// Run a query for each matchers set and collect all the results.
 	var (
-		sets      []storage.SeriesSet
-		mergedSet storage.SeriesSet
+		sets      []storage.ChunkSeriesSet
+		mergedSet storage.ChunkSeriesSet
 	)
 
 	hints := &storage.SelectHints{
@@ -1741,7 +1745,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
 			seriesSet := q.Select(ctx, true, hints, matchers...)
sets = append(sets, seriesSet) } - mergedSet = storage.NewMergeSeriesSet(sets, storage.ChainedSeriesMerge) + mergedSet = storage.NewMergeChunkSeriesSet(sets, storage.NewCompactingChunkSeriesMerger(storage.ChainedSeriesMerge)) } else { mergedSet = q.Select(ctx, false, hints, matchersSet[0]...) } @@ -2204,6 +2208,13 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { OutOfOrderCapMax: i.cfg.BlocksStorageConfig.TSDB.OutOfOrderCapMax, EnableOverlappingCompaction: false, // Always let compactors handle overlapped blocks, e.g. OOO blocks. EnableNativeHistograms: i.cfg.BlocksStorageConfig.TSDB.EnableNativeHistograms, + BlockChunkQuerierFunc: func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + if i.cfg.PostingsCache.Enabled { + return cortex_tsdb.NewCachedBlockChunkQuerier(i.cfg.PostingsCache, b, mint, maxt) + } + + return tsdb.NewBlockChunkQuerier(b, mint, maxt) + }, }, nil) if err != nil { return nil, errors.Wrapf(err, "failed to open TSDB: %s", udir) diff --git a/pkg/storage/tsdb/cached_chunks_querier.go b/pkg/storage/tsdb/cached_chunks_querier.go new file mode 100644 index 0000000000..78f45311f8 --- /dev/null +++ b/pkg/storage/tsdb/cached_chunks_querier.go @@ -0,0 +1,154 @@ +package tsdb + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/oklog/ulid" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + prom_tsdb "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/prometheus/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/tombstones" + "github.com/prometheus/prometheus/util/annotations" +) + +type PostingsCacheConfig struct { + MaxBytes int64 `yaml:"max_bytes"` + MaxItems int `yaml:"max_items"` + Ttl time.Duration `yaml:"ttl"` + Enabled bool `yaml:"enabled"` +} + +type blockBaseQuerier struct { + blockID ulid.ULID + index prom_tsdb.IndexReader + chunks prom_tsdb.ChunkReader + tombstones tombstones.Reader + + closed bool + + mint, maxt int64 +} + +func newBlockBaseQuerier(b prom_tsdb.BlockReader, mint, maxt int64) (*blockBaseQuerier, error) { + indexr, err := b.Index() + if err != nil { + return nil, fmt.Errorf("open index reader: %w", err) + } + chunkr, err := b.Chunks() + if err != nil { + indexr.Close() + return nil, fmt.Errorf("open chunk reader: %w", err) + } + tombsr, err := b.Tombstones() + if err != nil { + indexr.Close() + chunkr.Close() + return nil, fmt.Errorf("open tombstone reader: %w", err) + } + + if tombsr == nil { + tombsr = tombstones.NewMemTombstones() + } + return &blockBaseQuerier{ + blockID: b.Meta().ULID, + mint: mint, + maxt: maxt, + index: indexr, + chunks: chunkr, + tombstones: tombsr, + }, nil +} + +func (q *blockBaseQuerier) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.SortedLabelValues(ctx, name, matchers...) + return res, nil, err +} + +func (q *blockBaseQuerier) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) { + res, err := q.index.LabelNames(ctx, matchers...) 
+ return res, nil, err +} + +func (q *blockBaseQuerier) Close() error { + if q.closed { + return errors.New("block querier already closed") + } + + errs := tsdb_errors.NewMulti( + q.index.Close(), + q.chunks.Close(), + q.tombstones.Close(), + ) + q.closed = true + return errs.Err() +} + +type cachedBlockChunkQuerier struct { + *blockBaseQuerier +} + +func NewCachedBlockChunkQuerier(cfg PostingsCacheConfig, b prom_tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) { + q, err := newBlockBaseQuerier(b, mint, maxt) + if err != nil { + return nil, err + } + return &cachedBlockChunkQuerier{blockBaseQuerier: q}, nil +} + +func (q *cachedBlockChunkQuerier) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms ...*labels.Matcher) storage.ChunkSeriesSet { + return selectChunkSeriesSet(ctx, sortSeries, hints, ms, q.blockID, q.index, q.chunks, q.tombstones, q.mint, q.maxt) +} + +func selectChunkSeriesSet(ctx context.Context, sortSeries bool, hints *storage.SelectHints, ms []*labels.Matcher, + blockID ulid.ULID, index prom_tsdb.IndexReader, chunks prom_tsdb.ChunkReader, tombstones tombstones.Reader, mint, maxt int64, +) storage.ChunkSeriesSet { + disableTrimming := false + sharded := hints != nil && hints.ShardCount > 0 + + if hints != nil { + mint = hints.Start + maxt = hints.End + disableTrimming = hints.DisableTrimming + } + p, err := prom_tsdb.PostingsForMatchers(ctx, index, ms...) + if err != nil { + return storage.ErrChunkSeriesSet(err) + } + if sharded { + p = index.ShardedPostings(p, hints.ShardIndex, hints.ShardCount) + } + if sortSeries { + p = index.SortedPostings(p) + } + + if hints != nil { + if hints.Func == "series" { + // When you're only looking up metadata (for example series API), you don't need to load any chunks. + return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, NewNopChunkReader(), tombstones, p, mint, maxt, disableTrimming) + } + } + + return prom_tsdb.NewBlockChunkSeriesSet(blockID, index, chunks, tombstones, p, mint, maxt, disableTrimming) +} + +type nopChunkReader struct { + emptyChunk chunkenc.Chunk +} + +func NewNopChunkReader() prom_tsdb.ChunkReader { + return nopChunkReader{ + emptyChunk: chunkenc.NewXORChunk(), + } +} + +func (cr nopChunkReader) ChunkOrIterable(chunks.Meta) (chunkenc.Chunk, chunkenc.Iterable, error) { + return cr.emptyChunk, nil, nil +} + +func (cr nopChunkReader) Close() error { return nil }
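
Note: in this commit NewCachedBlockChunkQuerier only threads PostingsCacheConfig through to the per-block querier; the postings cache itself is not consulted yet. The sketch below is a minimal, self-contained illustration of how the BlockChunkQuerierFunc hook set in createTSDB swaps the cached querier in for the default one. It assumes the cortex_tsdb import path github.com/cortexproject/cortex/pkg/storage/tsdb from this repository; the directory, config values, and query time range are illustrative only, and the real ingester populates the config from the new ingester.postings-cache.* flags.

package main

import (
	"log"
	"time"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// Illustrative values; the patch registers these via the new
	// ingester.postings-cache.* flags instead of hard-coding them.
	cacheCfg := cortex_tsdb.PostingsCacheConfig{
		Enabled:  true,
		MaxBytes: 10 * 1024 * 1024,
		MaxItems: 10000,
		Ttl:      10 * time.Minute,
	}

	opts := tsdb.DefaultOptions()
	// Same hook the patch sets in createTSDB: every per-block chunk querier
	// is built through this function, so the cached implementation replaces
	// tsdb.NewBlockChunkQuerier whenever the postings cache is enabled.
	opts.BlockChunkQuerierFunc = func(b tsdb.BlockReader, mint, maxt int64) (storage.ChunkQuerier, error) {
		if cacheCfg.Enabled {
			return cortex_tsdb.NewCachedBlockChunkQuerier(cacheCfg, b, mint, maxt)
		}
		return tsdb.NewBlockChunkQuerier(b, mint, maxt)
	}

	db, err := tsdb.Open("/tmp/tsdb-demo", nil, nil, opts, nil) // path is illustrative
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Chunk queriers handed out by the DB now go through the override above,
	// which is why the ingester read paths were switched to db.ChunkQuerier.
	cq, err := db.ChunkQuerier(0, time.Now().UnixMilli())
	if err != nil {
		log.Fatal(err)
	}
	defer cq.Close()
}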