Skip to content

Commit 0b067df

Browse files
committed
switch to use global marker path for parquet converter mark
Signed-off-by: yeya24 <[email protected]>
1 parent 81c7de5 commit 0b067df

File tree

7 files changed

+74
-40
lines changed

7 files changed

+74
-40
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package parquet
2+
3+
const ConverterMakerFileName = "parquet-converter-mark.json"
4+
5+
type ParquetMeta struct {
6+
Version int `json:"version"`
7+
}

pkg/storage/tsdb/bucketindex/index.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/thanos-io/thanos/pkg/block"
1313
"github.com/thanos-io/thanos/pkg/block/metadata"
1414

15+
"github.com/cortexproject/cortex/pkg/storage/parquet"
1516
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
1617
"github.com/cortexproject/cortex/pkg/util"
1718
)
@@ -94,7 +95,7 @@ type Block struct {
9495
UploadedAt int64 `json:"uploaded_at"`
9596

9697
// Parquet metadata, if it exists; nil otherwise.
97-
Parquet *cortex_tsdb.ParquetMeta `json:"parquet,omitempty"`
98+
Parquet *parquet.ParquetMeta `json:"parquet,omitempty"`
9899
}
99100

100101
// Within returns whether the block contains samples within the provided range.

pkg/storage/tsdb/bucketindex/markers.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/thanos-io/thanos/pkg/block/metadata"
1616

1717
"github.com/cortexproject/cortex/pkg/storage/bucket"
18+
"github.com/cortexproject/cortex/pkg/storage/parquet"
1819
)
1920

2021
const (
@@ -76,6 +77,24 @@ func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) {
7677
return id, err == nil
7778
}
7879

80+
// IsBlockParquetConverterMarkFilename returns whether the input filename matches the expected pattern
81+
// of block parquet converter markers stored in the markers location.
82+
func IsBlockParquetConverterMarkFilename(name string) (ulid.ULID, bool) {
83+
parts := strings.SplitN(name, "-", 2)
84+
if len(parts) != 2 {
85+
return ulid.ULID{}, false
86+
}
87+
88+
// Ensure the 2nd part matches the parquet converter mark filename.
89+
if parts[1] != parquet.ConverterMakerFileName {
90+
return ulid.ULID{}, false
91+
}
92+
93+
// Ensure the 1st part is a valid block ID.
94+
id, err := ulid.Parse(filepath.Base(parts[0]))
95+
return id, err == nil
96+
}
97+
7998
// MigrateBlockDeletionMarksToGlobalLocation list all tenant's blocks and, for each of them, look for
8099
// a deletion mark in the block location. Found deletion marks are copied to the global markers location.
81100
// The migration continues on error and returns once all blocks have been checked.

pkg/storage/tsdb/bucketindex/updater.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/thanos-io/thanos/pkg/block"
1616
"github.com/thanos-io/thanos/pkg/block/metadata"
1717

18+
"github.com/cortexproject/cortex/pkg/storage/parquet"
1819
"github.com/cortexproject/cortex/pkg/storage/tsdb"
1920

2021
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -65,12 +66,12 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid
6566
oldBlockDeletionMarks = old.BlockDeletionMarks
6667
}
6768

68-
blockDeletionMarks, deletedBlocks, totalBlocksBlocksMarkedForNoCompaction, err := w.updateBlockMarks(ctx, oldBlockDeletionMarks)
69+
blockDeletionMarks, deletedBlocks, totalBlocksBlocksMarkedForNoCompaction, discoveredParquetBlocks, err := w.updateBlockMarks(ctx, oldBlockDeletionMarks)
6970
if err != nil {
7071
return nil, nil, 0, err
7172
}
7273

73-
blocks, partials, err := w.updateBlocks(ctx, oldBlocks, deletedBlocks)
74+
blocks, partials, err := w.updateBlocks(ctx, oldBlocks, deletedBlocks, discoveredParquetBlocks)
7475
if err != nil {
7576
return nil, nil, 0, err
7677
}
@@ -83,7 +84,7 @@ func (w *Updater) UpdateIndex(ctx context.Context, old *Index) (*Index, map[ulid
8384
}, partials, totalBlocksBlocksMarkedForNoCompaction, nil
8485
}
8586

86-
func (w *Updater) updateBlocks(ctx context.Context, old []*Block, deletedBlocks map[ulid.ULID]struct{}) (blocks []*Block, partials map[ulid.ULID]error, _ error) {
87+
func (w *Updater) updateBlocks(ctx context.Context, old []*Block, deletedBlocks map[ulid.ULID]struct{}, discoveredParquetBlocks map[ulid.ULID]struct{}) (blocks []*Block, partials map[ulid.ULID]error, _ error) {
8788
discovered := map[ulid.ULID]struct{}{}
8889
partials = map[ulid.ULID]error{}
8990

@@ -107,12 +108,14 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block, deletedBlocks
107108
level.Warn(w.logger).Log("msg", "skipped block with missing global deletion marker", "block", b.ID.String())
108109
continue
109110
}
110-
// Check if parquet mark has been uploaded for the old block.
111-
// TODO: this should be optimized to have all parquet marker in a global path.
112-
// We assume parquet marker cannot be removed from a block if it exists before.
113-
if w.parquetEnabled && b.Parquet == nil {
114-
if err := w.updateParquetBlockIndexEntry(ctx, b.ID, b); err != nil {
115-
return nil, nil, err
111+
// Check if parquet mark has been uploaded or deleted for the old block.
112+
if w.parquetEnabled {
113+
if _, ok := discoveredParquetBlocks[b.ID]; ok {
114+
if err := w.updateParquetBlockIndexEntry(ctx, b.ID, b); err != nil {
115+
return nil, nil, err
116+
}
117+
} else if b.Parquet != nil {
118+
b.Parquet = nil
116119
}
117120
}
118121
blocks = append(blocks, b)
@@ -126,7 +129,11 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block, deletedBlocks
126129
b, err := w.updateBlockIndexEntry(ctx, id)
127130
if err == nil {
128131
if w.parquetEnabled {
129-
err = w.updateParquetBlockIndexEntry(ctx, id, b)
132+
if _, ok := discoveredParquetBlocks[b.ID]; ok {
133+
if err := w.updateParquetBlockIndexEntry(ctx, b.ID, b); err != nil {
134+
return nil, nil, err
135+
}
136+
}
130137
}
131138
if err == nil {
132139
blocks = append(blocks, b)
@@ -203,7 +210,7 @@ func (w *Updater) updateBlockIndexEntry(ctx context.Context, id ulid.ULID) (*Blo
203210
}
204211

205212
func (w *Updater) updateParquetBlockIndexEntry(ctx context.Context, id ulid.ULID, block *Block) error {
206-
parquetMarkFile := path.Join(id.String(), tsdb.ParquetConverterMakerFileName)
213+
parquetMarkFile := path.Join(id.String(), parquet.ConverterMakerFileName)
207214

208215
// Get the block's parquet marker file.
209216
r, err := w.bkt.ReaderWithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(w.bkt.IsObjNotFoundErr, w.bkt.IsAccessDeniedErr)).Get(ctx, parquetMarkFile)
@@ -223,18 +230,19 @@ func (w *Updater) updateParquetBlockIndexEntry(ctx context.Context, id ulid.ULID
223230
return errors.Wrapf(err, "read parquet converter marker file: %v", parquetMarkFile)
224231
}
225232

226-
m := tsdb.ParquetMeta{}
233+
m := parquet.ParquetMeta{}
227234
if err := json.Unmarshal(markContent, &m); err != nil {
228235
return errors.Wrapf(ErrBlockParquetMarkCorrupted, "unmarshal parquet converter marker file %s: %v", parquetMarkFile, err)
229236
}
230237
block.Parquet = &m
231238
return nil
232239
}
233240

234-
func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, map[ulid.ULID]struct{}, int64, error) {
241+
func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark) ([]*BlockDeletionMark, map[ulid.ULID]struct{}, int64, map[ulid.ULID]struct{}, error) {
235242
out := make([]*BlockDeletionMark, 0, len(old))
236243
deletedBlocks := map[ulid.ULID]struct{}{}
237244
discovered := map[ulid.ULID]struct{}{}
245+
discoveredParquetBlocks := map[ulid.ULID]struct{}{}
238246
totalBlocksBlocksMarkedForNoCompaction := int64(0)
239247

240248
// Find all markers in the storage.
@@ -247,10 +255,16 @@ func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark
247255
totalBlocksBlocksMarkedForNoCompaction++
248256
}
249257

258+
if w.parquetEnabled {
259+
if blockID, ok := IsBlockParquetConverterMarkFilename(path.Base(name)); ok {
260+
discoveredParquetBlocks[blockID] = struct{}{}
261+
}
262+
}
263+
250264
return nil
251265
})
252266
if err != nil {
253-
return nil, nil, totalBlocksBlocksMarkedForNoCompaction, errors.Wrap(err, "list block deletion marks")
267+
return nil, nil, totalBlocksBlocksMarkedForNoCompaction, discoveredParquetBlocks, errors.Wrap(err, "list block deletion marks")
254268
}
255269

256270
// Since deletion marks are immutable, all markers already existing in the index can just be copied.
@@ -276,13 +290,13 @@ func (w *Updater) updateBlockMarks(ctx context.Context, old []*BlockDeletionMark
276290
continue
277291
}
278292
if err != nil {
279-
return nil, nil, totalBlocksBlocksMarkedForNoCompaction, err
293+
return nil, nil, totalBlocksBlocksMarkedForNoCompaction, discoveredParquetBlocks, err
280294
}
281295

282296
out = append(out, m)
283297
}
284298

