diff --git a/go.mod b/go.mod index c64c150b833..fe03be51c3e 100644 --- a/go.mod +++ b/go.mod @@ -83,7 +83,7 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 github.com/oklog/ulid/v2 v2.1.1 github.com/parquet-go/parquet-go v0.25.1 - github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852 + github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 github.com/prometheus/procfs v0.16.1 github.com/sercand/kuberesolver/v5 v5.1.1 github.com/tjhop/slog-gokit v0.1.4 diff --git a/go.sum b/go.sum index 30a810fd7ad..26277164a42 100644 --- a/go.sum +++ b/go.sum @@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s= github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g= github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U= -github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852 h1:GNUP6g2eSqZbzGTdFK9D1RLQLjZxCkuuA/MkgfB/enQ= -github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= +github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 h1:xHR2Xex5XWYl5rQKObX8sVqykPXzlL0Rytd9mKo0sss= +github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is= github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4= github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis= github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA= diff --git a/pkg/parquetconverter/converter.go b/pkg/parquetconverter/converter.go index 09abf21e055..4eca20ac0a5 100644 --- a/pkg/parquetconverter/converter.go +++ b/pkg/parquetconverter/converter.go @@ -27,6 +27,8 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/logutil" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/storage/bucket" @@ -36,6 +38,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -247,6 +250,10 @@ func (c *Converter) running(ctx context.Context) error { if ctx.Err() != nil { return ctx.Err() } + if c.isCausedByPermissionDenied(err) { + level.Warn(userLogger).Log("msg", "skipping convert user due to PermissionDenied", "err", err) + continue + } level.Error(userLogger).Log("msg", "failed to convert user", "err", err) } } @@ -474,7 +481,7 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin } func (c *Converter) checkConvertError(userID string, err error) (terminate bool) { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || c.isCausedByPermissionDenied(err) { terminate = true } else { c.metrics.convertBlockFailures.WithLabelValues(userID).Inc() @@ -482,6 +489,25 @@ func (c *Converter) checkConvertError(userID string, err error) (terminate bool) return } +func (c *Converter) isCausedByPermissionDenied(err error) bool { + cause := errors.Cause(err) + res := cortex_errors.ErrorIs(cause, func(err error) bool { + return c.isPermissionDeniedErr(err) + }) + return res +} + +func (c *Converter) isPermissionDeniedErr(err error) bool { + if c.bkt.IsAccessDeniedErr(err) { + return true + } + s, ok := status.FromError(err) + if !ok { + return false + } + return s.Code() == codes.PermissionDenied +} + func (c *Converter) ownUser(r ring.ReadRing, userId string) (bool, error) { if userId == tenant.GlobalMarkersDir { // __markers__ is reserved for global markers and no tenant should be allowed to have that name. diff --git a/pkg/parquetconverter/converter_test.go b/pkg/parquetconverter/converter_test.go index 1198d4e2dc5..fc8f6e99805 100644 --- a/pkg/parquetconverter/converter_test.go +++ b/pkg/parquetconverter/converter_test.go @@ -2,10 +2,12 @@ package parquetconverter import ( "context" + "errors" "fmt" "io" "math/rand" "path" + "strings" "testing" "time" @@ -21,6 +23,8 @@ import ( "github.com/thanos-io/objstore/providers/filesystem" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" "github.com/cortexproject/cortex/integration/e2e" "github.com/cortexproject/cortex/pkg/ring" @@ -31,6 +35,7 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/storage/tsdb/users" "github.com/cortexproject/cortex/pkg/util/concurrency" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -269,7 +274,8 @@ func TestConverter_BlockConversionFailure(t *testing.T) { // Create a mock bucket that wraps the filesystem bucket but fails uploads mockBucket := &mockBucket{ - Bucket: fsBucket, + Bucket: fsBucket, + uploadFailure: fmt.Errorf("mock upload failure"), } converter := newConverter(cfg, objstore.WithNoopInstr(mockBucket), storageCfg, []int64{3600000, 7200000}, nil, reg, overrides, nil) @@ -284,13 +290,94 @@ func TestConverter_BlockConversionFailure(t *testing.T) { assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID))) } +func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) { + // Create a new registry for testing + reg := prometheus.NewRegistry() + + // Create a new converter with test configuration + cfg := Config{ + MetaSyncConcurrency: 1, + DataDir: t.TempDir(), + } + logger := log.NewNopLogger() + storageCfg := cortex_tsdb.BlocksStorageConfig{} + flagext.DefaultValues(&storageCfg) + limits := &validation.Limits{} + flagext.DefaultValues(limits) + overrides := validation.NewOverrides(*limits, nil) + limits.ParquetConverterEnabled = true + + // Create a filesystem bucket for initial block upload + fsBucket, err := filesystem.NewBucket(t.TempDir()) + require.NoError(t, err) + + // Create test labels + lbls := labels.Labels{labels.Label{ + Name: "__name__", + Value: "test", + }} + + // Create a real TSDB block + dir := t.TempDir() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + blockID, err := e2e.CreateBlock(context.Background(), rnd, dir, []labels.Labels{lbls}, 2, 0, 2*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10) + require.NoError(t, err) + bdir := path.Join(dir, blockID.String()) + + userID := "test-user" + + // Upload the block to filesystem bucket + err = block.Upload(context.Background(), logger, bucket.NewPrefixedBucketClient(fsBucket, userID), bdir, metadata.NoneFunc) + require.NoError(t, err) + + var mb *mockBucket + t.Run("get failure", func(t *testing.T) { + // Create a mock bucket that wraps the filesystem bucket but fails with permission denied error. + mb = &mockBucket{ + Bucket: fsBucket, + getFailure: cortex_errors.WithCause(errors.New("dummy error"), status.Error(codes.PermissionDenied, "dummy")), + } + }) + + t.Run("upload failure", func(t *testing.T) { + // Create a mock bucket that wraps the filesystem bucket but fails with permission denied error. + mb = &mockBucket{ + Bucket: fsBucket, + uploadFailure: cortex_errors.WithCause(errors.New("dummy error"), status.Error(codes.PermissionDenied, "dummy")), + } + }) + + converter := newConverter(cfg, objstore.WithNoopInstr(mb), storageCfg, []int64{3600000, 7200000}, nil, reg, overrides, nil) + converter.ringLifecycler = &ring.Lifecycler{ + Addr: "1.2.3.4", + } + + err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID) + require.Error(t, err) + + // Verify the failure metric was not incremented + assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID))) +} + // mockBucket implements objstore.Bucket for testing type mockBucket struct { objstore.Bucket + uploadFailure error + getFailure error } func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error { - return fmt.Errorf("mock upload failure") + if m.uploadFailure != nil { + return m.uploadFailure + } + return m.Bucket.Upload(ctx, name, r) +} + +func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { + if m.getFailure != nil && strings.Contains(name, "index") { + return nil, m.getFailure + } + return m.Bucket.Get(ctx, name) } type RingMock struct { diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go index 227e1e30741..703ba862905 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/convert.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/convert.go @@ -226,25 +226,25 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ for _, blk := range blks { indexr, err := blk.Index() if err != nil { - return nil, fmt.Errorf("unable to get index reader from block: %s", err) + return nil, fmt.Errorf("unable to get index reader from block: %w", err) } closers = append(closers, indexr) chunkr, err := blk.Chunks() if err != nil { - return nil, fmt.Errorf("unable to get chunk reader from block: %s", err) + return nil, fmt.Errorf("unable to get chunk reader from block: %w", err) } closers = append(closers, chunkr) tombsr, err := blk.Tombstones() if err != nil { - return nil, fmt.Errorf("unable to get tombstone reader from block: %s", err) + return nil, fmt.Errorf("unable to get tombstone reader from block: %w", err) } closers = append(closers, tombsr) lblns, err := indexr.LabelNames(ctx) if err != nil { - return nil, fmt.Errorf("unable to get label names from block: %s", err) + return nil, fmt.Errorf("unable to get label names from block: %w", err) } postings := sortedPostings(ctx, indexr, compareFunc, ops.sortedLabels...) @@ -258,7 +258,7 @@ func NewTsdbRowReader(ctx context.Context, mint, maxt, colDuration int64, blks [ s, err := b.Build() if err != nil { - return nil, fmt.Errorf("unable to build index reader from block: %s", err) + return nil, fmt.Errorf("unable to build index reader from block: %w", err) } return &TsdbRowReader{ diff --git a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go index ddc0ab41758..63b4b304fee 100644 --- a/vendor/github.com/prometheus-community/parquet-common/convert/writer.go +++ b/vendor/github.com/prometheus-community/parquet-common/convert/writer.go @@ -77,7 +77,7 @@ func (c *ShardedWriter) Write(ctx context.Context) error { func (c *ShardedWriter) convertShards(ctx context.Context) error { for { if ok, err := c.convertShard(ctx); err != nil { - return fmt.Errorf("unable to convert shard: %s", err) + return fmt.Errorf("unable to convert shard: %w", err) } else if !ok { break } @@ -125,17 +125,17 @@ func (c *ShardedWriter) writeFile(ctx context.Context, schema *schema.TSDBSchema ctx, schema.Schema, c.outSchemasForCurrentShard(), c.pipeReaderWriter, fileOpts..., ) if err != nil { - return 0, fmt.Errorf("unable to create row writer: %s", err) + return 0, fmt.Errorf("unable to create row writer: %w", err) } n, err := parquet.CopyRows(writer, newBufferedReader(ctx, newLimitReader(c.rr, rowsToWrite))) if err != nil { - return 0, fmt.Errorf("unable to copy rows: %s", err) + return 0, fmt.Errorf("unable to copy rows: %w", err) } err = writer.Close() if err != nil { - return 0, fmt.Errorf("unable to close writer: %s", err) + return 0, fmt.Errorf("unable to close writer: %w", err) } return n, nil @@ -261,11 +261,11 @@ func (s *splitPipeFileWriter) WriteRows(rows []parquet.Row) (int, error) { convertedRows := util.CloneRows(rows) _, err := writer.conv.Convert(convertedRows) if err != nil { - return fmt.Errorf("unable to convert rows: %d", err) + return fmt.Errorf("unable to convert rows: %w", err) } n, err := writer.pw.WriteRows(convertedRows) if err != nil { - return fmt.Errorf("unable to write rows: %d", err) + return fmt.Errorf("unable to write rows: %w", err) } if n != len(rows) { return fmt.Errorf("unable to write rows: %d != %d", n, len(rows)) diff --git a/vendor/modules.txt b/vendor/modules.txt index 47c1dd06fd0..58a73b08224 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -947,7 +947,7 @@ github.com/planetscale/vtprotobuf/types/known/wrapperspb # github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 ## explicit github.com/pmezard/go-difflib/difflib -# github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852 +# github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 ## explicit; go 1.23.4 github.com/prometheus-community/parquet-common/convert github.com/prometheus-community/parquet-common/queryable