diff --git a/CHANGELOG.md b/CHANGELOG.md index 2572d0fa833..c4eb67a8a57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ * [CHANGE] If you are vendoring Cortex and use its components in your project, be aware that many Cortex components no longer start automatically when they are created. You may want to review PR and attached document. #2166 * [CHANGE] Cortex now has /ready probe for all services, not just ingester and querier as before. In single-binary mode, /ready reports 204 only if all components are running properly. #2166 * [CHANGE] Experimental TSDB: switched the blocks storage index header to the binary format. This change is expected to have no visible impact, except lower startup times and memory usage in the queriers. It's possible to switch back to the old JSON format via the flag `-experimental.tsdb.bucket-store.binary-index-header-enabled=false`. #2223 +* [CHANGE] WAL replays are now done while the rest of Cortex is starting, and more specifically, when HTTP server is running. This makes it possible to scrape metrics during WAL replays. Applies to both chunks and experimental blocks storage. #2222 * [FEATURE] Added a read-only local alertmanager config store using files named corresponding to their tenant id. #2125 * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947 * `--experimental.distributor.user-subring-size` diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index a0d3387a0a0..7b8c1e52f2d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -171,12 +171,17 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c } i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels) - if cfg.WALConfig.Recover { + i.Service = services.NewBasicService(i.starting, i.loop, i.stopping) + return i, nil +} + +func (i *Ingester) starting(ctx context.Context) error { + if i.cfg.WALConfig.Recover { level.Info(util.Logger).Log("msg", "recovering from WAL") start := time.Now() if err := recoverFromWAL(i); err != nil { level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) - return nil, err + return errors.Wrap(err, "failed to recover from WAL") } elapsed := time.Since(start) level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) @@ -185,20 +190,15 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c // If the WAL recover happened, then the userStates would already be set. if i.userStates == nil { - i.userStates = newUserStates(i.limiter, cfg, i.metrics) + i.userStates = newUserStates(i.limiter, i.cfg, i.metrics) } - i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp) + var err error + i.wal, err = newWAL(i.cfg.WALConfig, i.userStates.cp) if err != nil { - return nil, err + return errors.Wrap(err, "starting WAL") } - // TODO: lot more stuff can be put into startingFn (esp. WAL replay), but for now keep it in New - i.Service = services.NewBasicService(i.starting, i.loop, i.stopping) - return i, nil -} - -func (i *Ingester) starting(ctx context.Context) error { // Now that user states have been created, we can start the lifecycler. // Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context if err := i.lifecycler.StartAsync(context.Background()); err != nil { @@ -268,8 +268,23 @@ func (i *Ingester) stopIncomingRequests() { i.stopped = true } +// check that ingester has finished starting, i.e. it is in Running or Stopping state. +// Why Stopping? Because ingester still runs, even when it is transferring data out in Stopping state. +// Ingester handles this state on its own (via `stopped` flag). +func (i *Ingester) checkRunningOrStopping() error { + s := i.State() + if s == services.Running || s == services.Stopping { + return nil + } + return status.Error(codes.Unavailable, s.String()) +} + // Push implements client.IngesterServer func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + if i.cfg.TSDBEnabled { return i.v2Push(ctx, req) } @@ -414,6 +429,10 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, // Query implements service.IngesterServer func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + if i.cfg.TSDBEnabled { return i.v2Query(ctx, req) } @@ -477,6 +496,10 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client // QueryStream implements service.IngesterServer func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error { + if err := i.checkRunningOrStopping(); err != nil { + return err + } + if i.cfg.TSDBEnabled { return i.v2QueryStream(req, stream) } @@ -553,6 +576,10 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // LabelValues returns all label values that are associated with a given label name. func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + if i.cfg.TSDBEnabled { return i.v2LabelValues(ctx, req) } @@ -574,6 +601,10 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque // LabelNames return all the label names. func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + if i.cfg.TSDBEnabled { return i.v2LabelNames(ctx, req) } @@ -595,6 +626,10 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest // MetricsForLabelMatchers returns all the metrics which match a set of matchers. func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + if i.cfg.TSDBEnabled { return i.v2MetricsForLabelMatchers(ctx, req) } @@ -638,6 +673,10 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr // UserStats returns ingestion statistics for the current user. func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + if i.cfg.TSDBEnabled { return i.v2UserStats(ctx, req) } @@ -663,6 +702,10 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) // AllUserStats returns ingestion statistics for all users known to this ingester. func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return nil, err + } + if i.cfg.TSDBEnabled { return i.v2AllUserStats(ctx, req) } @@ -692,6 +735,10 @@ func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsReques // Check implements the grpc healthcheck func (i *Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) { + if err := i.checkRunningOrStopping(); err != nil { + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil + } + return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil } @@ -704,8 +751,8 @@ func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, stream grpc_heal // the addition removal of another ingester. Returns 204 when the ingester is // ready, 500 otherwise. func (i *Ingester) CheckReady(ctx context.Context) error { - if s := i.State(); s != services.Running { - return fmt.Errorf("service not Running: %v", s) + if err := i.checkRunningOrStopping(); err != nil { + return fmt.Errorf("ingester not ready: %v", err) } return i.lifecycler.CheckReady(ctx) } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 3db27693d3f..592ca1fe0b9 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -131,16 +131,16 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels) i.userStates = newUserStates(i.limiter, cfg, i.metrics) - // Scan and open TSDB's that already exist on disk // TODO: move to starting - if err := i.openExistingTSDB(context.Background()); err != nil { - return nil, err - } - i.Service = services.NewBasicService(i.startingV2, i.updateLoop, i.stoppingV2) return i, nil } func (i *Ingester) startingV2(ctx context.Context) error { + // Scan and open TSDB's that already exist on disk + if err := i.openExistingTSDB(context.Background()); err != nil { + return errors.Wrap(err, "opening existing TSDBs") + } + // Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context if err := i.lifecycler.StartAsync(context.Background()); err != nil { return errors.Wrap(err, "failed to start lifecycler")