Skip to content

Commit 263e7e8

Browse files
committed
Fall back to the blocks queryable if vertical sharding is enabled
Signed-off-by: alanprot <[email protected]>
1 parent 5101dfe commit 263e7e8

File tree

8 files changed

+224
-51
lines changed

8 files changed

+224
-51
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ require (
8282
github.com/hashicorp/golang-lru/v2 v2.0.7
8383
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
8484
github.com/parquet-go/parquet-go v0.25.0
85-
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73
85+
github.com/prometheus-community/parquet-common v0.0.0-20250527060536-18d3dd36c09e
8686
github.com/prometheus/procfs v0.15.1
8787
github.com/sercand/kuberesolver/v5 v5.1.1
8888
github.com/tjhop/slog-gokit v0.1.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1573,8 +1573,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
15731573
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
15741574
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
15751575
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
1576-
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73 h1:AogORrmarkYfUOI7/lqOhz9atYmLZo69vPQ/SFkPSxE=
1577-
github.com/prometheus-community/parquet-common v0.0.0-20250522182606-e046c038dc73/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
1576+
github.com/prometheus-community/parquet-common v0.0.0-20250527060536-18d3dd36c09e h1:paj+lHT5gwRPg+vKgL5Crr1lwmS3TdA1EePCFEVfaD4=
1577+
github.com/prometheus-community/parquet-common v0.0.0-20250527060536-18d3dd36c09e/go.mod h1:zRW/xXBlELf8v9h9uqWvDkjOr3N5BtQGZ6LsDX9Ea/A=
15781578
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0 h1:owfYHh79h8Y5HvNMGyww+DaVwo10CKiRW1RQrrZzIwg=
15791579
github.com/prometheus-community/prom-label-proxy v0.8.1-0.20240127162815-c1195f9aabc0/go.mod h1:rT989D4UtOcfd9tVqIZRVIM8rkg+9XbreBjFNEKXvVI=
15801580
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=

pkg/parquetconverter/converter.go

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ import (
55
"flag"
66
"fmt"
77
"hash/fnv"
8+
"math/rand"
89
"os"
910
"path/filepath"
11+
"sort"
1012
"strings"
1113
"time"
1214

@@ -47,6 +49,8 @@ var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
4749
type Config struct {
4850
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
4951
ConversionInterval time.Duration `yaml:"conversion_interval"`
52+
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
53+
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
5054

5155
DataDir string `yaml:"data_dir"`
5256

@@ -85,7 +89,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8589

8690
f.StringVar(&cfg.DataDir, "parquet-converter.data-dir", "./data", "Data directory in which to cache blocks and process conversions.")
8791
f.IntVar(&cfg.MetaSyncConcurrency, "parquet-converter.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from the long term storage.")
92+
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.")
8893
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
94+
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file_buffer_enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
8995
}
9096

9197
func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
@@ -163,6 +169,10 @@ func (c *Converter) running(ctx context.Context) error {
163169
continue
164170
}
165171
ownedUsers := map[string]struct{}{}
172+
rand.Shuffle(len(users), func(i, j int) {
173+
users[i], users[j] = users[j], users[i]
174+
})
175+
166176
for _, userID := range users {
167177
if !c.limits.ParquetConverterEnabled(userID) {
168178
continue
@@ -293,11 +303,20 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
293303
return errors.Wrap(err, "error creating block fetcher")
294304
}
295305

296-
blocks, _, err := fetcher.Fetch(ctx)
306+
blks, _, err := fetcher.Fetch(ctx)
297307
if err != nil {
298308
return errors.Wrapf(err, "failed to fetch blocks for user %s", userID)
299309
}
300310

311+
blocks := make([]*metadata.Meta, 0, len(blks))
312+
for _, blk := range blks {
313+
blocks = append(blocks, blk)
314+
}
315+
316+
sort.Slice(blocks, func(i, j int) bool {
317+
return blocks[i].MinTime > blocks[j].MinTime
318+
})
319+
301320
for _, b := range blocks {
302321
ok, err := c.ownBlock(ring, b.ULID.String())
303322
if err != nil {
@@ -345,22 +364,31 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
345364
}
346365

347366
level.Info(logger).Log("msg", "converting block", "block", b.ULID.String(), "dir", bdir)
367+
extraOpts := []convert.ConvertOption{
368+
convert.WithSortBy(labels.MetricName),
369+
convert.WithColDuration(time.Hour * 8),
370+
convert.WithRowGroupSize(c.cfg.MaxRowsPerRowGroup),
371+
convert.WithName(b.ULID.String()),
372+
}
373+
374+
if c.cfg.FileBufferEnabled {
375+
extraOpts = append(extraOpts, convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")))
376+
}
377+
348378
_, err = convert.ConvertTSDBBlock(
349379
ctx,
350380
uBucket,
351381
tsdbBlock.MinTime(),
352382
tsdbBlock.MaxTime(),
353383
[]convert.Convertible{tsdbBlock},
354-
convert.WithSortBy(labels.MetricName),
355-
convert.WithColDuration(time.Hour*8),
356-
convert.WithName(b.ULID.String()),
357-
convert.WithColumnPageBuffers(parquet.NewFileBufferPool(bdir, "buffers.*")),
384+
extraOpts...,
358385
)
359386

360387
_ = tsdbBlock.Close()
361388

362389
if err != nil {
363390
level.Error(logger).Log("msg", "Error converting block", "err", err)
391+
continue
364392
}
365393

366394
err = cortex_parquet.WriteConverterMark(ctx, b.ULID, uBucket)

pkg/querier/parquet_queryable.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"time"
66

77
"github.com/go-kit/log"
8+
"github.com/go-kit/log/level"
89
"github.com/pkg/errors"
910
"github.com/prometheus-community/parquet-common/schema"
1011
"github.com/prometheus-community/parquet-common/search"
@@ -22,8 +23,10 @@ import (
2223
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
2324
"github.com/cortexproject/cortex/pkg/tenant"
2425
"github.com/cortexproject/cortex/pkg/util"
26+
util_log "github.com/cortexproject/cortex/pkg/util/log"
2527
"github.com/cortexproject/cortex/pkg/util/multierror"
2628
"github.com/cortexproject/cortex/pkg/util/services"
29+
"github.com/cortexproject/cortex/pkg/util/validation"
2730
)
2831

2932
type parquetQueryableFallbackMetrics struct {
@@ -59,12 +62,15 @@ type parquetQueryableWithFallback struct {
5962

6063
// metrics
6164
metrics *parquetQueryableFallbackMetrics
65+
66+
limits *validation.Overrides
67+
logger log.Logger
6268
}
6369

6470
func NewParquetQueryable(
6571
config Config,
6672
storageCfg cortex_tsdb.BlocksStorageConfig,
67-
limits BlocksStoreLimits,
73+
limits *validation.Overrides,
6874
blockStorageQueryable *BlocksStoreQueryable,
6975
logger log.Logger,
7076
reg prometheus.Registerer,
@@ -115,6 +121,8 @@ func NewParquetQueryable(
115121
subservicesWatcher: services.NewFailureWatcher(),
116122
finder: blockStorageQueryable.finder,
117123
metrics: newParquetQueryableFallbackMetrics(reg),
124+
limits: limits,
125+
logger: logger,
118126
}
119127

120128
p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
@@ -164,6 +172,8 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
164172
blocksStoreQuerier: bsq,
165173
finder: p.finder,
166174
metrics: p.metrics,
175+
limits: p.limits,
176+
logger: p.logger,
167177
}, nil
168178
}
169179

@@ -181,9 +191,24 @@ type parquetQuerierWithFallback struct {
181191

182192
// metrics
183193
metrics *parquetQueryableFallbackMetrics
194+
195+
limits *validation.Overrides
196+
logger log.Logger
184197
}
185198

186199
func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
200+
userID, err := tenant.TenantID(ctx)
201+
if err != nil {
202+
return nil, nil, err
203+
}
204+
205+
if q.limits.QueryVerticalShardSize(userID) > 1 {
206+
uLogger := util_log.WithUserID(userID, q.logger)
207+
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertival sharding > 0. Falling back to the block storage")
208+
209+
return q.blocksStoreQuerier.LabelValues(ctx, name, hints, matchers...)
210+
}
211+
187212
remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
188213
if err != nil {
189214
return nil, nil, err
@@ -229,6 +254,18 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
229254
}
230255

231256
func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
257+
userID, err := tenant.TenantID(ctx)
258+
if err != nil {
259+
return nil, nil, err
260+
}
261+
262+
if q.limits.QueryVerticalShardSize(userID) > 1 {
263+
uLogger := util_log.WithUserID(userID, q.logger)
264+
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertival sharding > 0. Falling back to the block storage")
265+
266+
return q.blocksStoreQuerier.LabelNames(ctx, hints, matchers...)
267+
}
268+
232269
remaining, parquet, err := q.getBlocks(ctx, q.minT, q.maxT)
233270
if err != nil {
234271
return nil, nil, err
@@ -275,6 +312,18 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
275312
}
276313

277314
func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
315+
userID, err := tenant.TenantID(ctx)
316+
if err != nil {
317+
storage.ErrSeriesSet(err)
318+
}
319+
320+
if q.limits.QueryVerticalShardSize(userID) > 1 {
321+
uLogger := util_log.WithUserID(userID, q.logger)
322+
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertival sharding > 0. Falling back to the block storage")
323+
324+
return q.blocksStoreQuerier.Select(ctx, sortSeries, hints, matchers...)
325+
}
326+
278327
mint, maxt, limit := q.minT, q.maxT, 0
279328

280329
if hints != nil {
@@ -288,6 +337,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
288337

289338
serieSets := []storage.SeriesSet{}
290339

340+
// Force sorting when both parquet and remaining blocks are present so the series sets can be merged.
341+
if len(parquet) > 0 && len(remaining) > 0 {
342+
sortSeries = true
343+
}
344+
291345
if len(parquet) > 0 {
292346
serieSets = append(serieSets, q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, hints, matchers...))
293347
}

0 commit comments

Comments
 (0)