diff --git a/CHANGELOG.md b/CHANGELOG.md
index 638e41727f4..5265805b7c4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@
 * `cortex_prometheus_notifications_queue_length`
 * `cortex_prometheus_notifications_queue_capacity`
 * `cortex_prometheus_notifications_alertmanagers_discovered`
+* [ENHANCEMENT] Added `-ingester.flush-on-shutdown-with-wal-enabled` option to enable chunks flushing even when WAL is enabled. #2780
 * [BUGFIX] Fixed a bug in the index intersect code causing storage to return more chunks/series than required. #2796
 * [BUGFIX] Fixed the number of reported keys in the background cache queue. #2764
 * [BUGFIX] Fix race in processing of headers in sharded queries. #2762
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 1eb40782005..8954a8ee3bf 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -447,6 +447,11 @@ walconfig:
   # CLI flag: -ingester.checkpoint-duration
   [checkpoint_duration: <duration> | default = 30m]
 
+  # When WAL is enabled, should chunks be flushed to long-term storage on
+  # shutdown. Useful eg. for migration to blocks engine.
+  # CLI flag: -ingester.flush-on-shutdown-with-wal-enabled
+  [flush_on_shutdown_with_wal_enabled: <boolean> | default = false]
+
 lifecycler:
   ring:
     kvstore:
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index a80126a52bb..cc82fbd74ad 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -199,7 +199,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	// During WAL recovery, it will create new user states which requires the limiter.
 	// Hence initialise the limiter before creating the WAL.
 	// The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
-	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled, registerer)
+	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled || cfg.WALConfig.FlushOnShutdown, registerer)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go
index cf23789330e..203587d2734 100644
--- a/pkg/ingester/wal.go
+++ b/pkg/ingester/wal.go
@@ -37,6 +37,7 @@ type WALConfig struct {
 	Recover            bool          `yaml:"recover_from_wal"`
 	Dir                string        `yaml:"wal_dir"`
 	CheckpointDuration time.Duration `yaml:"checkpoint_duration"`
+	FlushOnShutdown    bool          `yaml:"flush_on_shutdown_with_wal_enabled"`
 	// We always checkpoint during shutdown. This option exists for the tests.
 	checkpointDuringShutdown bool
 }
@@ -48,6 +49,7 @@ func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
 	f.BoolVar(&cfg.CheckpointEnabled, "ingester.checkpoint-enabled", true, "Enable checkpointing of in-memory chunks. It should always be true when using normally. Set it to false iff you are doing some small tests as there is no mechanism to delete the old WAL yet if checkpoint is disabled.")
 	f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
+	f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown-with-wal-enabled", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown. Useful eg. for migration to blocks engine.")
 	cfg.checkpointDuringShutdown = true
 }
 
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index 6e9e4e4b347..a026a27064d 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -14,6 +14,7 @@ import (
 	perrors "github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
+	"go.uber.org/atomic"
 
 	"github.com/cortexproject/cortex/pkg/ring/kv"
 	"github.com/cortexproject/cortex/pkg/util"
@@ -121,7 +122,7 @@ type Lifecycler struct {
 	Zone     string
 
 	// Whether to flush if transfer fails on shutdown.
-	flushOnShutdown bool
+	flushOnShutdown *atomic.Bool
 
 	// We need to remember the ingester state just in case consul goes away and comes
 	// back empty. And it changes during lifecycle of ingester.
@@ -177,7 +178,7 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
 		ID:       cfg.ID,
 		RingName: ringName,
 		RingKey:  ringKey,
-		flushOnShutdown: flushOnShutdown,
+		flushOnShutdown: atomic.NewBool(flushOnShutdown),
 		Zone:     zone,
 
 		actorChan: make(chan func()),
@@ -727,17 +728,17 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) {
 
 // FlushOnShutdown returns if flushing is enabled if transfer fails on a shutdown.
 func (i *Lifecycler) FlushOnShutdown() bool {
-	return i.flushOnShutdown
+	return i.flushOnShutdown.Load()
 }
 
 // SetFlushOnShutdown enables/disables flush on shutdown if transfer fails.
 // Passing 'true' enables it, and 'false' disabled it.
 func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool) {
-	i.flushOnShutdown = flushOnShutdown
+	i.flushOnShutdown.Store(flushOnShutdown)
 }
 
 func (i *Lifecycler) processShutdown(ctx context.Context) {
-	flushRequired := i.flushOnShutdown
+	flushRequired := i.flushOnShutdown.Load()
 	transferStart := time.Now()
 	if err := i.flushTransferer.TransferOut(ctx); err != nil {
 		if err == ErrTransferDisabled {