diff --git a/CHANGELOG.md b/CHANGELOG.md
index 30925513f65..909c56c627d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@
 * [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026
 * [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex_<component>_thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027
 * [BUGFIX] Experimental TSDB: fixed `/all_user_stats` and `/api/prom/user_stats` endpoints when using the experimental TSDB blocks storage. #2042
+* [BUGFIX] Fixed parsing of the WAL configuration when specified in the YAML config file. #2071

 Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and above always write normalised tokens.
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index e62ea3d0f88..26536bdeace 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -42,7 +42,7 @@ var (

 // Config for an Ingester.
 type Config struct {
- WALConfig WALConfig
+ WALConfig WALConfig `yaml:"walconfig,omitempty"`
 LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

 // Config for transferring chunks. Zero or negative = no retries.
@@ -141,7 +141,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 return NewV2(cfg, clientConfig, limits, registerer)
 }

- if cfg.WALConfig.walEnabled {
+ if cfg.WALConfig.WALEnabled {
 // If WAL is enabled, we don't transfer out the data to any ingester.
 // Either the next ingester which takes it's place should recover from WAL
 // or the data has to be flushed during scaledown.
@@ -167,14 +167,14 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 var err error
 // During WAL recovery, it will create new user states which requires the limiter.
 // Hence initialise the limiter before creating the WAL.
- // The '!cfg.WALConfig.walEnabled' argument says don't flush on shutdown if the WAL is enabled.
- i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.walEnabled)
+ // The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
+ i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled)
 if err != nil {
 return nil, err
 }
 i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)

- if cfg.WALConfig.recover {
+ if cfg.WALConfig.Recover {
 level.Info(util.Logger).Log("msg", "recovering from WAL")
 start := time.Now()
 if err := recoverFromWAL(i); err != nil {
@@ -286,7 +286,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 var lastPartialErr *validationError
 var record *Record
- if i.cfg.WALConfig.walEnabled {
+ if i.cfg.WALConfig.WALEnabled {
 record = recordPool.Get().(*Record)
 record.UserId = userID
 // Assuming there is not much churn in most cases, there is no use
diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go
index 031c0194ae7..cf0f30b19d8 100644
--- a/pkg/ingester/wal.go
+++ b/pkg/ingester/wal.go
@@ -29,21 +29,21 @@ import (

 // WALConfig is config for the Write Ahead Log.
 type WALConfig struct {
- walEnabled bool `yaml:"wal_enabled,omitempty"`
- checkpointEnabled bool `yaml:"checkpoint_enabled,omitempty"`
- recover bool `yaml:"recover_from_wal,omitempty"`
- dir string `yaml:"wal_dir,omitempty"`
- checkpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"`
- metricsRegisterer prometheus.Registerer
+ WALEnabled bool `yaml:"wal_enabled,omitempty"`
+ CheckpointEnabled bool `yaml:"checkpoint_enabled,omitempty"`
+ Recover bool `yaml:"recover_from_wal,omitempty"`
+ Dir string `yaml:"wal_dir,omitempty"`
+ CheckpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"`
+ metricsRegisterer prometheus.Registerer `yaml:"-"`
 }

 // RegisterFlags adds the flags required to config this to the given FlagSet
 func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
- f.StringVar(&cfg.dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
- f.BoolVar(&cfg.recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.")
- f.BoolVar(&cfg.walEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
- f.BoolVar(&cfg.checkpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing of in-memory chunks.")
- f.DurationVar(&cfg.checkpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
+ f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
+ f.BoolVar(&cfg.Recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.")
+ f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
+ f.BoolVar(&cfg.CheckpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing of in-memory chunks.")
+ f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
 }

 // WAL interface allows us to have a no-op WAL when the WAL is disabled.
@@ -77,7 +77,7 @@ type walWrapper struct {

 // newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL.
 func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, error) {
- if !cfg.walEnabled {
+ if !cfg.WALEnabled {
 return &noopWAL{}, nil
 }

@@ -85,7 +85,7 @@ func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, er
 if cfg.metricsRegisterer != nil {
 walRegistry = prometheus.WrapRegistererWith(prometheus.Labels{"kind": "wal"}, cfg.metricsRegisterer)
 }
- tsdbWAL, err := wal.NewSize(util.Logger, walRegistry, cfg.dir, wal.DefaultSegmentSize/4, true)
+ tsdbWAL, err := wal.NewSize(util.Logger, walRegistry, cfg.Dir, wal.DefaultSegmentSize/4, true)
 if err != nil {
 return nil, err
 }
@@ -158,11 +158,11 @@ func (w *walWrapper) Log(record *Record) error {
 func (w *walWrapper) run() {
 defer w.wait.Done()

- if !w.cfg.checkpointEnabled {
+ if !w.cfg.CheckpointEnabled {
 return
 }

- ticker := time.NewTicker(w.cfg.checkpointDuration)
+ ticker := time.NewTicker(w.cfg.CheckpointDuration)
 defer ticker.Stop()

 for {
@@ -190,7 +190,7 @@ func (w *walWrapper) run() {
 const checkpointPrefix = "checkpoint."

 func (w *walWrapper) performCheckpoint() (err error) {
- if !w.cfg.checkpointEnabled {
+ if !w.cfg.CheckpointEnabled {
 return nil
 }

@@ -359,7 +359,7 @@ func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Finge
 }

 func recoverFromWAL(ingester *Ingester) (err error) {
- walDir := ingester.cfg.WALConfig.dir
+ walDir := ingester.cfg.WALConfig.Dir

 // Use a local userStates, so we don't need to worry about locking.
 userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics)
diff --git a/pkg/ingester/wal_test.go b/pkg/ingester/wal_test.go
index c0baa0989a9..aab64961487 100644
--- a/pkg/ingester/wal_test.go
+++ b/pkg/ingester/wal_test.go
@@ -20,11 +20,11 @@ func TestWAL(t *testing.T) {
 require.NoError(t, err)

 cfg := defaultIngesterTestConfig()
- cfg.WALConfig.walEnabled = true
- cfg.WALConfig.checkpointEnabled = true
- cfg.WALConfig.recover = true
- cfg.WALConfig.dir = dirname
- cfg.WALConfig.checkpointDuration = 100 * time.Millisecond
+ cfg.WALConfig.WALEnabled = true
+ cfg.WALConfig.CheckpointEnabled = true
+ cfg.WALConfig.Recover = true
+ cfg.WALConfig.Dir = dirname
+ cfg.WALConfig.CheckpointDuration = 100 * time.Millisecond

 numSeries := 100
 numSamplesPerSeriesPerPush := 10
@@ -37,8 +37,8 @@ func TestWAL(t *testing.T) {
 for r := 0; r < numRestarts; r++ {
 if r == numRestarts-1 {
- cfg.WALConfig.walEnabled = false
- cfg.WALConfig.checkpointEnabled = false
+ cfg.WALConfig.WALEnabled = false
+ cfg.WALConfig.CheckpointEnabled = false
 }
 // Start a new ingester and recover the WAL.
 _, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())
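Note on why exporting the fields fixes the bug: a Go YAML decoder can only set exported struct fields, so the old lowercase `walEnabled`, `checkpointEnabled`, `recover`, `dir` and `checkpointDuration` were silently skipped when the config file was unmarshalled, even though they carried `yaml` tags. The sketch below illustrates that behaviour with hypothetical `brokenWALConfig`/`fixedWALConfig` structs (not the actual Cortex types), assuming `gopkg.in/yaml.v2` as the decoder; the key names come from the yaml tags in this diff, and the `/data/wal` value is made up for illustration.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// brokenWALConfig mirrors the pre-fix shape: unexported fields, which the YAML
// decoder cannot set, so the yaml tags are effectively ignored.
type brokenWALConfig struct {
	walEnabled        bool   `yaml:"wal_enabled,omitempty"`
	checkpointEnabled bool   `yaml:"checkpoint_enabled,omitempty"`
	dir               string `yaml:"wal_dir,omitempty"`
}

// fixedWALConfig mirrors the post-fix shape: the same tags on exported fields.
type fixedWALConfig struct {
	WALEnabled        bool   `yaml:"wal_enabled,omitempty"`
	CheckpointEnabled bool   `yaml:"checkpoint_enabled,omitempty"`
	Dir               string `yaml:"wal_dir,omitempty"`
}

func main() {
	// Key names come from the yaml tags in the diff; the values are illustrative.
	doc := []byte("wal_enabled: true\ncheckpoint_enabled: true\nwal_dir: /data/wal\n")

	var before brokenWALConfig
	var after fixedWALConfig

	// Neither call returns an error: yaml.v2 silently drops keys it cannot map
	// onto an exported field, which is why the misconfiguration went unnoticed.
	_ = yaml.Unmarshal(doc, &before)
	_ = yaml.Unmarshal(doc, &after)

	fmt.Printf("pre-fix struct:  %+v\n", before) // all zero values: the file's settings are lost
	fmt.Printf("post-fix struct: %+v\n", after)  // {WALEnabled:true CheckpointEnabled:true Dir:/data/wal}
}
```

With yaml.v2, an untagged exported field already maps to its lowercased name, so the new `yaml:"walconfig,omitempty"` tag on `Config.WALConfig` mainly makes the key explicit; the behavioural fix is exporting the nested fields (and excluding `metricsRegisterer` via `yaml:"-"`).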