Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,23 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
}
level.Info(userLogger).Log("msg", "cleaning up remaining blocks data for tenant marked for deletion")
// Let's do final cleanup of tenant.
err = c.deleteNonDataFiles(ctx, userLogger, userBucket)
if err != nil {
return err
}

if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
return errors.Wrap(err, "failed to delete marker files")
} else if deleted > 0 {
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted)
}
if err := cortex_tsdb.DeleteTenantDeletionMark(ctx, c.bucketClient, userID); err != nil {
return errors.Wrap(err, "failed to delete tenant deletion mark")
}
return nil
}

func (c *BlocksCleaner) deleteNonDataFiles(ctx context.Context, userLogger log.Logger, userBucket objstore.InstrumentedBucket) error {
if deleted, err := bucket.DeletePrefix(ctx, userBucket, block.DebugMetas, userLogger); err != nil {
return errors.Wrap(err, "failed to delete "+block.DebugMetas)
} else if deleted > 0 {
Expand All @@ -489,15 +506,6 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userLog
level.Info(userLogger).Log("msg", "deleted files under "+PartitionedGroupDirectory+" for tenant marked for deletion", "count", deleted)
}
}

if deleted, err := bucket.DeletePrefix(ctx, userBucket, bucketindex.MarkersPathname, userLogger); err != nil {
return errors.Wrap(err, "failed to delete marker files")
} else if deleted > 0 {
level.Info(userLogger).Log("msg", "deleted marker files for tenant marked for deletion", "count", deleted)
}
if err := cortex_tsdb.DeleteTenantDeletionMark(ctx, c.bucketClient, userID); err != nil {
return errors.Wrap(err, "failed to delete tenant deletion mark")
}
return nil
}

Expand Down Expand Up @@ -547,6 +555,9 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
level.Warn(userLogger).Log("msg", "error deleting index sync status when index is empty", "err", err)
}
if err := c.deleteNonDataFiles(ctx, userLogger, userBucket); err != nil {
level.Warn(userLogger).Log("msg", "error deleting non-data files", "err", err)
}
} else {
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
}
Expand Down
31 changes: 30 additions & 1 deletion pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,8 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {
DeletionDelay: time.Hour,
CleanupInterval: time.Minute,
CleanupConcurrency: 1,
ShardingStrategy: util.ShardingStrategyShuffle,
CompactionStrategy: util.CompactionStrategyPartitioning,
}

ctx := context.Background()
Expand All @@ -929,14 +931,41 @@ func TestBlocksCleaner_DeleteEmptyBucketIndex(t *testing.T) {

userBucket := bucket.NewUserBucketClient(userID, bucketClient, cfgProvider)

err := cleaner.cleanUser(ctx, logger, userBucket, userID, false)
debugMetaFile := path.Join(block.DebugMetas, "meta.json")
require.NoError(t, userBucket.Upload(context.Background(), debugMetaFile, strings.NewReader("some random content here")))

partitionedGroupInfo := PartitionedGroupInfo{
PartitionedGroupID: 1234,
PartitionCount: 1,
Partitions: []Partition{
{
PartitionID: 0,
Blocks: []ulid.ULID{},
},
},
RangeStart: 0,
RangeEnd: 2,
CreationTime: time.Now().Add(-5 * time.Minute).Unix(),
Version: PartitionedGroupInfoVersion1,
}
_, err := UpdatePartitionedGroupInfo(ctx, userBucket, logger, partitionedGroupInfo)
require.NoError(t, err)
partitionedGroupFile := GetPartitionedGroupFile(partitionedGroupInfo.PartitionedGroupID)

err = cleaner.cleanUser(ctx, logger, userBucket, userID, false)
require.NoError(t, err)

_, err = bucketindex.ReadIndex(ctx, bucketClient, userID, cfgProvider, logger)
require.ErrorIs(t, err, bucketindex.ErrIndexNotFound)

_, err = userBucket.WithExpectedErrs(userBucket.IsObjNotFoundErr).Get(ctx, bucketindex.SyncStatusFile)
require.True(t, userBucket.IsObjNotFoundErr(err))

_, err = userBucket.WithExpectedErrs(userBucket.IsObjNotFoundErr).Get(ctx, debugMetaFile)
require.True(t, userBucket.IsObjNotFoundErr(err))

_, err = userBucket.WithExpectedErrs(userBucket.IsObjNotFoundErr).Get(ctx, partitionedGroupFile)
require.True(t, userBucket.IsObjNotFoundErr(err))
}

type mockConfigProvider struct {
Expand Down
Loading