7 changes: 6 additions & 1 deletion pkg/storage/parquet/converter_marker.go
@@ -36,7 +36,7 @@ func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Instr

return &ConverterMark{}, err
}
defer runutil.CloseWithLogOnErr(logger, reader, "close bucket index reader")
defer runutil.CloseWithLogOnErr(logger, reader, "close parquet converter marker file reader")

metaContent, err := io.ReadAll(reader)
if err != nil {
@@ -59,3 +59,8 @@ func WriteConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.Buck
}
return userBkt.Upload(ctx, markerPath, bytes.NewReader(b))
}

// ConverterMarkMeta is the converter mark metadata stored in the bucket index. It might not be the same as ConverterMark.
type ConverterMarkMeta struct {
Version int `json:"version"`
}
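
For context, a minimal, self-contained sketch (not part of the PR) of how a converter marker payload decodes into ConverterMarkMeta, assuming the marker JSON carries at least a "version" field as the struct above implies:

package main

import (
	"encoding/json"
	"fmt"
)

// ConverterMarkMeta mirrors the struct above: the subset of the converter
// mark that the bucket index persists.
type ConverterMarkMeta struct {
	Version int `json:"version"`
}

func main() {
	// A marker payload as WriteConverterMark would upload it (assumed shape).
	raw := []byte(`{"version": 1}`)

	var meta ConverterMarkMeta
	if err := json.Unmarshal(raw, &meta); err != nil {
		panic(err)
	}
	fmt.Println("parquet converter marker version:", meta.Version) // -> 1
}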
9 changes: 7 additions & 2 deletions pkg/storage/tsdb/bucketindex/index.go
@@ -3,6 +3,7 @@ package bucketindex
import (
"fmt"
"path/filepath"
"slices"
"strings"
"time"

@@ -11,6 +12,7 @@ import (
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/parquet"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
)
@@ -51,14 +53,14 @@ func (idx *Index) GetUpdatedAt() time.Time {
func (idx *Index) RemoveBlock(id ulid.ULID) {
for i := 0; i < len(idx.Blocks); i++ {
if idx.Blocks[i].ID == id {
-idx.Blocks = append(idx.Blocks[:i], idx.Blocks[i+1:]...)
+idx.Blocks = slices.Delete(idx.Blocks, i, i+1)
break
}
}

for i := 0; i < len(idx.BlockDeletionMarks); i++ {
if idx.BlockDeletionMarks[i].ID == id {
-idx.BlockDeletionMarks = append(idx.BlockDeletionMarks[:i], idx.BlockDeletionMarks[i+1:]...)
+idx.BlockDeletionMarks = slices.Delete(idx.BlockDeletionMarks, i, i+1)
break
}
}
@@ -91,6 +93,9 @@ type Block struct {
// UploadedAt is a unix timestamp (seconds precision) of when the block has been completed to be uploaded
// to the storage.
UploadedAt int64 `json:"uploaded_at"`

// Parquet holds the parquet converter mark metadata, if it exists; otherwise it is nil.
Parquet *parquet.ConverterMarkMeta `json:"parquet,omitempty"`
}

// Within returns whether the block contains samples within the provided range.
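One consequence of the omitempty tag worth noting: blocks without a converter mark serialize exactly as before, so existing bucket indexes stay readable. A small sketch (hypothetical trimmed struct, not the PR's full Block) demonstrating this:

package main

import (
	"encoding/json"
	"fmt"
)

type ConverterMarkMeta struct {
	Version int `json:"version"`
}

// blockEntry is a hypothetical stand-in for bucketindex.Block, trimmed to
// the fields visible in this diff.
type blockEntry struct {
	UploadedAt int64              `json:"uploaded_at"`
	Parquet    *ConverterMarkMeta `json:"parquet,omitempty"`
}

func main() {
	converted, _ := json.Marshal(blockEntry{UploadedAt: 1700000000, Parquet: &ConverterMarkMeta{Version: 1}})
	plain, _ := json.Marshal(blockEntry{UploadedAt: 1700000000})
	fmt.Println(string(converted)) // {"uploaded_at":1700000000,"parquet":{"version":1}}
	fmt.Println(string(plain))     // {"uploaded_at":1700000000}, i.e. unchanged for unconverted blocks
}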
18 changes: 18 additions & 0 deletions pkg/storage/tsdb/bucketindex/markers.go
@@ -82,6 +82,24 @@ func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) {
return id, err == nil
}

// IsBlockParquetConverterMarkFilename returns whether the input filename matches the expected pattern
// of block parquet converter markers stored in the markers location.
func IsBlockParquetConverterMarkFilename(name string) (ulid.ULID, bool) {
parts := strings.SplitN(name, "-", 2)
if len(parts) != 2 {
return ulid.ULID{}, false
}

// Ensure the 2nd part matches the parquet converter mark filename.
if parts[1] != parquet.ConverterMarkerFileName {
return ulid.ULID{}, false
}

// Ensure the 1st part is a valid block ID.
id, err := ulid.Parse(filepath.Base(parts[0]))
return id, err == nil
}
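
A usage sketch (hypothetical, using the zero-valued ULID) showing the filename layout this helper expects, i.e. "<block ULID>-<converter marker file name>" under the markers location; import paths are assumed from the current Cortex layout:

package main

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/storage/parquet"
	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

func main() {
	// "00000000000000000000000000" is the zero ULID (26 Crockford base32 chars).
	name := "00000000000000000000000000" + "-" + parquet.ConverterMarkerFileName

	if id, ok := bucketindex.IsBlockParquetConverterMarkFilename(name); ok {
		fmt.Println("parquet converter mark found for block", id)
	}
}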

// MigrateBlockDeletionMarksToGlobalLocation lists all the tenant's blocks and, for each of them, looks for
// a deletion mark in the block location. Found deletion marks are copied to the global markers location.
// The migration continues on error and returns once all blocks have been checked.
67 changes: 63 additions & 4 deletions pkg/storage/tsdb/bucketindex/updater.go
@@ -15,6 +15,7 @@ import (
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/parquet"
"github.com/cortexproject/cortex/pkg/storage/tsdb"

"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -33,8 +34,9 @@ var (

// Updater is responsible for generating an updated in-memory bucket index.
type Updater struct {
-bkt    objstore.InstrumentedBucket
-logger log.Logger
+bkt            objstore.InstrumentedBucket
+logger         log.Logger
+parquetEnabled bool
}

func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) *Updater {
@@ -44,11 +46,18 @@ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantCon
}
}

func (w *Updater) EnableParquet() *Updater {
w.parquetEnabled = true
return w
}
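
EnableParquet is a chainable opt-in, so callers can gate it on configuration. A hedged sketch of the wiring (the feature flag and the surrounding function are hypothetical; import paths assumed from the current Cortex layout):

package example

import (
	"context"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore"

	"github.com/cortexproject/cortex/pkg/storage/bucket"
	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

// buildIndex is hypothetical glue: it enables parquet marker discovery only
// when the (assumed) feature flag is set.
func buildIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger, parquetEnabled bool, old *bucketindex.Index) (*bucketindex.Index, error) {
	w := bucketindex.NewUpdater(bkt, userID, cfgProvider, logger)
	if parquetEnabled {
		w = w.EnableParquet()
	}
	idx, _, _, err := w.UpdateIndex(ctx, old)
	return idx, err
}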

// UpdateIndex generates the bucket index and returns it, without storing it to the storage.
// If no old index is passed as input, then the bucket index will be generated from scratch.
func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid.ULID]error, int64, error) {
-var oldBlocks []*Block
-var oldBlockDeletionMarks []*BlockDeletionMark
+var (
+	oldBlocks             []*Block
+	oldBlockDeletionMarks []*BlockDeletionMark
+)

// Read the old index, if provided.
if old != nil {
@@ -65,6 +74,11 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid
if err != nil {
return nil, nil, 0, err
}
if w.parquetEnabled {
if err := w.updateParquetBlocks(ctx, blocks); err != nil {
return nil, nil, 0, err
}
}

return &Index{
Version: IndexVersion1,
@@ -180,6 +194,23 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo
return block, nil
}

func (w *Updater) updateParquetBlockIndexEntry(ctx context.Context, id ulid.ULID, block *Block) error {
marker, err := parquet.ReadConverterMark(ctx, id, w.bkt, w.logger)
if err != nil {
return errors.Wrapf(err, "read parquet converter marker file: %v", path.Join(id.String(), parquet.ConverterMarkerFileName))
}
// The marker may be missing or read access may be denied.
// Either way, treat it as no parquet block being available.
if marker == nil || marker.Version == 0 {
return nil
}

block.Parquet = &parquet.ConverterMarkMeta{
Version: marker.Version,
}
return nil
}

func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, map[ulid.ULID]struct{}, int64, error) {
out := make([]*BlockDeletionMark, 0, len(old))
deletedBlocks := map[ulid.ULID]struct{}{}
@@ -249,3 +280,31 @@ func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid

return BlockDeletionMarkFromThanosMarker(&m), nil
}

func (w *Updater) updateParquetBlocks(ctx context.Context, blocks []*Block) error {
discoveredParquetBlocks := map[ulid.ULID]struct{}{}

// Find all parquet markers in the storage.
if err := w.bkt.Iter(ctx, parquet.ConverterMarkerPrefix+"/", func(name string) error {
if blockID, ok := IsBlockParquetConverterMarkFilename(path.Base(name)); ok {
discoveredParquetBlocks[blockID] = struct{}{}
}

return nil
}); err != nil {
return errors.Wrap(err, "list block parquet converter marks")
}

// Check whether a parquet mark has been uploaded or deleted for each block.
for _, m := range blocks {
if _, ok := discoveredParquetBlocks[m.ID]; ok {
if err := w.updateParquetBlockIndexEntry(ctx, m.ID, m); err != nil {
return err
}
} else if m.Parquet != nil {
// Converter marker removed. Reset parquet field.
m.Parquet = nil
}
}
return nil
}
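
Once updateParquetBlocks has reconciled markers, Block.Parquet is non-nil exactly for blocks whose converter mark currently exists in the bucket. A minimal sketch of a hypothetical consumer of an already-built index:

package example

import (
	"fmt"

	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

// listParquetBlocks is hypothetical: it reports every block the index says
// has a parquet copy, along with the recorded marker version.
func listParquetBlocks(idx *bucketindex.Index) {
	for _, b := range idx.Blocks {
		if b.Parquet != nil {
			fmt.Printf("block %s has a parquet copy (marker version %d)\n", b.ID, b.Parquet.Version)
		}
	}
}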
179 changes: 179 additions & 0 deletions pkg/storage/tsdb/bucketindex/updater_test.go
@@ -3,6 +3,7 @@ package bucketindex
import (
"bytes"
"context"
"encoding/json"
"path"
"strings"
"testing"
@@ -21,6 +22,7 @@ import (
"github.com/thanos-io/thanos/pkg/block/metadata"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/parquet"
"github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
)

@@ -301,6 +303,150 @@ func TestUpdater_UpdateIndex_NoTenantInTheBucket(t *testing.T) {
}
}

func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
const userID = "user-1"

bkt, _ := testutil.PrepareFilesystemBucket(t)

ctx := context.Background()
logger := log.NewNopLogger()

// Generate the initial index.
bkt = BucketWithGlobalMarkers(bkt)
block1 := testutil.MockStorageBlock(t, bkt, userID, 10, 20)
block2 := testutil.MockStorageBlock(t, bkt, userID, 20, 30)
block2Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block2)
// Add parquet marker to block 1.
block1ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block1)

w := NewUpdater(bkt, userID, nil, logger).EnableParquet()
returnedIdx, _, _, err := w.UpdateIndex(ctx, nil)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block2},
[]*metadata.DeletionMark{block2Mark}, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
})

// Create new blocks, and update the index.
block3 := testutil.MockStorageBlock(t, bkt, userID, 30, 40)
block4 := testutil.MockStorageBlock(t, bkt, userID, 40, 50)
block4Mark := testutil.MockStorageDeletionMark(t, bkt, userID, block4)

returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block2, block3, block4},
[]*metadata.DeletionMark{block2Mark, block4Mark},
map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
})

// Hard delete a block and update the index.
require.NoError(t, block.Delete(ctx, log.NewNopLogger(), bucket.NewUserBucketClient(userID, bkt, nil), block2.ULID))

returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block3, block4},
[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
})

// Upload a parquet marker to an old block and update the index.
block3ParquetMark := testutil.MockStorageParquetConverterMark(t, bkt, userID, block3)
returnedIdx, _, _, err = w.UpdateIndex(ctx, returnedIdx)
require.NoError(t, err)
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
[]tsdb.BlockMeta{block1, block3, block4},
[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ConverterMarkMeta{
block1.ULID.String(): {Version: block1ParquetMark.Version},
block3.ULID.String(): {Version: block3ParquetMark.Version},
})
}

func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
const userID = "user-1"
ctx := context.Background()
logger := log.NewNopLogger()

tests := []struct {
name string
setupBucket func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket
expectedError error
expectParquet bool
expectParquetMeta *parquet.ConverterMarkMeta
}{
{
name: "should successfully read parquet marker",
setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
parquetMark := parquet.ConverterMarkMeta{
Version: 1,
}
data, err := json.Marshal(parquetMark)
require.NoError(t, err)
require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName), bytes.NewReader(data)))
return bkt
},
expectedError: nil,
expectParquet: true,
expectParquetMeta: &parquet.ConverterMarkMeta{Version: 1},
},
{
name: "should handle missing parquet marker",
setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
// Don't upload any parquet marker
return bkt
},
expectedError: nil,
expectParquet: false,
},
{
name: "should handle access denied",
setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
return &testutil.MockBucketFailure{
Bucket: bkt,
GetFailures: map[string]error{
path.Join(userID, blockID.String(), parquet.ConverterMarkerFileName): testutil.ErrKeyAccessDeniedError,
},
}
},
expectedError: nil,
expectParquet: false,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
bkt, _ := testutil.PrepareFilesystemBucket(t)
blockID := ulid.MustNew(1, nil)
block := &Block{ID: blockID}

// Set up the bucket with test data
bkt = tc.setupBucket(t, bkt, blockID)

// Create an instrumented bucket wrapper
registry := prometheus.NewRegistry()
ibkt := objstore.WrapWithMetrics(bkt, prometheus.WrapRegistererWithPrefix("thanos_", registry), "test-bucket")
w := NewUpdater(ibkt, userID, nil, logger)

err := w.updateParquetBlockIndexEntry(ctx, blockID, block)
if tc.expectedError != nil {
assert.True(t, errors.Is(err, tc.expectedError))
} else {
assert.NoError(t, err)
}

if tc.expectParquet {
assert.NotNil(t, block.Parquet)
assert.Equal(t, tc.expectParquetMeta, block.Parquet)
} else {
assert.Nil(t, block.Parquet)
}
})
}
}

func getBlockUploadedAt(t testing.TB, bkt objstore.Bucket, userID string, blockID ulid.ULID) int64 {
metaFile := path.Join(userID, blockID.String(), block.MetaFilename)

@@ -338,3 +484,36 @@ func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userI

assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks)
}

func assertBucketIndexEqualWithParquet(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark, parquetBlocks map[string]*parquet.ConverterMarkMeta) {
assert.Equal(t, IndexVersion1, idx.Version)
assert.InDelta(t, time.Now().Unix(), idx.UpdatedAt, 2)

// Build the list of expected block index entries.
var expectedBlockEntries []*Block
for _, b := range expectedBlocks {
block := &Block{
ID: b.ULID,
MinTime: b.MinTime,
MaxTime: b.MaxTime,
UploadedAt: getBlockUploadedAt(t, bkt, userID, b.ULID),
}
if meta, ok := parquetBlocks[b.ULID.String()]; ok {
block.Parquet = meta
}
expectedBlockEntries = append(expectedBlockEntries, block)
}

assert.ElementsMatch(t, expectedBlockEntries, idx.Blocks)

// Build the list of expected block deletion mark index entries.
var expectedMarkEntries []*BlockDeletionMark
for _, m := range expectedDeletionMarks {
expectedMarkEntries = append(expectedMarkEntries, &BlockDeletionMark{
ID: m.ID,
DeletionTime: m.DeletionTime,
})
}

assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks)
}