Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
* [BUGFIX] Distributor: update defaultReplicationStrategy to not fail with extend-write when a single instance is unhealthy. #4636
* [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716
* [BUGFIX] Distributor: Fix a memory leak in distributor due to the cluster label. #4739
* [ENHANCEMENT] Compactor: Upload blocks' no-compaction marks to the global markers location and introduce a new metric. #4729
* `cortex_bucket_blocks_marked_for_no_compaction_count`: Total number of blocks marked for no compaction in the bucket.

## 1.11.0 2021-11-25

Expand Down
44 changes: 26 additions & 18 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,18 @@ type BlocksCleaner struct {
lastOwnedUsers []string

// Metrics.
runsStarted prometheus.Counter
runsCompleted prometheus.Counter
runsFailed prometheus.Counter
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantMarkedBlocks *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
runsStarted prometheus.Counter
runsCompleted prometheus.Counter
runsFailed prometheus.Counter
runsLastSuccess prometheus.Gauge
blocksCleanedTotal prometheus.Counter
blocksFailedTotal prometheus.Counter
blocksMarkedForDeletion prometheus.Counter
tenantBlocks *prometheus.GaugeVec
tenantBlocksMarkedForDelete *prometheus.GaugeVec
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
tenantPartialBlocks *prometheus.GaugeVec
tenantBucketIndexLastUpdate *prometheus.GaugeVec
}

func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
Expand Down Expand Up @@ -102,10 +103,14 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use
Name: "cortex_bucket_blocks_count",
Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.",
}, []string{"user"}),
tenantMarkedBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_marked_for_deletion_count",
Help: "Total number of blocks marked for deletion in the bucket.",
}, []string{"user"}),
tenantBlocksMarkedForNoCompaction: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_marked_for_no_compaction_count",
Help: "Total number of blocks marked for no compaction in the bucket.",
}, []string{"user"}),
tenantPartialBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_bucket_blocks_partials_count",
Help: "Total number of partial blocks.",
Expand Down Expand Up @@ -168,7 +173,8 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error {
for _, userID := range c.lastOwnedUsers {
if !isActive[userID] && !isDeleted[userID] {
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
}
Expand Down Expand Up @@ -231,15 +237,16 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
// to delete. We also consider them all marked for deletion given the next run will try
// to delete them again.
c.tenantBlocks.WithLabelValues(userID).Set(float64(failed))
c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(failed))
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(failed))
c.tenantPartialBlocks.WithLabelValues(userID).Set(0)

return errors.Errorf("failed to delete %d blocks", failed)
}

// Given all blocks have been deleted, we can also remove the metrics.
c.tenantBlocks.DeleteLabelValues(userID)
c.tenantMarkedBlocks.DeleteLabelValues(userID)
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
c.tenantPartialBlocks.DeleteLabelValues(userID)

if deletedBlocks > 0 {
Expand Down Expand Up @@ -330,7 +337,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b

// Generate an updated in-memory version of the bucket index.
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
idx, partials, err := w.UpdateIndex(ctx, idx)
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
if err != nil {
return err
}
Expand Down Expand Up @@ -367,9 +374,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
}

c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))

return nil
}
Expand Down
16 changes: 15 additions & 1 deletion pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json")
require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here")))

// No Compact blocks marker
createTSDBBlock(t, bucketClient, "user-5", 10, 30, nil)
block12 := createTSDBBlock(t, bucketClient, "user-5", 30, 50, nil)
createNoCompactionMark(t, bucketClient, "user-5", block12)