285-
return out, deletedBlocks, totalBlocksBlocksMarkedForNoCompaction, nil
299+
return out, deletedBlocks, totalBlocksBlocksMarkedForNoCompaction, discoveredParquetBlocks, nil
286300
}
287301

288302
func (w *Updater) updateBlockDeletionMarkIndexEntry(ctx context.Context, id ulid.ULID) (*BlockDeletionMark, error) {

pkg/storage/tsdb/bucketindex/updater_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import (
2222
"github.com/thanos-io/thanos/pkg/block/metadata"
2323

2424
"github.com/cortexproject/cortex/pkg/storage/bucket"
25-
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
25+
"github.com/cortexproject/cortex/pkg/storage/parquet"
2626
"github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
2727
)
2828

@@ -324,7 +324,7 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
324324
require.NoError(t, err)
325325
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
326326
[]tsdb.BlockMeta{block1, block2},
327-
[]*metadata.DeletionMark{block2Mark}, map[string]*cortex_tsdb.ParquetMeta{
327+
[]*metadata.DeletionMark{block2Mark}, map[string]*parquet.ParquetMeta{
328328
block1.ULID.String(): block1ParquetMark,
329329
})
330330

@@ -338,7 +338,7 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
338338
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
339339
[]tsdb.BlockMeta{block1, block2, block3, block4},
340340
[]*metadata.DeletionMark{block2Mark, block4Mark},
341-
map[string]*cortex_tsdb.ParquetMeta{
341+
map[string]*parquet.ParquetMeta{
342342
block1.ULID.String(): block1ParquetMark,
343343
})
344344

@@ -349,7 +349,7 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
349349
require.NoError(t, err)
350350
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
351351
[]tsdb.BlockMeta{block1, block3, block4},
352-
[]*metadata.DeletionMark{block4Mark}, map[string]*cortex_tsdb.ParquetMeta{
352+
[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ParquetMeta{
353353
block1.ULID.String(): block1ParquetMark,
354354
})
355355

@@ -359,7 +359,7 @@ func TestUpdater_UpdateIndex_WithParquet(t *testing.T) {
359359
require.NoError(t, err)
360360
assertBucketIndexEqualWithParquet(t, returnedIdx, bkt, userID,
361361
[]tsdb.BlockMeta{block1, block3, block4},
362-
[]*metadata.DeletionMark{block4Mark}, map[string]*cortex_tsdb.ParquetMeta{
362+
[]*metadata.DeletionMark{block4Mark}, map[string]*parquet.ParquetMeta{
363363
block1.ULID.String(): block1ParquetMark,
364364
block3.ULID.String(): block3ParquetMark,
365365
})
@@ -375,22 +375,22 @@ func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
375375
setupBucket func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket
376376
expectedError error
377377
expectParquet bool
378-
expectParquetMeta *cortex_tsdb.ParquetMeta
378+
expectParquetMeta *parquet.ParquetMeta
379379
}{
380380
{
381381
name: "should successfully read parquet marker",
382382
setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
383-
parquetMark := cortex_tsdb.ParquetMeta{
383+
parquetMark := parquet.ParquetMeta{
384384
Version: 1,
385385
}
386386
data, err := json.Marshal(parquetMark)
387387
require.NoError(t, err)
388-
require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), cortex_tsdb.ParquetConverterMakerFileName), bytes.NewReader(data)))
388+
require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), parquet.ConverterMakerFileName), bytes.NewReader(data)))
389389
return bkt
390390
},
391391
expectedError: nil,
392392
expectParquet: true,
393-
expectParquetMeta: &cortex_tsdb.ParquetMeta{Version: 1},
393+
expectParquetMeta: &parquet.ParquetMeta{Version: 1},
394394
},
395395
{
396396
name: "should handle missing parquet marker",
@@ -407,7 +407,7 @@ func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
407407
return &testutil.MockBucketFailure{
408408
Bucket: bkt,
409409
GetFailures: map[string]error{
410-
path.Join(userID, blockID.String(), cortex_tsdb.ParquetConverterMakerFileName): testutil.ErrKeyAccessDeniedError,
410+
path.Join(userID, blockID.String(), parquet.ConverterMakerFileName): testutil.ErrKeyAccessDeniedError,
411411
},
412412
}
413413
},
@@ -417,7 +417,7 @@ func TestUpdater_UpdateParquetBlockIndexEntry(t *testing.T) {
417417
{
418418
name: "should handle corrupted parquet marker",
419419
setupBucket: func(t *testing.T, bkt objstore.InstrumentedBucket, blockID ulid.ULID) objstore.InstrumentedBucket {
420-
require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), cortex_tsdb.ParquetConverterMakerFileName), bytes.NewReader([]byte("invalid json"))))
420+
require.NoError(t, bkt.Upload(ctx, path.Join(userID, blockID.String(), parquet.ConverterMakerFileName), bytes.NewReader([]byte("invalid json"))))
421421
return bkt
422422
},
423423
expectedError: ErrBlockParquetMarkCorrupted,
@@ -494,7 +494,7 @@ func assertBucketIndexEqual(t testing.TB, idx *Index, bkt objstore.Bucket, userI
494494
assert.ElementsMatch(t, expectedMarkEntries, idx.BlockDeletionMarks)
495495
}
496496

497-
func assertBucketIndexEqualWithParquet(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark, parquetBlocks map[string]*cortex_tsdb.ParquetMeta) {
497+
func assertBucketIndexEqualWithParquet(t testing.TB, idx *Index, bkt objstore.Bucket, userID string, expectedBlocks []tsdb.BlockMeta, expectedDeletionMarks []*metadata.DeletionMark, parquetBlocks map[string]*parquet.ParquetMeta) {
498498
assert.Equal(t, IndexVersion1, idx.Version)
499499
assert.InDelta(t, time.Now().Unix(), idx.UpdatedAt, 2)
500500

pkg/storage/tsdb/parquet.go

Lines changed: 0 additions & 7 deletions
This file was deleted.

pkg/storage/tsdb/testutil/block_mock.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515
"github.com/thanos-io/objstore"
1616
"github.com/thanos-io/thanos/pkg/block/metadata"
1717

18-
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
18+
"github.com/cortexproject/cortex/pkg/storage/parquet"
1919
)
2020

2121
func MockStorageBlock(t testing.TB, bucket objstore.Bucket, userID string, minT, maxT int64) tsdb.BlockMeta {
@@ -89,8 +89,8 @@ func MockStorageNonCompactionMark(t testing.TB, bucket objstore.Bucket, userID s
8989
return &mark
9090
}
9191

92-
func MockStorageParquetMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *cortex_tsdb.ParquetMeta {
93-
mark := cortex_tsdb.ParquetMeta{
92+
func MockStorageParquetMark(t testing.TB, bucket objstore.Bucket, userID string, meta tsdb.BlockMeta) *parquet.ParquetMeta {
93+
mark := parquet.ParquetMeta{
9494
Version: 1,
9595
}
9696

@@ -100,7 +100,7 @@ func MockStorageParquetMark(t testing.TB, bucket objstore.Bucket, userID string,
100100
}
101101

102102
markContentReader := strings.NewReader(string(markContent))
103-
markPath := fmt.Sprintf("%s/%s/%s", userID, meta.ULID.String(), cortex_tsdb.ParquetConverterMakerFileName)
103+
markPath := fmt.Sprintf("%s/%s/%s", userID, meta.ULID.String(), parquet.ConverterMakerFileName)
104104
require.NoError(t, bucket.Upload(context.Background(), markPath, markContentReader))
105105

106106
return &mark

0 commit comments

Comments
 (0)