34
34
35
35
// Updater is responsible to generate an update in-memory bucket index.
36
36
type Updater struct {
37
- bkt objstore.InstrumentedBucket
38
- logger log.Logger
37
+ bkt objstore.InstrumentedBucket
38
+ logger log.Logger
39
+ parquetEnabled bool
39
40
}
40
41
41
42
func NewUpdater (bkt objstore.Bucket , userID string , cfgProvider bucket.TenantConfigProvider , logger log.Logger ) * Updater {
@@ -45,6 +46,11 @@ func NewUpdater(bkt objstore.Bucket, userID string, cfgProvider bucket.TenantCon
45
46
}
46
47
}
47
48
49
+ func (w * Updater ) EnableParquet () * Updater {
50
+ w .parquetEnabled = true
51
+ return w
52
+ }
53
+
48
54
// UpdateIndex generates the bucket index and returns it, without storing it to the storage.
49
55
// If the old index is not passed in input, then the bucket index will be generated from scratch.
50
56
func (w * Updater ) UpdateIndex (ctx context.Context , old * Index ) (* Index , map [ulid.ULID ]error , int64 , error ) {
@@ -104,7 +110,7 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block, deletedBlocks
104
110
// Check if parquet mark has been uploaded for the old block.
105
111
// TODO: this should be optimized to have all parquet marker in a global path.
106
112
// We assume parquet marker cannot be removed from a block if it exists before.
107
- if b .Parquet == nil {
113
+ if w . parquetEnabled && b .Parquet == nil {
108
114
if err := w .updateParquetBlockIndexEntry (ctx , b .ID , b ); err != nil {
109
115
return nil , nil , err
110
116
}
@@ -119,7 +125,9 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block, deletedBlocks
119
125
for id := range discovered {
120
126
b , err := w .updateBlockIndexEntry (ctx , id )
121
127
if err == nil {
122
- err = w .updateParquetBlockIndexEntry (ctx , id , b )
128
+ if w .parquetEnabled {
129
+ err = w .updateParquetBlockIndexEntry (ctx , id , b )
130
+ }
123
131
if err == nil {
124
132
blocks = append (blocks , b )
125
133
continue
0 commit comments