From f7c407db618f905b7acd493d0af2cf7c8f67c3be Mon Sep 17 00:00:00 2001
From: Goutham Veeramachaneni
Date: Thu, 5 Dec 2019 20:37:36 +0000
Subject: [PATCH 1/2] Add flush job mode to ingester.

This is my attempt to simplify things compared to #1747

/cc @jtlisi @codesome

Signed-off-by: Goutham Veeramachaneni
---
 pkg/cortex/cortex.go     | 20 +++++++++++++++++++-
 pkg/ingester/ingester.go | 16 ++++++++++++++++
 pkg/ingester/transfer.go |  2 +-
 pkg/ring/lifecycler.go   | 12 +++++++-----
 4 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go
index 2bd5204fc41..ca5dfb77699 100644
--- a/pkg/cortex/cortex.go
+++ b/pkg/cortex/cortex.go
@@ -142,6 +142,8 @@ func (c *Config) Validate() error {
 
 // Cortex is the root datastructure for Cortex.
 type Cortex struct {
+	cfg Config
+
 	target             moduleName
 	httpAuthMiddleware middleware.Interface
 
@@ -171,6 +173,7 @@ func New(cfg Config) (*Cortex, error) {
 	}
 
 	cortex := &Cortex{
+		cfg:    cfg,
 		target: cfg.Target,
 	}
 
@@ -238,7 +241,22 @@ func (t *Cortex) initModule(cfg *Config, m moduleName) error {
 
 // Run starts Cortex running, and blocks until a signal is received.
 func (t *Cortex) Run() error {
-	return t.server.Run()
+	var err error
+	done := make(chan struct{})
+	go func() {
+		err = t.server.Run()
+
+		close(done)
+	}()
+
+	// Initiate shutdown if it's a job to flush data.
+	if t.cfg.Ingester.IsFlushJob {
+		_ = t.Stop()
+	}
+
+	<-done
+
+	return err
 }
 
 // Stop gracefully stops a Cortex.
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 2cbb5899d75..5a544889e35 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -118,6 +118,10 @@ type Config struct {
 
 	RateUpdatePeriod time.Duration
 
+	// Controls whether this is a job that solely flushes and exits.
+	// Useful when paired with the WAL to flush WAL data immediately in case of incidents.
+	IsFlushJob bool
+
 	// Use tsdb block storage
 	TSDBEnabled bool        `yaml:"-"`
 	TSDBConfig  tsdb.Config `yaml:"-"`
@@ -145,6 +149,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.SpreadFlushes, "ingester.spread-flushes", false, "If true, spread series flushes across the whole period of MaxChunkAge")
 	f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 50, "Number of concurrent goroutines flushing to dynamodb.")
 	f.DurationVar(&cfg.RateUpdatePeriod, "ingester.rate-update-period", 15*time.Second, "Period with which to update the per-user ingestion rates.")
+	f.BoolVar(&cfg.IsFlushJob, "ingester.is-flush-job", false, "Enables flush mode where the ingester just flushes and exits.")
 }
 
 // Ingester deals with "in flight" chunks. Based on Prometheus 1.x
@@ -204,6 +209,12 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 		flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
 	}
 
+	// If it's a flush job, transfers need to be disabled.
+	if cfg.IsFlushJob {
+		cfg.LifecyclerConfig.NumTokens = 0
+		cfg.MaxTransferRetries = -1 // Disables transfers.
+	}
+
	var err error
 	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester")
 	if err != nil {
@@ -214,6 +225,11 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
 	i.userStates = newUserStates(i.limiter, cfg)
 
+	// If it's a flush job, the ingester should not participate in the transfer or ingestion logic. It already has 0 tokens,
+	// and if we set the initial state to ACTIVE, it won't be transferred to.
+	if cfg.IsFlushJob {
+		i.lifecycler.SetState(ring.ACTIVE)
+	}
 	// Now that user states have been created, we can start the lifecycler
 	i.lifecycler.Start()
 
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index 54df1d6ade4..76fdee5a6c3 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -368,7 +368,7 @@ func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) {
 // TransferOut finds an ingester in PENDING state and transfers our chunks to it.
 // Called as part of the ingester shutdown process.
 func (i *Ingester) TransferOut(ctx context.Context) error {
-	if i.cfg.MaxTransferRetries <= 0 {
+	if i.cfg.MaxTransferRetries <= 0 || i.cfg.IsFlushJob {
 		return fmt.Errorf("transfers disabled")
 	}
 	backoff := util.NewBackoff(ctx, util.BackoffConfig{
diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go
index c1e7172295f..8ed085746fe 100644
--- a/pkg/ring/lifecycler.go
+++ b/pkg/ring/lifecycler.go
@@ -233,7 +233,9 @@ func (i *Lifecycler) GetState() IngesterState {
 	return i.state
 }
 
-func (i *Lifecycler) setState(state IngesterState) {
+// SetState sets the internal state. This doesn't change the state in the ring;
+// use ChangeState to update the ring as well.
+func (i *Lifecycler) SetState(state IngesterState) {
 	i.stateMtx.Lock()
 	defer i.stateMtx.Unlock()
 	i.state = state
@@ -481,7 +483,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
 		if len(tokensFromFile) > 0 {
 			level.Info(util.Logger).Log("msg", "adding tokens from file", "num_tokens", len(tokensFromFile))
 			if len(tokensFromFile) >= i.cfg.NumTokens {
-				i.setState(ACTIVE)
+				i.SetState(ACTIVE)
 			}
 			ringDesc.AddIngester(i.ID, i.Addr, tokensFromFile, i.GetState(), i.cfg.NormaliseTokens)
 			i.setTokens(tokensFromFile)
@@ -495,7 +497,7 @@ func (i *Lifecycler) initRing(ctx context.Context) error {
 		}
 
 		// We exist in the ring, so assume the ring is right and copy out tokens & state out of there.
-		i.setState(ingesterDesc.State)
+		i.SetState(ingesterDesc.State)
 		tokens, _ := ringDesc.TokensFor(i.ID)
 		i.setTokens(tokens)
 
@@ -595,7 +597,7 @@ func (i *Lifecycler) autoJoin(ctx context.Context, targetState IngesterState) er
 		}
 
 		newTokens := GenerateTokens(i.cfg.NumTokens-len(myTokens), takenTokens)
-		i.setState(targetState)
+		i.SetState(targetState)
 		ringDesc.AddIngester(i.ID, i.Addr, newTokens, i.GetState(), i.cfg.NormaliseTokens)
 
 		myTokens = append(myTokens, newTokens...)
@@ -662,7 +664,7 @@ func (i *Lifecycler) changeState(ctx context.Context, state IngesterState) error
 	}
 
 	level.Info(util.Logger).Log("msg", "changing ingester state from", "old_state", currState, "new_state", state)
-	i.setState(state)
+	i.SetState(state)
 	return i.updateConsul(ctx)
 }
 

From 212ebd309dc6c3aa2c9dc5f9d12a3adc68cd863d Mon Sep 17 00:00:00 2001
From: Ganesh Vernekar
Date: Tue, 10 Dec 2019 15:18:14 +0530
Subject: [PATCH 2/2] Handle multiple stop calls gracefully

Signed-off-by: Ganesh Vernekar
---
 cmd/cortex/main.go   | 2 +-
 pkg/cortex/cortex.go | 7 +++++++
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go
index 994206faf00..89d7cdcae10 100644
--- a/cmd/cortex/main.go
+++ b/cmd/cortex/main.go
@@ -81,7 +81,7 @@ func main() {
 	runtime.KeepAlive(ballast)
 
 	err = t.Stop()
-	util.CheckFatal("initializing cortex", err)
+	util.CheckFatal("stopping cortex", err)
 }
 
 // LoadConfig read YAML-formatted config from filename into cfg.
diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go
index ca5dfb77699..656e586aea5 100644
--- a/pkg/cortex/cortex.go
+++ b/pkg/cortex/cortex.go
@@ -161,6 +161,8 @@ type Cortex struct {
 	configAPI    *api.API
 	configDB     db.DB
 	alertmanager *alertmanager.MultitenantAlertmanager
+
+	stopped bool
 }
 
 // New makes a new Cortex.
@@ -261,6 +263,11 @@ func (t *Cortex) Run() error {
 
 // Stop gracefully stops a Cortex.
 func (t *Cortex) Stop() error {
+	if t.stopped {
+		return nil
+	}
+
+	t.stopped = true
 	t.stopModule(t.target)
 	deps := orderedDeps(t.target)
 	// iterate over our deps in reverse order and call stopModule
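
A note on the `stopped` guard in the second commit: in the flush-job path both `Stop()` calls happen on the main goroutine (once from `Run()`, once from `main()`), so a plain bool is enough there. If `Stop()` could ever be called from multiple goroutines, `sync.Once` gives the same idempotency in a concurrency-safe way. The sketch below is illustrative only and not part of the patch; the trimmed `Cortex` type and the `stopModules` helper are stand-ins for the real struct and the `stopModule`/`orderedDeps` shutdown logic in `pkg/cortex/cortex.go`.

package main

import (
	"fmt"
	"sync"
)

// Cortex is a trimmed stand-in for the real struct, with just enough state
// to show an idempotent, concurrency-safe Stop.
type Cortex struct {
	stopOnce sync.Once
}

// stopModules is a placeholder for the real shutdown work (stopping the
// target module and its ordered dependencies).
func (t *Cortex) stopModules() {
	fmt.Println("stopping modules")
}

// Stop gracefully stops a Cortex. sync.Once guarantees the shutdown body
// runs exactly once, regardless of how many times or from how many
// goroutines Stop is called.
func (t *Cortex) Stop() error {
	t.stopOnce.Do(t.stopModules)
	return nil
}

func main() {
	t := &Cortex{}
	_ = t.Stop() // first call performs the shutdown
	_ = t.Stop() // second call is a no-op
}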