// The fixtures have been created. If the bucket client wasn't wrapped to write
// deletion marks to the global location too, then this is the right time to do it.
if options.markersMigrationEnabled {
Expand Down Expand Up @@ -202,17 +207,26 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
# TYPE cortex_bucket_blocks_count gauge
cortex_bucket_blocks_count{user="user-1"} 2
cortex_bucket_blocks_count{user="user-2"} 1
cortex_bucket_blocks_count{user="user-5"} 2
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
cortex_bucket_blocks_marked_for_deletion_count{user="user-5"} 0
# HELP cortex_bucket_blocks_marked_for_no_compaction_count Total number of blocks marked for no compaction in the bucket.
# TYPE cortex_bucket_blocks_marked_for_no_compaction_count gauge
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-1"} 0
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-2"} 0
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-5"} 1
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
# TYPE cortex_bucket_blocks_partials_count gauge
cortex_bucket_blocks_partials_count{user="user-1"} 2
cortex_bucket_blocks_partials_count{user="user-2"} 0
cortex_bucket_blocks_partials_count{user="user-5"} 0
`),
"cortex_bucket_blocks_count",
"cortex_bucket_blocks_marked_for_deletion_count",
"cortex_bucket_blocks_marked_for_no_compaction_count",
"cortex_bucket_blocks_partials_count",
))
}
Expand Down Expand Up @@ -421,7 +435,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) {
id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, nil)

w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger)
idx, _, err := w.UpdateIndex(ctx, nil)
idx, _, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)

assert.ElementsMatch(t, []ulid.ULID{id1, id2, id3}, idx.Blocks.GetULIDs())
Expand Down
8 changes: 8 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,14 @@ func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockI
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
}

// createNoCompactionMark uploads a no-compaction marker for the given block
// under the tenant's location in the bucket.
func createNoCompactionMark(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID) {
	markPath := path.Join(userID, blockID.String(), metadata.NoCompactMarkFilename)

	require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(mockNoCompactBlockJSON(blockID.String()))))
}

func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
var compactor *Compactor
var log *concurrency.SyncBuffer
Expand Down
63 changes: 48 additions & 15 deletions pkg/storage/tsdb/bucketindex/markers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,25 @@ const (
MarkersPathname = "markers"
)

var (
	// MarkersMap maps a per-block marker filename to the function that builds
	// the corresponding marker path in the tenant's global markers location.
	// It currently covers the deletion mark and the no-compact mark.
	MarkersMap = map[string]func(ulid.ULID) string{
		metadata.DeletionMarkFilename:  BlockDeletionMarkFilepath,
		metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath,
	}
)

// BlockDeletionMarkFilepath returns the path, relative to the tenant's bucket location,
// of a block deletion mark in the bucket markers location.
func BlockDeletionMarkFilepath(blockID ulid.ULID) string {
	filename := blockID.String() + "-" + metadata.DeletionMarkFilename
	return fmt.Sprintf("%s/%s", MarkersPathname, filename)
}

// NoCompactMarkFilenameMarkFilepath returns the path, relative to the tenant's bucket location,
// of a block no compact mark in the bucket markers location.
func NoCompactMarkFilenameMarkFilepath(blockID ulid.ULID) string {
	return MarkersPathname + "/" + blockID.String() + "-" + metadata.NoCompactMarkFilename
}

// IsBlockDeletionMarkFilename returns whether the input filename matches the expected pattern
// of block deletion markers stored in the markers location.
func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) {
Expand All @@ -45,6 +58,24 @@ func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) {
return id, err == nil
}

// IsBlockNoCompactMarkFilename returns whether the input filename matches the expected pattern
// of block no compact markers stored in the markers location, and the parsed block ID when it does.
func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) {
	parts := strings.SplitN(name, "-", 2)
	if len(parts) != 2 {
		return ulid.ULID{}, false
	}

	// Ensure the 2nd part matches the no compact mark filename.
	// (The marker filename itself contains hyphens, which is why SplitN with a limit of 2 is used.)
	if parts[1] != metadata.NoCompactMarkFilename {
		return ulid.ULID{}, false
	}

	// Ensure the 1st part is a valid block ID.
	id, err := ulid.Parse(filepath.Base(parts[0]))
	return id, err == nil
}

// MigrateBlockDeletionMarksToGlobalLocation list all tenant's blocks and, for each of them, look for
// a deletion mark in the block location. Found deletion marks are copied to the global markers location.
// The migration continues on error and returns once all blocks have been checked.
Expand All @@ -67,22 +98,24 @@ func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore
errs := tsdb_errors.NewMulti()

for _, blockID := range blocks {
// Look up the deletion mark (if any).
reader, err := userBucket.Get(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename))
if userBucket.IsObjNotFoundErr(err) {
continue
} else if err != nil {
errs.Add(err)
continue
}
for mark, globalFilePath := range MarkersMap {
// Look up mark (if any).
reader, err := userBucket.Get(ctx, path.Join(blockID.String(), mark))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who is mark?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark can be now both, the deletion and no-compact mark.

if userBucket.IsObjNotFoundErr(err) {
continue
} else if err != nil {
errs.Add(err)
continue
}

// Upload it to the global markers location.
uploadErr := userBucket.Upload(ctx, BlockDeletionMarkFilepath(blockID), reader)
if closeErr := reader.Close(); closeErr != nil {
errs.Add(closeErr)
}
if uploadErr != nil {
errs.Add(uploadErr)
// Upload it to the global markers location.
uploadErr := userBucket.Upload(ctx, globalFilePath(blockID), reader)
if closeErr := reader.Close(); closeErr != nil {
errs.Add(closeErr)
}
if uploadErr != nil {
errs.Add(uploadErr)
}
}
}

Expand Down
29 changes: 17 additions & 12 deletions pkg/storage/tsdb/bucketindex/markers_bucket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"io/ioutil"
"path"

"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/objstore"
)

Expand All @@ -29,7 +27,7 @@ func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket {

// Upload implements objstore.Bucket.
func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Reader) error {
blockID, ok := b.isBlockDeletionMark(name)
globalMarkPath, ok := b.isMark(name)
if !ok {
return b.parent.Upload(ctx, name, r)
}
Expand All @@ -46,7 +44,6 @@ func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Read
}

// Upload it to the global markers location too.
globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
return b.parent.Upload(ctx, globalMarkPath, bytes.NewBuffer(body))
}

Expand All @@ -58,8 +55,7 @@ func (b *globalMarkersBucket) Delete(ctx context.Context, name string) error {
}

// Delete the marker in the global markers location too.
if blockID, ok := b.isBlockDeletionMark(name); ok {
globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
if globalMarkPath, ok := b.isMark(name); ok {
if err := b.parent.Delete(ctx, globalMarkPath); err != nil {
if !b.parent.IsObjNotFoundErr(err) {
return err
Expand Down Expand Up @@ -128,12 +124,21 @@ func (b *globalMarkersBucket) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpe
return b
}

func (b *globalMarkersBucket) isBlockDeletionMark(name string) (ulid.ULID, bool) {
if path.Base(name) != metadata.DeletionMarkFilename {
return ulid.ULID{}, false
// isMark returns the path of the marker in the global markers location and true
// if name is a per-block marker file (either a deletion mark or a no-compact mark),
// or ("", false) otherwise.
func (b *globalMarkersBucket) isMark(name string) (string, bool) {
	for mark, globalFilePath := range MarkersMap {
		if path.Base(name) != mark {
			continue
		}

		// Parse the block ID in the path. If there's no block ID, then it's not
		// a per-block marker.
		id, ok := block.IsBlockDir(path.Dir(name))
		if !ok {
			return "", false
		}

		return path.Clean(path.Join(path.Dir(name), "../", globalFilePath(id))), true
	}

	return "", false
}
Loading