Fix WAL config not working in config.yml #2071

Merged · 4 commits · Feb 5, 2020
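For context: Go YAML decoders such as gopkg.in/yaml.v2 rely on reflection and can only populate exported struct fields, so the previously lowercase fields in `WALConfig` (`walEnabled`, `checkpointDuration`, ...) were silently left at their zero values whenever the WAL settings came from config.yml rather than from command-line flags. A minimal sketch of that behaviour, assuming yaml.v2 and a hypothetical `Example` struct standing in for the old `WALConfig`:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Example mimics the old WALConfig: one unexported and one exported field,
// both carrying yaml tags.
type Example struct {
	walEnabled bool   `yaml:"wal_enabled"` // unexported: yaml.v2 cannot set it
	Dir        string `yaml:"wal_dir"`     // exported: populated as expected
}

func main() {
	raw := "wal_enabled: true\nwal_dir: /data/wal\n"

	var cfg Example
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		fmt.Println("unmarshal error:", err)
		return
	}
	// Prints "false /data/wal": the unexported field kept its zero value.
	fmt.Println(cfg.walEnabled, cfg.Dir)
}
```

Exporting the fields (while keeping the existing `yaml:` tags) is what lets the decoder see them, which is what this PR does across ingester.go, wal.go and wal_test.go below.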
CHANGELOG.md (1 addition, 0 deletions)
@@ -10,6 +10,7 @@
* [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026
* [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex_<component>_thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027
* [BUGFIX] Experimental TSDB: fixed `/all_user_stats` and `/api/prom/user_stats` endpoints when using the experimental TSDB blocks storage. #2042
+ * [BUGFIX] Fixed parsing of the WAL configuration when specified in the YAML config file. #2071

Cortex 0.4.0 is the last version that can *write* denormalised tokens. Cortex 0.5.0 and above always write normalised tokens.

pkg/ingester/ingester.go (6 additions, 6 deletions)
@@ -42,7 +42,7 @@ var (

// Config for an Ingester.
type Config struct {
- WALConfig WALConfig
+ WALConfig WALConfig `yaml:"walconfig,omitempty"`
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`

// Config for transferring chunks. Zero or negative = no retries.
@@ -141,7 +141,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
return NewV2(cfg, clientConfig, limits, registerer)
}

- if cfg.WALConfig.walEnabled {
+ if cfg.WALConfig.WALEnabled {
// If WAL is enabled, we don't transfer out the data to any ingester.
// Either the next ingester which takes it's place should recover from WAL
// or the data has to be flushed during scaledown.
@@ -167,14 +167,14 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
var err error
// During WAL recovery, it will create new user states which requires the limiter.
// Hence initialise the limiter before creating the WAL.
- // The '!cfg.WALConfig.walEnabled' argument says don't flush on shutdown if the WAL is enabled.
- i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.walEnabled)
+ // The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
+ i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled)
if err != nil {
return nil, err
}
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)

- if cfg.WALConfig.recover {
+ if cfg.WALConfig.Recover {
level.Info(util.Logger).Log("msg", "recovering from WAL")
start := time.Now()
if err := recoverFromWAL(i); err != nil {
@@ -286,7 +286,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.

var lastPartialErr *validationError
var record *Record
- if i.cfg.WALConfig.walEnabled {
+ if i.cfg.WALConfig.WALEnabled {
record = recordPool.Get().(*Record)
record.UserId = userID
// Assuming there is not much churn in most cases, there is no use
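With the `yaml:"walconfig,omitempty"` tag now on `Config.WALConfig`, the WAL settings are read from a `walconfig` block nested inside the ingester section of the YAML file (in the full Cortex config this block would normally sit under the top-level `ingester:` key). A rough sketch of the mapping, using trimmed stand-ins for `ingester.Config` and `ingester.WALConfig` rather than the real types:

```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/yaml.v2"
)

// Trimmed stand-ins for ingester.WALConfig and ingester.Config.
type WALConfig struct {
	WALEnabled         bool          `yaml:"wal_enabled,omitempty"`
	CheckpointEnabled  bool          `yaml:"checkpoint_enabled,omitempty"`
	Recover            bool          `yaml:"recover_from_wal,omitempty"`
	Dir                string        `yaml:"wal_dir,omitempty"`
	CheckpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"`
}

type Config struct {
	WALConfig WALConfig `yaml:"walconfig,omitempty"`
}

func main() {
	raw := `
walconfig:
  wal_enabled: true
  checkpoint_enabled: true
  wal_dir: /data/wal
  checkpoint_duration: 30m
`
	var cfg Config
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	// Prints the values taken from the YAML block rather than zero defaults.
	fmt.Printf("%+v\n", cfg.WALConfig)
}
```

Before the fix the nested keys were silently ignored because the corresponding struct fields were unexported, so the WAL stayed disabled even when the config file asked for it.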
pkg/ingester/wal.go (17 additions, 17 deletions)
@@ -29,21 +29,21 @@ import (

// WALConfig is config for the Write Ahead Log.
type WALConfig struct {
- walEnabled bool `yaml:"wal_enabled,omitempty"`
- checkpointEnabled bool `yaml:"checkpoint_enabled,omitempty"`
- recover bool `yaml:"recover_from_wal,omitempty"`
- dir string `yaml:"wal_dir,omitempty"`
- checkpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"`
- metricsRegisterer prometheus.Registerer
+ WALEnabled bool `yaml:"wal_enabled,omitempty"`
+ CheckpointEnabled bool `yaml:"checkpoint_enabled,omitempty"`
+ Recover bool `yaml:"recover_from_wal,omitempty"`
+ Dir string `yaml:"wal_dir,omitempty"`
+ CheckpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"`
+ metricsRegisterer prometheus.Registerer `yaml:"-"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
- f.StringVar(&cfg.dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
- f.BoolVar(&cfg.recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.")
- f.BoolVar(&cfg.walEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
- f.BoolVar(&cfg.checkpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing of in-memory chunks.")
- f.DurationVar(&cfg.checkpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
+ f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
+ f.BoolVar(&cfg.Recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.")
+ f.BoolVar(&cfg.WALEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
+ f.BoolVar(&cfg.CheckpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing of in-memory chunks.")
+ f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.")
}

// WAL interface allows us to have a no-op WAL when the WAL is disabled.
@@ -77,15 +77,15 @@ type walWrapper struct {

// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL.
func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, error) {
- if !cfg.walEnabled {
+ if !cfg.WALEnabled {
return &noopWAL{}, nil
}

var walRegistry prometheus.Registerer
if cfg.metricsRegisterer != nil {
walRegistry = prometheus.WrapRegistererWith(prometheus.Labels{"kind": "wal"}, cfg.metricsRegisterer)
}
- tsdbWAL, err := wal.NewSize(util.Logger, walRegistry, cfg.dir, wal.DefaultSegmentSize/4, true)
+ tsdbWAL, err := wal.NewSize(util.Logger, walRegistry, cfg.Dir, wal.DefaultSegmentSize/4, true)
if err != nil {
return nil, err
}
@@ -158,11 +158,11 @@ func (w *walWrapper) Log(record *Record) error {
func (w *walWrapper) run() {
defer w.wait.Done()

- if !w.cfg.checkpointEnabled {
+ if !w.cfg.CheckpointEnabled {
return
}

- ticker := time.NewTicker(w.cfg.checkpointDuration)
+ ticker := time.NewTicker(w.cfg.CheckpointDuration)
defer ticker.Stop()

for {
@@ -190,7 +190,7 @@ func (w *walWrapper) run() {
const checkpointPrefix = "checkpoint."

func (w *walWrapper) performCheckpoint() (err error) {
- if !w.cfg.checkpointEnabled {
+ if !w.cfg.CheckpointEnabled {
return nil
}

@@ -359,7 +359,7 @@ func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Finge
}

func recoverFromWAL(ingester *Ingester) (err error) {
- walDir := ingester.cfg.WALConfig.dir
+ walDir := ingester.cfg.WALConfig.Dir
// Use a local userStates, so we don't need to worry about locking.
userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics)

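One small detail in the struct above: `metricsRegisterer` stays unexported and is additionally tagged `yaml:"-"`, the conventional yaml.v2 marker for a field that should never be read from or written to the config file. A short illustration of the tag's effect, with illustrative names rather than the real Cortex types:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// walSettings is an illustrative stand-in, not the real WALConfig.
type walSettings struct {
	WALEnabled bool        `yaml:"wal_enabled,omitempty"`
	Registerer interface{} `yaml:"-"` // runtime dependency, excluded from the config file
}

func main() {
	out, err := yaml.Marshal(walSettings{WALEnabled: true, Registerer: "some live registry"})
	if err != nil {
		panic(err)
	}
	// Prints only "wal_enabled: true"; the "-" tag keeps Registerer out of the YAML entirely.
	fmt.Print(string(out))
}
```

Since the field is unexported in the real struct it would be skipped anyway; the explicit tag simply documents that intent.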
pkg/ingester/wal_test.go (7 additions, 7 deletions)
@@ -20,11 +20,11 @@ func TestWAL(t *testing.T) {
require.NoError(t, err)

cfg := defaultIngesterTestConfig()
- cfg.WALConfig.walEnabled = true
- cfg.WALConfig.checkpointEnabled = true
- cfg.WALConfig.recover = true
- cfg.WALConfig.dir = dirname
- cfg.WALConfig.checkpointDuration = 100 * time.Millisecond
+ cfg.WALConfig.WALEnabled = true
+ cfg.WALConfig.CheckpointEnabled = true
+ cfg.WALConfig.Recover = true
+ cfg.WALConfig.Dir = dirname
+ cfg.WALConfig.CheckpointDuration = 100 * time.Millisecond

numSeries := 100
numSamplesPerSeriesPerPush := 10
@@ -37,8 +37,8 @@

for r := 0; r < numRestarts; r++ {
if r == numRestarts-1 {
- cfg.WALConfig.walEnabled = false
- cfg.WALConfig.checkpointEnabled = false
+ cfg.WALConfig.WALEnabled = false
+ cfg.WALConfig.CheckpointEnabled = false
}
// Start a new ingester and recover the WAL.
_, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig())