129 changes: 116 additions & 13 deletions pkg/querier/parquet_queryable.go
@@ -2,10 +2,12 @@ package querier

import (
"context"
"fmt"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/parquet-go/parquet-go"
"github.com/pkg/errors"
"github.com/prometheus-community/parquet-common/schema"
@@ -87,6 +89,11 @@ func NewParquetQueryable(
return nil, err
}

cache, err := newCache[*parquet_storage.ParquetShard]("parquet-shards", config.ParquetQueryableShardCacheSize, newCacheMetrics(reg))
if err != nil {
return nil, err
}

cDecoder := schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool())

parquetQueryable, err := search.NewParquetQueryable(cDecoder, func(ctx context.Context, mint, maxt int64) ([]*parquet_storage.ParquetShard, error) {
@@ -106,20 +113,30 @@ func NewParquetQueryable(

for i, block := range blocks {
errGroup.Go(func() error {
// we always only have 1 shard - shard 0
shard, err := parquet_storage.OpenParquetShard(ctx,
userBkt,
block.ID.String(),
0,
parquet_storage.WithFileOptions(
parquet.SkipMagicBytes(true),
parquet.ReadBufferSize(100*1024),
parquet.SkipBloomFilters(true),
),
parquet_storage.WithOptimisticReader(true),
)
cacheKey := fmt.Sprintf("%v-%v", userID, block.ID)
shard := cache.Get(cacheKey)
if shard == nil {
// we always only have 1 shard - shard 0
// Use context.Background() here as the file can be cached and live after the request ends.
shard, err = parquet_storage.OpenParquetShard(context.Background(),
userBkt,
block.ID.String(),
0,
parquet_storage.WithFileOptions(
parquet.SkipMagicBytes(true),
parquet.ReadBufferSize(100*1024),
parquet.SkipBloomFilters(true),
),
parquet_storage.WithOptimisticReader(true),
)
if err != nil {
return err
}
cache.Set(cacheKey, shard)
}

shards[i] = shard
return err
return nil
})
}

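One caveat with this pattern: the Get/Set pair is not deduplicated across goroutines, so concurrent queries that miss on the same block can each open the shard before one result wins the Set. If that ever needs tightening, the opens could be collapsed with golang.org/x/sync/singleflight. A minimal sketch under that assumption follows; shardT, openShard, and getOrOpen are hypothetical stand-ins, and the plain map stands in for the goroutine-safe hashicorp LRU used here:

package querier

import (
	"golang.org/x/sync/singleflight"
)

// shardT stands in for *parquet_storage.ParquetShard (hypothetical).
type shardT struct{ key string }

// openShard stands in for parquet_storage.OpenParquetShard (hypothetical).
func openShard(key string) (*shardT, error) {
	return &shardT{key: key}, nil
}

var sfGroup singleflight.Group

// getOrOpen collapses concurrent opens of the same key into a single call.
// The plain map is for brevity only; the PR's hashicorp LRU is goroutine-safe.
func getOrOpen(cache map[string]*shardT, key string) (*shardT, error) {
	if s, ok := cache[key]; ok {
		return s, nil // hit: same role as cache.Get above
	}
	// Do runs the open once per in-flight key; concurrent callers share the result.
	v, err, _ := sfGroup.Do(key, func() (interface{}, error) {
		s, err := openShard(key)
		if err == nil {
			cache[key] = s // same role as cache.Set above
		}
		return s, err
	})
	if err != nil {
		return nil, err
	}
	return v.(*shardT), nil
}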
Expand Down Expand Up @@ -401,3 +418,89 @@ func (q *parquetQuerierWithFallback) getBlocks(ctx context.Context, minT, maxT i

return remaining, parquetBlocks, nil
}

type cacheInterface[T any] interface {
Get(path string) T
Set(path string, reader T)
}

type cacheMetrics struct {
hits *prometheus.CounterVec
misses *prometheus.CounterVec
evictions *prometheus.CounterVec
size *prometheus.GaugeVec
}

func newCacheMetrics(reg prometheus.Registerer) *cacheMetrics {
return &cacheMetrics{
hits: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_parquet_queryable_cache_hits_total",
Help: "Total number of parquet cache hits",
}, []string{"name"}),
misses: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_parquet_queryable_cache_misses_total",
Help: "Total number of parquet cache misses",
}, []string{"name"}),
evictions: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_parquet_queryable_cache_evictions_total",
Help: "Total number of parquet cache evictions",
}, []string{"name"}),
size: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_parquet_queryable_cache_item_count",
Help: "Current number of cached parquet items",
}, []string{"name"}),
}
}

type Cache[T any] struct {
cache *lru.Cache[string, T]
name string
metrics *cacheMetrics
}

func newCache[T any](name string, size int, metrics *cacheMetrics) (cacheInterface[T], error) {
if size <= 0 {
return &noopCache[T]{}, nil
}
cache, err := lru.NewWithEvict(size, func(key string, value T) {
metrics.evictions.WithLabelValues(name).Inc()
metrics.size.WithLabelValues(name).Dec()
})
if err != nil {
return nil, err
}

return &Cache[T]{
cache: cache,
name: name,
metrics: metrics,
}, nil
}

func (c *Cache[T]) Get(path string) (r T) {
if reader, ok := c.cache.Get(path); ok {
c.metrics.hits.WithLabelValues(c.name).Inc()
return reader
}
c.metrics.misses.WithLabelValues(c.name).Inc()
return
}

func (c *Cache[T]) Set(path string, reader T) {
	// Only grow the gauge for new keys; Add on an existing key just updates it.
	if !c.cache.Contains(path) {
		c.metrics.size.WithLabelValues(c.name).Inc()
	}
	c.cache.Add(path, reader)
}

type noopCache[T any] struct {
}

func (n noopCache[T]) Get(_ string) (r T) {
return
}

func (n noopCache[T]) Set(_ string, _ T) {

}
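To make the cache semantics above concrete, here is a minimal sketch that exercises it. It assumes it compiles in the same package as the code above, and the string values are stand-ins for the *parquet_storage.ParquetShard readers the PR actually caches:

package querier

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func ExampleCache() {
	// size > 0 returns the LRU-backed Cache; size <= 0 returns noopCache.
	c, err := newCache[string]("example", 2, newCacheMetrics(prometheus.NewRegistry()))
	if err != nil {
		panic(err)
	}

	c.Set("user1-block1", "shard-0")
	fmt.Println(c.Get("user1-block1")) // hit: bumps cortex_parquet_queryable_cache_hits_total
	fmt.Println(c.Get("user1-block2")) // miss: returns the zero value ("") and bumps the miss counter

	// A third distinct key exceeds the size of 2 and evicts the least
	// recently used entry; the eviction callback decrements the item gauge.
	c.Set("user1-block2", "shard-0")
	c.Set("user1-block3", "shard-0")
}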
4 changes: 3 additions & 1 deletion pkg/querier/querier.go
@@ -92,7 +92,8 @@ type Config struct {
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`

// Query Parquet files if available
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
}

var (
@@ -139,6 +140,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
}

// Validate the config
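A small sketch of how the new flag wires up, assuming it sits in the same package as Config; the FlagSet name and the override value are illustrative:

package querier

import (
	"flag"
	"fmt"
)

func ExampleConfig_RegisterFlags() {
	var cfg Config
	fs := flag.NewFlagSet("querier", flag.PanicOnError)
	cfg.RegisterFlags(fs)

	// IntVar applies the default at registration time: 512 entries,
	// and 0 would disable shard caching entirely (noopCache).
	fmt.Println(cfg.ParquetQueryableShardCacheSize)

	if err := fs.Parse([]string{
		"-querier.enable-parquet-queryable",
		"-querier.parquet-queryable-shard-cache-size=1024",
	}); err != nil {
		panic(err)
	}
	fmt.Println(cfg.EnableParquetQueryable, cfg.ParquetQueryableShardCacheSize)

	// Output:
	// 512
	// true 1024
}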