Skip to content

Commit d3ad114

Browse files
committed
ignore permission denied error in parquet converter
Signed-off-by: yeya24 <[email protected]>
1 parent e4b0f1b commit d3ad114

File tree

2 files changed

+105
-3
lines changed

2 files changed

+105
-3
lines changed

pkg/parquetconverter/converter.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"github.com/thanos-io/thanos/pkg/block/metadata"
2828
"github.com/thanos-io/thanos/pkg/extprom"
2929
"github.com/thanos-io/thanos/pkg/logutil"
30+
"google.golang.org/grpc/codes"
31+
"google.golang.org/grpc/status"
3032

3133
"github.com/cortexproject/cortex/pkg/ring"
3234
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -36,6 +38,7 @@ import (
3638
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
3739
"github.com/cortexproject/cortex/pkg/tenant"
3840
"github.com/cortexproject/cortex/pkg/util"
41+
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
3942
util_log "github.com/cortexproject/cortex/pkg/util/log"
4043
"github.com/cortexproject/cortex/pkg/util/services"
4144
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -247,6 +250,10 @@ func (c *Converter) running(ctx context.Context) error {
247250
if ctx.Err() != nil {
248251
return ctx.Err()
249252
}
253+
if c.isCausedByPermissionDenied(err) {
254+
level.Warn(userLogger).Log("msg", "skipping convert user due to PermissionDenied", "err", err)
255+
continue
256+
}
250257
level.Error(userLogger).Log("msg", "failed to convert user", "err", err)
251258
}
252259
}
@@ -474,14 +481,33 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
474481
}
475482

476483
func (c *Converter) checkConvertError(userID string, err error) (terminate bool) {
477-
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
484+
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || c.isCausedByPermissionDenied(err) {
478485
terminate = true
479486
} else {
480487
c.metrics.convertBlockFailures.WithLabelValues(userID).Inc()
481488
}
482489
return
483490
}
484491

492+
func (c *Converter) isCausedByPermissionDenied(err error) bool {
493+
cause := errors.Cause(err)
494+
res := cortex_errors.ErrorIs(cause, func(err error) bool {
495+
return c.isPermissionDeniedErr(err)
496+
})
497+
return res
498+
}
499+
500+
func (c *Converter) isPermissionDeniedErr(err error) bool {
501+
if c.bkt.IsAccessDeniedErr(err) {
502+
return true
503+
}
504+
s, ok := status.FromError(err)
505+
if !ok {
506+
return false
507+
}
508+
return s.Code() == codes.PermissionDenied
509+
}
510+
485511
func (c *Converter) ownUser(r ring.ReadRing, userId string) (bool, error) {
486512
if userId == tenant.GlobalMarkersDir {
487513
// __markers__ is reserved for global markers and no tenant should be allowed to have that name.

pkg/parquetconverter/converter_test.go

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ package parquetconverter
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"math/rand"
89
"path"
10+
"strings"
911
"testing"
1012
"time"
1113

@@ -21,6 +23,8 @@ import (
2123
"github.com/thanos-io/objstore/providers/filesystem"
2224
"github.com/thanos-io/thanos/pkg/block"
2325
"github.com/thanos-io/thanos/pkg/block/metadata"
26+
"google.golang.org/grpc/codes"
27+
"google.golang.org/grpc/status"
2428

2529
"github.com/cortexproject/cortex/integration/e2e"
2630
"github.com/cortexproject/cortex/pkg/ring"
@@ -31,6 +35,7 @@ import (
3135
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
3236
"github.com/cortexproject/cortex/pkg/storage/tsdb/users"
3337
"github.com/cortexproject/cortex/pkg/util/concurrency"
38+
cortex_errors "github.com/cortexproject/cortex/pkg/util/errors"
3439
"github.com/cortexproject/cortex/pkg/util/flagext"
3540
"github.com/cortexproject/cortex/pkg/util/services"
3641
"github.com/cortexproject/cortex/pkg/util/test"
@@ -269,7 +274,8 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
269274

270275
// Create a mock bucket that wraps the filesystem bucket but fails uploads
271276
mockBucket := &mockBucket{
272-
Bucket: fsBucket,
277+
Bucket: fsBucket,
278+
uploadFailure: fmt.Errorf("mock upload failure"),
273279
}
274280

275281
converter := newConverter(cfg, objstore.WithNoopInstr(mockBucket), storageCfg, []int64{3600000, 7200000}, nil, reg, overrides, nil)
@@ -284,13 +290,83 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
284290
assert.Equal(t, 1.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
285291
}
286292

293+
func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) {
294+
// Create a new registry for testing
295+
reg := prometheus.NewRegistry()
296+
297+
// Create a new converter with test configuration
298+
cfg := Config{
299+
MetaSyncConcurrency: 1,
300+
DataDir: t.TempDir(),
301+
}
302+
logger := log.NewNopLogger()
303+
storageCfg := cortex_tsdb.BlocksStorageConfig{}
304+
flagext.DefaultValues(&storageCfg)
305+
limits := &validation.Limits{}
306+
flagext.DefaultValues(limits)
307+
overrides := validation.NewOverrides(*limits, nil)
308+
limits.ParquetConverterEnabled = true
309+
310+
// Create a filesystem bucket for initial block upload
311+
fsBucket, err := filesystem.NewBucket(t.TempDir())
312+
require.NoError(t, err)
313+
314+
// Create test labels
315+
lbls := labels.Labels{labels.Label{
316+
Name: "__name__",
317+
Value: "test",
318+
}}
319+
320+
// Create a real TSDB block
321+
dir := t.TempDir()
322+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
323+
blockID, err := e2e.CreateBlock(context.Background(), rnd, dir, []labels.Labels{lbls}, 2, 0, 2*time.Hour.Milliseconds(), time.Minute.Milliseconds(), 10)
324+
require.NoError(t, err)
325+
bdir := path.Join(dir, blockID.String())
326+
327+
userID := "test-user"
328+
329+
// Upload the block to filesystem bucket
330+
err = block.Upload(context.Background(), logger, bucket.NewPrefixedBucketClient(fsBucket, userID), bdir, metadata.NoneFunc)
331+
require.NoError(t, err)
332+
333+
// Create a mock bucket that wraps the filesystem bucket but fails with permission denied error.
334+
mockBucket := &mockBucket{
335+
Bucket: fsBucket,
336+
getFailure: cortex_errors.WithCause(errors.New("dummy error"), status.Error(codes.PermissionDenied, "dummy")),
337+
}
338+
339+
converter := newConverter(cfg, objstore.WithNoopInstr(mockBucket), storageCfg, []int64{3600000, 7200000}, nil, reg, overrides, nil)
340+
converter.ringLifecycler = &ring.Lifecycler{
341+
Addr: "1.2.3.4",
342+
}
343+
344+
err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID)
345+
require.Error(t, err)
346+
347+
// Verify the failure metric was not incremented
348+
assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
349+
}
350+
287351
// mockBucket implements objstore.Bucket for testing
288352
type mockBucket struct {
289353
objstore.Bucket
354+
uploadFailure error
355+
getFailure error
290356
}
291357

292358
func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
293-
return fmt.Errorf("mock upload failure")
359+
if m.uploadFailure != nil {
360+
return m.uploadFailure
361+
}
362+
return m.Bucket.Upload(ctx, name, r)
363+
}
364+
365+
func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
366+
if m.getFailure != nil && strings.Contains(name, "index") {
367+
return nil, m.getFailure
368+
}
369+
return m.Bucket.Get(ctx, name)
294370
}
295371

296372
type RingMock struct {

0 commit comments

Comments
 (0)