Skip to content

Commit 289c71e

Browse files
authored
Compactor should skip retry without failure if there is permission denied error (#5727)
* Compactor should skip retry without failure if there is permission denied error Signed-off-by: Alex Le <[email protected]> * factored code Signed-off-by: Alex Le <[email protected]> --------- Signed-off-by: Alex Le <[email protected]>
1 parent 8282a2f commit 289c71e

File tree

4 files changed

+137
-3
lines changed

4 files changed

+137
-3
lines changed

pkg/compactor/compactor.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,10 @@ import (
2222
"github.com/thanos-io/thanos/pkg/block/metadata"
2323
"github.com/thanos-io/thanos/pkg/compact"
2424
"github.com/thanos-io/thanos/pkg/compact/downsample"
25+
"github.com/thanos-io/thanos/pkg/errutil"
2526
"github.com/thanos-io/thanos/pkg/extprom"
27+
"google.golang.org/grpc/codes"
28+
"google.golang.org/grpc/status"
2629

2730
"github.com/cortexproject/cortex/pkg/ring"
2831
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -789,6 +792,10 @@ func (c *Compactor) compactUserWithRetries(ctx context.Context, userID string) e
789792
if lastErr == nil {
790793
return nil
791794
}
795+
if c.isCausedByPermissionDenied(lastErr) {
796+
level.Warn(c.logger).Log("msg", "skipping compactUser due to PermissionDenied", "user", userID, "err", lastErr)
797+
return nil
798+
}
792799

793800
retries.Wait()
794801
}
@@ -1024,3 +1031,30 @@ func (c *Compactor) listTenantsWithMetaSyncDirectories() map[string]struct{} {
10241031

10251032
return result
10261033
}
1034+
1035+
func (c *Compactor) isCausedByPermissionDenied(err error) bool {
1036+
cause := errors.Cause(err)
1037+
if compact.IsRetryError(cause) || compact.IsHaltError(cause) {
1038+
cause = errors.Unwrap(cause)
1039+
}
1040+
if multiErr, ok := cause.(errutil.NonNilMultiRootError); ok {
1041+
for _, err := range multiErr {
1042+
if c.isPermissionDeniedErr(err) {
1043+
return true
1044+
}
1045+
}
1046+
return false
1047+
}
1048+
return c.isPermissionDeniedErr(cause)
1049+
}
1050+
1051+
func (c *Compactor) isPermissionDeniedErr(err error) bool {
1052+
if c.bucketClient.IsAccessDeniedErr(err) {
1053+
return true
1054+
}
1055+
s, ok := status.FromError(err)
1056+
if !ok {
1057+
return false
1058+
}
1059+
return s.Code() == codes.PermissionDenied
1060+
}

pkg/compactor/compactor_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2028,3 +2028,103 @@ func TestCompactor_ShouldNotTreatInterruptionsAsErrors(t *testing.T) {
20282028
require.Contains(t, lines, `level=info component=compactor msg="interrupting compaction of user blocks" user=user-1`)
20292029
require.NotContains(t, logs.String(), `level=error`)
20302030
}
2031+
2032+
func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrDuringMetaSync(t *testing.T) {
2033+
t.Parallel()
2034+
2035+
ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
2036+
content, err := json.Marshal(ss)
2037+
require.NoError(t, err)
2038+
2039+
bucketClient := &bucket.ClientMock{}
2040+
bucketClient.MockIter("__markers__", []string{}, nil)
2041+
bucketClient.MockIter("", []string{"user-1"}, nil)
2042+
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
2043+
bucketClient.MockIter("user-1/markers/", nil, nil)
2044+
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
2045+
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
2046+
bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil)
2047+
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
2048+
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
2049+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), bucket.ErrKeyPermissionDenied)
2050+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", bucket.ErrKeyPermissionDenied)
2051+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", bucket.ErrKeyPermissionDenied)
2052+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), bucket.ErrKeyPermissionDenied)
2053+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", bucket.ErrKeyPermissionDenied)
2054+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", bucket.ErrKeyPermissionDenied)
2055+
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
2056+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
2057+
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
2058+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
2059+
2060+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
2061+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
2062+
2063+
cfg := prepareConfig()
2064+
cfg.ShardingEnabled = true
2065+
cfg.ShardingRing.InstanceID = "compactor-1"
2066+
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
2067+
cfg.ShardingRing.KVStore.Mock = ringStore
2068+
2069+
c, _, tsdbPlanner, _, _ := prepare(t, cfg, bucketClient, nil)
2070+
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
2071+
2072+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
2073+
2074+
// Wait until a run has completed.
2075+
cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} {
2076+
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
2077+
})
2078+
2079+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
2080+
}
2081+
2082+
func TestCompactor_ShouldNotFailCompactionIfAccessDeniedErrReturnedFromBucket(t *testing.T) {
2083+
t.Parallel()
2084+
2085+
ss := bucketindex.Status{Status: bucketindex.Ok, Version: bucketindex.SyncStatusFileVersion}
2086+
content, err := json.Marshal(ss)
2087+
require.NoError(t, err)
2088+
2089+
bucketClient := &bucket.ClientMock{}
2090+
bucketClient.MockIter("__markers__", []string{}, nil)
2091+
bucketClient.MockIter("", []string{"user-1"}, nil)
2092+
bucketClient.MockIter("user-1/", []string{"user-1/01DTVP434PA9VFXSW2JKB3392D", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", "user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", "user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json"}, nil)
2093+
bucketClient.MockIter("user-1/markers/", nil, nil)
2094+
bucketClient.MockGet("user-1/markers/cleaner-visit-marker.json", "", nil)
2095+
bucketClient.MockUpload("user-1/markers/cleaner-visit-marker.json", nil)
2096+
bucketClient.MockDelete("user-1/markers/cleaner-visit-marker.json", nil)
2097+
bucketClient.MockExists(cortex_tsdb.GetGlobalDeletionMarkPath("user-1"), false, nil)
2098+
bucketClient.MockExists(cortex_tsdb.GetLocalDeletionMarkPath("user-1"), false, nil)
2099+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", mockBlockMetaJSON("01DTVP434PA9VFXSW2JKB3392D"), nil)
2100+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/deletion-mark.json", "", nil)
2101+
bucketClient.MockGet("user-1/01DTVP434PA9VFXSW2JKB3392D/no-compact-mark.json", "", nil)
2102+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/meta.json", mockBlockMetaJSON("01DTW0ZCPDDNV4BV83Q2SV4QAZ"), nil)
2103+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/deletion-mark.json", "", nil)
2104+
bucketClient.MockGet("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ/no-compact-mark.json", "", nil)
2105+
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
2106+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", string(content), nil)
2107+
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
2108+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
2109+
2110+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
2111+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
2112+
2113+
cfg := prepareConfig()
2114+
cfg.ShardingEnabled = true
2115+
cfg.ShardingRing.InstanceID = "compactor-1"
2116+
cfg.ShardingRing.InstanceAddr = "1.2.3.4"
2117+
cfg.ShardingRing.KVStore.Mock = ringStore
2118+
2119+
c, _, tsdbPlanner, _, _ := prepare(t, cfg, bucketClient, nil)
2120+
tsdbPlanner.On("Plan", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return([]*metadata.Meta{}, bucket.ErrKeyPermissionDenied)
2121+
2122+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
2123+
2124+
// Wait until a run has completed.
2125+
cortex_testutil.Poll(t, 20*time.Second, 1.0, func() interface{} {
2126+
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
2127+
})
2128+
2129+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
2130+
}

pkg/storage/bucket/client_mock.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414

1515
var (
1616
errObjectDoesNotExist = errors.New("object does not exist")
17-
errKeyPermissionDenied = errors.New("object key permission denied")
17+
ErrKeyPermissionDenied = errors.New("object key permission denied")
1818
)
1919

2020
// ClientMock mocks objstore.Bucket
@@ -188,7 +188,7 @@ func (m *ClientMock) IsObjNotFoundErr(err error) bool {
188188

189189
// IsAccessDeniedErr mocks objstore.Bucket.IsAccessDeniedErr()
190190
func (m *ClientMock) IsAccessDeniedErr(err error) bool {
191-
return err == errKeyPermissionDenied
191+
return err == ErrKeyPermissionDenied
192192
}
193193

194194
// ObjectSize mocks objstore.Bucket.Attributes()

pkg/storage/bucket/sse_bucket_client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ func Test_shouldWrapSSeErrors(t *testing.T) {
111111

112112
bkt := &ClientMock{}
113113

114-
bkt.MockGet("Test", "someContent", errKeyPermissionDenied)
114+
bkt.MockGet("Test", "someContent", ErrKeyPermissionDenied)
115115

116116
sseBkt := NewSSEBucketClient("user-1", bkt, cfgProvider)
117117

0 commit comments

Comments
 (0)