Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/oklog/ulid/v2 v2.1.1
github.com/parquet-go/parquet-go v0.25.1
github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994
github.com/prometheus/procfs v0.16.1
github.com/sercand/kuberesolver/v5 v5.1.1
github.com/tjhop/slog-gokit v0.1.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -814,8 +814,8 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852 h1:GNUP6g2eSqZbzGTdFK9D1RLQLjZxCkuuA/MkgfB/enQ=
github.com/prometheus-community/parquet-common v0.0.0-20250708150752-0811a700a852/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994 h1:xHR2Xex5XWYl5rQKObX8sVqykPXzlL0Rytd9mKo0sss=
github.com/prometheus-community/parquet-common v0.0.0-20250708210438-f89902fcd994/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
Expand Down
28 changes: 27 additions & 1 deletion pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/logutil"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/storage/bucket"
Expand All @@ -36,6 +38,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/validation"
Expand Down Expand Up @@ -247,6 +250,10 @@ func (c *Converter) running(ctx context.Context) error {
if ctx.Err() != nil {
return ctx.Err()
}
if c.isCausedByPermissionDenied(err) {
level.Warn(userLogger).Log("msg", "skipping convert user due to PermissionDenied", "err", err)
continue
}
level.Error(userLogger).Log("msg", "failed to convert user", "err", err)
}
}
Expand Down Expand Up @@ -474,14 +481,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
}

func (c *Converter) checkConvertError(userID string, err error) (terminate bool) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || c.isCausedByPermissionDenied(err) {
terminate = true
} else {
c.metrics.convertBlockFailures.WithLabelValues(userID).Inc()
}
return
}

func (c *Converter) isCausedByPermissionDenied(err error) bool {
cause := errors.Cause(err)
res := cortex_errors.ErrorIs(cause, func(err error) bool {
return c.isPermissionDeniedErr(err)
})
return res
}

func (c *Converter) isPermissionDeniedErr(err error) bool {
if c.bkt.IsAccessDeniedErr(err) {
return true
}
s, ok := status.FromError(err)
if !ok {
return false
}
return s.Code() == codes.PermissionDenied
}

func (c *Converter) ownUser(r ring.ReadRing, userId string) (bool, error) {
if userId == tenant.GlobalMarkersDir {
// __markers__ is reserved for global markers and no tenant should be allowed to have that name.
Expand Down
91 changes: 89 additions & 2 deletions pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package parquetconverter

import (
"context"
"errors"
"fmt"
"io"
"math/rand"
"path"
"strings"
"testing"
"time"

Expand All @@ -21,6 +23,8 @@ import (
"github.com/thanos-io/objstore/providers/filesystem"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/integration/e2e"
"github.com/cortexproject/cortex/pkg/ring"
Expand All @@ -31,6 +35,7 @@ import (
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
"github.com/cortexproject/cortex/pkg/util/concurrency"
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/cortexproject/cortex/pkg/util/test"
Expand Down Expand Up @@ -269,7 +274,8 @@ func TestConverter_BlockConversionFailure(t *testing.T) {

// Create a mock bucket that wraps the filesystem bucket but fails uploads
mockBucket := &mockBucket{
Bucket: fsBucket,
Bucket: fsBucket,
uploadFailure: fmt.Errorf("mock upload failure"),
}

converter := newConverter(cfg, objstore.WithNoopInstr(mockBucket), storageCfg, []int64{3600000, 7200000}, nil, reg, overrides, nil)
Expand All @@ -284,13 +290,94 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
}

func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) {
// Create a new registry for testing
reg := prometheus.NewRegistry()

// Create a new converter with test configuration
cfg := Config{
MetaSyncConcurrency: 1,
DataDir: t.TempDir(),
}
logger := log.NewNopLogger()
storageCfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&storageCfg)
limits := &validation.Limits{}
flagext.DefaultValues(limits)
overrides := validation.NewOverrides(*limits, nil)
limits.ParquetConverterEnabled = true

// Create a filesystem bucket for initial block upload
fsBucket, err := filesystem.NewBucket(t.TempDir())
require.NoError(t, err)

// Create test labels
lbls := labels.Labels{labels.Label{
Name: "__name__",
Value: "test",
}}

// Create a real TSDB block
dir := t.TempDir()
rnd := rand.New(rand.NewSource(time.Now().Unix()))
blockID, err := e2e.CreateBlock(context.Background(), rnd, dir, []labels.Labels{lbls}, 2, 0, 2*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
require.NoError(t, err)
bdir := path.Join(dir, blockID.String())

userID := "test-user"

// Upload the block to filesystem bucket
err = block.Upload(context.Background(), logger, bucket.NewPrefixedBucketClient(fsBucket, userID), bdir, metadata.NoneFunc)
require.NoError(t, err)

var mb *mockBucket
t.Run("get failure", func(t *testing.T) {
// Create a mock bucket that wraps the filesystem bucket but fails with permission denied error.
mb = &mockBucket{
Bucket: fsBucket,
getFailure: cortex_errors.WithCause(errors.New("dummy error"), status.Error(codes.PermissionDenied, "dummy")),
}
})

t.Run("upload failure", func(t *testing.T) {
// Create a mock bucket that wraps the filesystem bucket but fails with permission denied error.
mb = &mockBucket{
Bucket: fsBucket,
uploadFailure: cortex_errors.WithCause(errors.New("dummy error"), status.Error(codes.PermissionDenied, "dummy")),
}
})

converter := newConverter(cfg, objstore.WithNoopInstr(mb), storageCfg, []int64{3600000, 7200000}, nil, reg, overrides, nil)
converter.ringLifecycler = &ring.Lifecycler{
Addr: "1.2.3.4",
}

err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID)
require.Error(t, err)

// Verify the failure metric was not incremented
assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
}

// mockBucket implements objstore.Bucket for testing
type mockBucket struct {
objstore.Bucket
uploadFailure error
getFailure error
}

func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
return fmt.Errorf("mock upload failure")
if m.uploadFailure != nil {
return m.uploadFailure
}
return m.Bucket.Upload(ctx, name, r)
}

func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
if m.getFailure != nil && strings.Contains(name, "index") {
return nil, m.getFailure
}
return m.Bucket.Get(ctx, name)
}

type RingMock struct {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/modules.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading