Skip to content

Commit a992ef6

Browse files
authored
Added flag to flush chunks to long-term storage even when using WAL. (#2780)
* Added flag to flush chunks to long-term storage even when using WAL. Signed-off-by: Peter Štibraný <[email protected]> * Updated doc. Signed-off-by: Peter Štibraný <[email protected]> * Added CHANGELOG.md entry. Signed-off-by: Peter Štibraný <[email protected]>
1 parent dce0c72 commit a992ef6

File tree

5 files changed

+15
-6
lines changed

5 files changed

+15
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
* `cortex_prometheus_notifications_queue_length`
2222
* `cortex_prometheus_notifications_queue_capacity`
2323
* `cortex_prometheus_notifications_alertmanagers_discovered`
24+
* [ENHANCEMENT] Added `-ingester.flush-on-shutdown-with-wal-enabled` option to enable chunks flushing even when WAL is enabled. #2780
2425
* [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
2526
* [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
2627
* [BUGFIX] Fix race in processing of headers in sharded queries. #2762

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,11 @@ walconfig:
447447
# CLI flag: -ingester.checkpoint-duration
448448
[checkpoint_duration: <duration> | default = 30m]
449449
450+
# When WAL is enabled, should chunks be flushed to long-term storage on
451+
# shutdown. Useful eg. for migration to blocks engine.
452+
# CLI flag: -ingester.flush-on-shutdown-with-wal-enabled
453+
[flush_on_shutdown_with_wal_enabled: <boolean> | default = false]
454+
450455
lifecycler:
451456
ring:
452457
kvstore:

pkg/ingester/ingester.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
199199
// During WAL recovery, it will create new user states which requires the limiter.
200200
// Hence initialise the limiter before creating the WAL.
201201
// The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
202-
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled, registerer)
202+
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled || cfg.WALConfig.FlushOnShutdown, registerer)
203203
if err != nil {
204204
return nil, err
205205
}

pkg/ingester/wal.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type WALConfig struct {
3737
Recover bool `yaml:"recover_from_wal"`
3838
Dir string `yaml:"wal_dir"`
3939
CheckpointDuration time.Duration `yaml:"checkpoint_duration"`
40+
FlushOnShutdown bool `yaml:"flush_on_shutdown_with_wal_enabled"`
4041
// We always checkpoint during shutdown. This option exists for the tests.
4142
checkpointDuringShutdown bool
4243
}
@@ -48,6 +49,7 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
4849
f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
4950
f.BoolVar(&cfg.CheckpointEnabled, "ingester.checkpoint-enabled", true, "Enable checkpointing of in-memory chunks. It should always be true when using normally. Set it to false iff you are doing some small tests as there is no mechanism to delete the old WAL yet if checkpoint is disabled.")
5051
f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
52+
f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown-with-wal-enabled", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown. Useful eg. for migration to blocks engine.")
5153
cfg.checkpointDuringShutdown = true
5254
}
5355

pkg/ring/lifecycler.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
perrors "github.com/pkg/errors"
1515
"github.com/prometheus/client_golang/prometheus"
1616
"github.com/prometheus/client_golang/prometheus/promauto"
17+
"go.uber.org/atomic"
1718

1819
"github.com/cortexproject/cortex/pkg/ring/kv"
1920
"github.com/cortexproject/cortex/pkg/util"
@@ -121,7 +122,7 @@ type Lifecycler struct {
121122
Zone string
122123

123124
// Whether to flush if transfer fails on shutdown.
124-
flushOnShutdown bool
125+
flushOnShutdown *atomic.Bool
125126

126127
// We need to remember the ingester state just in case consul goes away and comes
127128
// back empty. And it changes during lifecycle of ingester.
@@ -177,7 +178,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
177178
ID: cfg.ID,
178179
RingName: ringName,
179180
RingKey: ringKey,
180-
flushOnShutdown: flushOnShutdown,
181+
flushOnShutdown: atomic.NewBool(flushOnShutdown),
181182
Zone: zone,
182183

183184
actorChan: make(chan func()),
@@ -727,17 +728,17 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
727728

728729
// FlushOnShutdown returns if flushing is enabled if transfer fails on a shutdown.
729730
func (i *Lifecycler) FlushOnShutdown() bool {
730-
return i.flushOnShutdown
731+
return i.flushOnShutdown.Load()
731732
}
732733

733734
// SetFlushOnShutdown enables/disables flush on shutdown if transfer fails.
734735
// Passing 'true' enables it, and 'false' disabled it.
735736
func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool) {
736-
i.flushOnShutdown = flushOnShutdown
737+
i.flushOnShutdown.Store(flushOnShutdown)
737738
}
738739

739740
func (i *Lifecycler) processShutdown(ctx context.Context) {
740-
flushRequired := i.flushOnShutdown
741+
flushRequired := i.flushOnShutdown.Load()
741742
transferStart := time.Now()
742743
if err := i.flushTransferer.TransferOut(ctx); err != nil {
743744
if err == ErrTransferDisabled {

0 commit comments

Comments
 (0)