Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
* [ENHANCEMENT] Ring: Add zone label to ring_members metric. #6900
* [ENHANCEMENT] Ingester: Add new metric `cortex_ingester_push_errors_total` to track reasons for ingester request failures. #6901
* [ENHANCEMENT] Parquet Storage: Allow Parquet Queryable to disable fallback to Store Gateway. #6920
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
35 changes: 34 additions & 1 deletion pkg/querier/parquet_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package querier
import (
"context"
"fmt"
"strings"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -50,7 +51,9 @@ const (
parquetBlockStore blockStoreType = "parquet"
)

var validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
var (
validBlockStoreTypes = []blockStoreType{tsdbBlockStore, parquetBlockStore}
)

// AddBlockStoreTypeToContext checks HTTP header and set block store key to context if
// relevant header is set.
Expand Down Expand Up @@ -91,6 +94,7 @@ func newParquetQueryableFallbackMetrics(reg prometheus.Registerer) *parquetQuery
type parquetQueryableWithFallback struct {
services.Service

fallbackDisabled bool
queryStoreAfter time.Duration
parquetQueryable storage.Queryable
blockStorageQueryable *BlocksStoreQueryable
Expand Down Expand Up @@ -255,6 +259,7 @@ func NewParquetQueryable(
limits: limits,
logger: logger,
defaultBlockStoreType: blockStoreType(config.ParquetQueryableDefaultBlockStore),
fallbackDisabled: config.ParquetQueryableFallbackDisabled,
}

p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
Expand Down Expand Up @@ -307,6 +312,7 @@ func (p *parquetQueryableWithFallback) Querier(mint, maxt int64) (storage.Querie
limits: p.limits,
logger: p.logger,
defaultBlockStoreType: p.defaultBlockStoreType,
fallbackDisabled: p.fallbackDisabled,
}, nil
}

Expand All @@ -329,6 +335,8 @@ type parquetQuerierWithFallback struct {
logger log.Logger

defaultBlockStoreType blockStoreType

fallbackDisabled bool
}

func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name string, hints *storage.LabelHints, matchers ...*labels.Matcher) ([]string, annotations.Annotations, error) {
Expand All @@ -351,6 +359,10 @@ func (q *parquetQuerierWithFallback) LabelValues(ctx context.Context, name strin
rAnnotations annotations.Annotations
)

if len(remaining) > 0 && q.fallbackDisabled {
return nil, nil, parquetConsistencyCheckError(remaining)
}

if len(parquet) > 0 {
res, ann, qErr := q.parquetQuerier.LabelValues(InjectBlocksIntoContext(ctx, parquet...), name, hints, matchers...)
if qErr != nil {
Expand Down Expand Up @@ -401,6 +413,10 @@ func (q *parquetQuerierWithFallback) LabelNames(ctx context.Context, hints *stor
rAnnotations annotations.Annotations
)

if len(remaining) > 0 && q.fallbackDisabled {
return nil, nil, parquetConsistencyCheckError(remaining)
}

if len(parquet) > 0 {
res, ann, qErr := q.parquetQuerier.LabelNames(InjectBlocksIntoContext(ctx, parquet...), hints, matchers...)
if qErr != nil {
Expand Down Expand Up @@ -466,6 +482,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
return storage.ErrSeriesSet(err)
}

if len(remaining) > 0 && q.fallbackDisabled {
err = parquetConsistencyCheckError(remaining)
return storage.ErrSeriesSet(err)
}

// Lets sort the series to merge
if len(parquet) > 0 && len(remaining) > 0 {
sortSeries = true
Expand Down Expand Up @@ -691,3 +712,15 @@ func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher,

return nil, false
}

func parquetConsistencyCheckError(blocks []*bucketindex.Block) error {
return fmt.Errorf("consistency check failed because some blocks were not available as parquet files: %s", strings.Join(convertBlockULIDToString(blocks), " "))
}

func convertBlockULIDToString(blocks []*bucketindex.Block) []string {
res := make([]string, len(blocks))
for idx, b := range blocks {
res[idx] = b.ID.String()
}
return res
}
154 changes: 154 additions & 0 deletions pkg/querier/parquet_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,3 +716,157 @@ func TestMaterializedLabelsFilterCallback(t *testing.T) {
})
}
}

func TestParquetQueryableFallbackDisabled(t *testing.T) {
block1 := ulid.MustNew(1, nil)
block2 := ulid.MustNew(2, nil)
minT := int64(10)
maxT := util.TimeToMillis(time.Now())

createStore := func() *blocksStoreSetMock {
return &blocksStoreSetMock{mockedResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "1.1.1.1",
mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{{Name: labels.MetricName, Value: "fromSg"}}, []cortexpb.Sample{{Value: 1, TimestampMs: minT}, {Value: 2, TimestampMs: minT + 1}}, nil, nil),
mockHintsResponse(block1, block2),
},
mockedLabelNamesResponse: &storepb.LabelNamesResponse{
Names: namesFromSeries(labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})),
Warnings: []string{},
Hints: mockNamesHints(block1, block2),
},
mockedLabelValuesResponse: &storepb.LabelValuesResponse{
Values: valuesFromSeries(labels.MetricName, labels.FromMap(map[string]string{labels.MetricName: "fromSg", "fromSg": "fromSg"})),
Warnings: []string{},
Hints: mockValuesHints(block1, block2),
},
}: {block1, block2}},
},
}
}

matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, labels.MetricName, "fromSg"),
}
ctx := user.InjectOrgID(context.Background(), "user-1")

t.Run("should return consistency check errors when fallback disabled and some blocks not available as parquet", func(t *testing.T) {
finder := &blocksFinderMock{}
stores := createStore()

q := &blocksStoreQuerier{
minT: minT,
maxT: maxT,
finder: finder,
stores: stores,
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
limits: &blocksStoreLimitsMock{},

storeGatewayConsistencyCheckMaxAttempts: 3,
}

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
queryStoreAfter: time.Hour,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
defaultBlockStoreType: parquetBlockStore,
fallbackDisabled: true, // Disable fallback
}

// Set up blocks where block1 has parquet metadata but block2 doesn't
finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{
&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet
&bucketindex.Block{ID: block2}, // Not available as parquet
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)

expectedError := fmt.Sprintf("consistency check failed because some blocks were not available as parquet files: %s", block2.String())

t.Run("select should return consistency check error", func(t *testing.T) {
ss := pq.Select(ctx, true, nil, matchers...)
require.Error(t, ss.Err())
require.Contains(t, ss.Err().Error(), expectedError)
})

t.Run("labelNames should return consistency check error", func(t *testing.T) {
_, _, err := pq.LabelNames(ctx, nil, matchers...)
require.Error(t, err)
require.Contains(t, err.Error(), expectedError)
})

t.Run("labelValues should return consistency check error", func(t *testing.T) {
_, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...)
require.Error(t, err)
require.Contains(t, err.Error(), expectedError)
})
})

t.Run("should work normally when all blocks are available as parquet and fallback disabled", func(t *testing.T) {
finder := &blocksFinderMock{}
stores := createStore()

q := &blocksStoreQuerier{
minT: minT,
maxT: maxT,
finder: finder,
stores: stores,
consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
limits: &blocksStoreLimitsMock{},

storeGatewayConsistencyCheckMaxAttempts: 3,
}

mParquetQuerier := &mockParquetQuerier{}
pq := &parquetQuerierWithFallback{
minT: minT,
maxT: maxT,
finder: finder,
blocksStoreQuerier: q,
parquetQuerier: mParquetQuerier,
queryStoreAfter: time.Hour,
metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
limits: defaultOverrides(t, 0),
logger: log.NewNopLogger(),
defaultBlockStoreType: parquetBlockStore,
fallbackDisabled: true, // Disable fallback
}

// Set up blocks where both blocks have parquet metadata
finder.On("GetBlocks", mock.Anything, "user-1", minT, mock.Anything).Return(bucketindex.Blocks{
&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet
&bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, // Available as parquet
}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)

t.Run("select should work without error", func(t *testing.T) {
mParquetQuerier.Reset()
ss := pq.Select(ctx, true, nil, matchers...)
require.NoError(t, ss.Err())
require.Len(t, mParquetQuerier.queriedBlocks, 2)
})

t.Run("labelNames should work without error", func(t *testing.T) {
mParquetQuerier.Reset()
_, _, err := pq.LabelNames(ctx, nil, matchers...)
require.NoError(t, err)
require.Len(t, mParquetQuerier.queriedBlocks, 2)
})

t.Run("labelValues should work without error", func(t *testing.T) {
mParquetQuerier.Reset()
_, _, err := pq.LabelValues(ctx, labels.MetricName, nil, matchers...)
require.NoError(t, err)
require.Len(t, mParquetQuerier.queriedBlocks, 2)
})
})
}
2 changes: 2 additions & 0 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type Config struct {
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"`
ParquetQueryableFallbackDisabled bool `yaml:"parquet_queryable_fallback_disabled" doc:"hidden"`
}

var (
Expand Down Expand Up @@ -145,6 +146,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
f.BoolVar(&cfg.ParquetQueryableFallbackDisabled, "querier.parquet-queryable-fallback-disabled", false, "[Experimental] Disable Parquet queryable to fallback queries to Store Gateway if the block is not available as Parquet files but available in TSDB. Setting this to true will disable the fallback and users can remove Store Gateway. But need to make sure Parquet files are created before it is queryable.")
}

// Validate the config
Expand Down