diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go
index d493e905715..2e4830f3ac4 100644
--- a/pkg/cortex/modules.go
+++ b/pkg/cortex/modules.go
@@ -293,11 +293,15 @@ func (t *Cortex) initIngester(cfg *Config) (err error) {
 	cfg.Ingester.TSDBConfig = cfg.TSDB
 	cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels
 
-	t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
-	if err != nil {
-		return
+	var errfut *ingester.ErrorFuture
+	t.ingester, errfut = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
+	if done, err := errfut.Get(); done && err != nil {
+		return err
 	}
 
+	// If the ingester hasn't finished its initialization yet, we proceed anyway. Services that use the
+	// ingester must check its readiness status and, if needed, wait for initialization to complete.
+
 	client.RegisterIngesterServer(t.server.GRPC, t.ingester)
 	grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
 	t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler))
diff --git a/pkg/ingester/error_future.go b/pkg/ingester/error_future.go
new file mode 100644
index 00000000000..615fbf2137f
--- /dev/null
+++ b/pkg/ingester/error_future.go
@@ -0,0 +1,61 @@
+package ingester
+
+import (
+	"context"
+	"sync"
+)
+
+// NewErrorFuture creates an unfinished future. It must eventually be finished by calling Finish.
+func NewErrorFuture() *ErrorFuture {
+	return &ErrorFuture{
+		ch: make(chan struct{}),
+	}
+}
+
+// NewFinishedErrorFuture creates a finished future with the given (possibly nil) error.
+func NewFinishedErrorFuture(err error) *ErrorFuture {
+	ef := NewErrorFuture()
+	ef.Finish(err)
+	return ef
+}
+
+// ErrorFuture is a future that holds an error result.
+type ErrorFuture struct {
+	mu   sync.Mutex
+	done bool
+	err  error
+	ch   chan struct{} // used by waiters
+}
+
+// Get returns whether this future is done and, if so, its associated error.
+func (f *ErrorFuture) Get() (bool, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	return f.done, f.err
+}
+
+// WaitAndGet waits for the future to finish and then returns true and the error set via Finish.
+// If the context is done first, it returns false and the context's error instead.
+func (f *ErrorFuture) WaitAndGet(ctx context.Context) (bool, error) {
+	d, err := f.Get()
+	if d {
+		return true, err
+	}
+
+	select {
+	case <-f.ch:
+		return f.Get() // must return true now
+	case <-ctx.Done():
+		return false, ctx.Err()
+	}
+}
+
+// Finish marks this future as finished. It must only be called once.
+func (f *ErrorFuture) Finish(err error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	f.done = true
+	f.err = err
+	close(f.ch) // panics if called twice, which is intentional
+}
diff --git a/pkg/ingester/error_future_test.go b/pkg/ingester/error_future_test.go
new file mode 100644
index 00000000000..441205bfd25
--- /dev/null
+++ b/pkg/ingester/error_future_test.go
@@ -0,0 +1,90 @@
+package ingester
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestBasicErrorFuture(t *testing.T) {
+	ef := NewErrorFuture()
+
+	done, err := ef.Get()
+	require.False(t, done)
+	require.NoError(t, err)
+
+	ef.Finish(nil)
+
+	done, err = ef.Get()
+	require.True(t, done)
+	require.NoError(t, err)
+}
+
+func TestAsyncErrorFutureWithSuccess(t *testing.T) {
+	ef := NewErrorFuture()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		ef.Finish(nil)
+	}()
+
+	done, err := ef.WaitAndGet(context.Background())
+	require.True(t, done)
+	require.NoError(t, err)
+}
+
+func TestAsyncErrorFutureWithError(t *testing.T) {
+	initError := errors.New("test error")
+	ef := NewErrorFuture()
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		ef.Finish(initError)
+	}()
+
+	done, err := ef.WaitAndGet(context.Background())
+	require.True(t, done)
+	require.Equal(t, initError, err)
+}
+
+func TestAsyncErrorFutureWithTimeout(t *testing.T) {
+	ef := NewErrorFuture()
+	go func() {
+		time.Sleep(500 * time.Millisecond)
+		ef.Finish(nil)
+	}()
+
+	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer cancel()
+
+	done, err := ef.WaitAndGet(ctx)
+	require.False(t, done)
+	require.Equal(t, context.DeadlineExceeded, err)
+
+	// wait for finish
+	done, err = ef.WaitAndGet(context.Background())
+	require.True(t, done)
+	require.NoError(t, err)
+}
+
+func TestDoubleFinishPanics(t *testing.T) {
+	var recovered interface{}
+
+	defer func() {
+		// make sure we have recovered from a panic.
+		if recovered == nil {
+			t.Fatal("no recovered panic")
+		}
+	}()
+
+	defer func() {
+		recovered = recover()
+	}()
+
+	ef := NewErrorFuture()
+	ef.Finish(nil)
+	ef.Finish(nil)
+	// if we get here, there was no panic.
+	t.Fatal("no panic")
+}
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 26536bdeace..9bb0c854a65 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -15,6 +15,7 @@ import (
 
 	"github.com/go-kit/kit/log/level"
 	"github.com/gogo/status"
+	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/pkg/labels"
@@ -124,6 +125,10 @@
 
 	// Prometheus block storage
 	TSDBState TSDBState
+
+	// Initialization status. Can be checked to see whether the ingester has finished its
+	// initialization and, if so, whether that initialization failed.
+	initDone *ErrorFuture
 }
 
 // ChunkStore is the interface we need to store chunks
@@ -132,7 +137,7 @@
 }
 
 // New constructs a new Ingester.
-func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) {
+func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, *ErrorFuture) {
 	if cfg.ingesterClientFactory == nil {
 		cfg.ingesterClientFactory = client.MakeIngesterClient
 	}
@@ -162,6 +167,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 		chunkStore:  chunkStore,
 		quit:        make(chan struct{}),
 		flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
+		initDone:    NewFinishedErrorFuture(nil), // the chunks-based ingester finishes its initialization inside New.
 	}
 
 	var err error
@@ -170,7 +176,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	// The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
 	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled)
 	if err != nil {
-		return nil, err
+		return nil, NewFinishedErrorFuture(err)
 	}
 
 	i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
@@ -179,7 +185,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 		start := time.Now()
 		if err := recoverFromWAL(i); err != nil {
 			level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String())
-			return nil, err
+			return nil, NewFinishedErrorFuture(err)
 		}
 		elapsed := time.Since(start)
 		level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String())
@@ -193,7 +199,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 
 	i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp)
 	if err != nil {
-		return nil, err
+		return nil, NewFinishedErrorFuture(err)
 	}
 
 	// Now that user states have been created, we can start the lifecycler
@@ -208,7 +214,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
 	i.done.Add(1)
 	go i.loop()
 
-	return i, nil
+	return i, NewFinishedErrorFuture(nil)
 }
 
 func (i *Ingester) loop() {
@@ -256,6 +262,11 @@ func (i *Ingester) Shutdown() {
 // * Change the state of ring to stop accepting writes.
 // * Flush all the chunks.
 func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable)
+		return
+	}
+
 	originalState := i.lifecycler.FlushOnShutdown()
 	// We want to flush the chunks if transfer fails irrespective of original flag.
 	i.lifecycler.SetFlushOnShutdown(true)
@@ -687,6 +698,10 @@ func (i *Ingester) AllUserStats(ctx old_ctx.Context, req *client.UserStatsReques
 
 // Check implements the grpc healthcheck
 func (i *Ingester) Check(ctx old_ctx.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
+	}
+
 	return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
 }
 
@@ -699,9 +714,27 @@ func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, stream grpc_heal
 // the addition removal of another ingester. Returns 204 when the ingester is
 // ready, 500 otherwise.
 func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable)
+		return
+	}
+
 	if err := i.lifecycler.CheckReady(r.Context()); err == nil {
 		w.WriteHeader(http.StatusNoContent)
 	} else {
 		http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable)
 	}
 }
+
+func (i *Ingester) checkIngesterInitFinished() error {
+	done, err := i.initDone.Get()
+	switch {
+	case !done:
+		return errors.New("ingester init in progress")
+	case err != nil:
+		return errors.New("ingester init failed")
+	default:
+		// done and no error, all good.
+		return nil
+	}
+}
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 2c9a74cc799..21d3750e032 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -44,7 +44,9 @@ func newTestStore(t require.TestingT, cfg Config, clientConfig client.Config, li
 	overrides, err := validation.NewOverrides(limits, nil)
 	require.NoError(t, err)
 
-	ing, err := New(cfg, clientConfig, overrides, store, nil)
+	ing, errfut := New(cfg, clientConfig, overrides, store, nil)
+	initDone, err := errfut.WaitAndGet(context.Background())
+	require.True(t, initDone)
 	require.NoError(t, err)
 
 	return store, ing
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 463f34198f1..cb396f7945f 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -66,11 +66,11 @@ type TSDBState struct {
 	tsdbMetrics *tsdbMetrics
 }
 
-// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage
-func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
+// NewV2 returns a new Ingester that uses prometheus block storage instead of chunk storage.
+func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, *ErrorFuture) {
 	bucketClient, err := cortex_tsdb.NewBucketClient(context.Background(), cfg.TSDBConfig, "cortex", util.Logger)
 	if err != nil {
-		return nil, errors.Wrap(err, "failed to create the bucket client")
+		return nil, NewFinishedErrorFuture(errors.Wrap(err, "failed to create the bucket client"))
 	}
 
 	if registerer != nil {
@@ -104,31 +104,40 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides,
 
 	i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true)
 	if err != nil {
-		return nil, err
+		return nil, NewFinishedErrorFuture(err)
 	}
 
 	// Init the limter and instantiate the user states which depend on it
 	i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)
 	i.userStates = newUserStates(i.limiter, cfg, i.metrics)
 
-	// Scan and open TSDB's that already exist on disk
-	if err := i.openExistingTSDB(context.Background()); err != nil {
-		return nil, err
-	}
+	errfut := NewErrorFuture()
+	// Run the rest of the ingester's initialization in a goroutine.
+	go func() {
+		// Scan and open TSDB's that already exist on disk
+		if err := i.openExistingTSDB(context.Background()); err != nil {
+			errfut.Finish(err)
+			return
+		}
 
-	// Now that user states have been created, we can start the lifecycler
-	i.lifecycler.Start()
+		// Now that user states have been created, we can start the lifecycler
+		i.lifecycler.Start()
+
+		// Run the blocks shipping in a dedicated go routine.
+		if i.cfg.TSDBConfig.ShipInterval > 0 {
+			i.done.Add(1)
+			go i.shipBlocksLoop()
+		}
 
-	// Run the blocks shipping in a dedicated go routine.
-	if i.cfg.TSDBConfig.ShipInterval > 0 {
 		i.done.Add(1)
-		go i.shipBlocksLoop()
-	}
+		go i.rateUpdateLoop()
 
-	i.done.Add(1)
-	go i.rateUpdateLoop()
+		// All fine now
+		errfut.Finish(nil)
+	}()
 
-	return i, nil
+	i.initDone = errfut
+	return i, errfut
 }
 
 func (i *Ingester) rateUpdateLoop() {
@@ -154,6 +163,10 @@
 
 // v2Push adds metrics to a block
 func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return nil, err
+	}
+
 	var firstPartialErr error
 
 	defer client.ReuseSlice(req.Timeseries)
@@ -252,6 +265,10 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 }
 
 func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return nil, err
+	}
+
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, err
@@ -307,6 +324,10 @@ func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*clie
 }
 
 func (i *Ingester) v2LabelValues(ctx old_ctx.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return nil, err
+	}
+
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, err
@@ -336,6 +357,10 @@ func (i *Ingester) v2LabelValues(ctx old_ctx.Context, req *client.LabelValuesReq
 }
 
 func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return nil, err
+	}
+
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, err
@@ -365,6 +390,10 @@ func (i *Ingester) v2LabelNames(ctx old_ctx.Context, req *client.LabelNamesReque
 }
 
 func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return nil, err
+	}
+
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, err
@@ -432,6 +461,10 @@ func (i *Ingester) v2MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Me
 }
 
 func (i *Ingester) v2UserStats(ctx old_ctx.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return nil, err
+	}
+
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return nil, err
@@ -446,6 +479,10 @@ func (i *Ingester) v2UserStats(ctx old_ctx.Context, req *client.UserStatsRequest
 }
 
 func (i *Ingester) v2AllUserStats(ctx old_ctx.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
+	if err := i.checkIngesterInitFinished(); err != nil {
+		return nil, err
+	}
+
 	i.userStatesMtx.RLock()
 	defer i.userStatesMtx.RUnlock()
 
diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go
index 915c3390b41..d8026fd3e57 100644
--- a/pkg/ingester/ingester_v2_test.go
+++ b/pkg/ingester/ingester_v2_test.go
@@ -865,7 +865,12 @@ func newIngesterMockWithTSDBStorage(ingesterCfg Config, registerer prometheus.Re
 	ingesterCfg.TSDBConfig.Backend = "s3"
 	ingesterCfg.TSDBConfig.S3.Endpoint = "localhost"
 
-	ingester, err := NewV2(ingesterCfg, clientCfg, overrides, registerer)
+	ingester, errfut := NewV2(ingesterCfg, clientCfg, overrides, registerer)
+	_, err = errfut.WaitAndGet(context.Background())
+	if err != nil {
+		return nil, nil, err
+	}
+
 	if err != nil {
 		return nil, nil, err
 	}
@@ -945,7 +950,9 @@ func TestIngester_v2LoadTSDBOnStartup(t *testing.T) {
 
 			// setup the tsdbs dir
 			testData.setup(t, tempDir)
 
-			ingester, err := NewV2(ingesterCfg, clientCfg, overrides, nil)
+			ingester, errfut := NewV2(ingesterCfg, clientCfg, overrides, nil)
+			initDone, err := errfut.WaitAndGet(context.Background())
+			require.True(t, initDone)
 			require.NoError(t, err)
 			defer ingester.Shutdown()
diff --git a/pkg/ingester/lifecycle_test.go b/pkg/ingester/lifecycle_test.go
index 4e00fa5894e..b6d16394887 100644
--- a/pkg/ingester/lifecycle_test.go
+++ b/pkg/ingester/lifecycle_test.go
@@ -103,7 +103,9 @@ func TestIngesterTransfer(t *testing.T) {
 	cfg1.LifecyclerConfig.Addr = "ingester1"
 	cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
 	cfg1.MaxTransferRetries = 10
-	ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
+	ing1, errfut := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
+	initDone, err := errfut.WaitAndGet(context.Background())
+	require.True(t, initDone)
 	require.NoError(t, err)
 
 	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
@@ -122,7 +124,9 @@
 	cfg2.LifecyclerConfig.ID = "ingester2"
 	cfg2.LifecyclerConfig.Addr = "ingester2"
 	cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
-	ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
+	ing2, errfut := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
+	initDone, err = errfut.WaitAndGet(context.Background())
+	require.True(t, initDone)
 	require.NoError(t, err)
 
 	// Let ing2 send chunks to ing1
@@ -167,7 +171,9 @@ func TestIngesterBadTransfer(t *testing.T) {
 	cfg.LifecyclerConfig.ID = "ingester1"
 	cfg.LifecyclerConfig.Addr = "ingester1"
 	cfg.LifecyclerConfig.JoinAfter = 100 * time.Second
-	ing, err := New(cfg, defaultClientTestConfig(), limits, nil, nil)
+	ing, errfut := New(cfg, defaultClientTestConfig(), limits, nil, nil)
+	initDone, err := errfut.WaitAndGet(context.Background())
+	require.True(t, initDone)
 	require.NoError(t, err)
 
 	test.Poll(t, 100*time.Millisecond, ring.PENDING, func() interface{} {
@@ -438,7 +444,9 @@ func TestV2IngesterTransfer(t *testing.T) {
 	cfg1.LifecyclerConfig.Addr = "ingester1"
 	cfg1.LifecyclerConfig.JoinAfter = 0 * time.Second
 	cfg1.MaxTransferRetries = 10
-	ing1, err := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
+	ing1, errfut1 := New(cfg1, defaultClientTestConfig(), limits, nil, nil)
+	initDone, err := errfut1.WaitAndGet(context.Background())
+	require.True(t, initDone)
 	require.NoError(t, err)
 
 	test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
@@ -465,7 +473,9 @@ func TestV2IngesterTransfer(t *testing.T) {
 	cfg2.LifecyclerConfig.ID = "ingester2"
 	cfg2.LifecyclerConfig.Addr = "ingester2"
 	cfg2.LifecyclerConfig.JoinAfter = 100 * time.Second
-	ing2, err := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
+	ing2, errfut2 := New(cfg2, defaultClientTestConfig(), limits, nil, nil)
+	initDone, err = errfut2.WaitAndGet(context.Background())
+	require.True(t, initDone)
 	require.NoError(t, err)
 
 	// Let ing1 send blocks/wal to ing2
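Reviewer note (not part of the patch): a minimal sketch of how the ErrorFuture introduced above is meant to be consumed, showing both the non-blocking Get used by initIngester and the blocking WaitAndGet used by the tests and readiness path. It assumes only the exported ErrorFuture API from error_future.go; the sleep and timeout values are illustrative, and the example would live in an _test.go file of the ingester package.

package ingester

import (
	"context"
	"fmt"
	"time"
)

// ExampleErrorFuture demonstrates the intended consumption pattern: the
// constructor returns an unfinished future, initialization completes in a
// background goroutine, and callers either poll with Get or block with
// WaitAndGet under a context deadline.
func ExampleErrorFuture() {
	ef := NewErrorFuture()

	// Background initialization; stands in for work such as opening TSDBs.
	go func() {
		time.Sleep(50 * time.Millisecond)
		ef.Finish(nil) // pass a non-nil error here on failure
	}()

	// Non-blocking check: proceed if init is still running, fail fast only
	// if it has already finished with an error.
	if done, err := ef.Get(); done && err != nil {
		fmt.Println("init failed:", err)
		return
	}

	// Blocking wait: give up once the context is done.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	done, err := ef.WaitAndGet(ctx)
	fmt.Println(done, err)
	// Output: true <nil>
}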
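Reviewer note (not part of the patch): since New/NewV2 now return before initialization has finished, anything outside the process that needs a fully initialized ingester should rely on the readiness endpoint (or the gRPC health check) rather than assume the constructor blocked. The sketch below is a hypothetical client-side helper under that assumption; the URL, retry interval, and function name are placeholders, not part of Cortex.

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// waitForIngesterReady polls the ingester's /ready endpoint until it returns
// 204 No Content (initialization finished and the lifecycler is ready) or the
// context expires. Illustrative only.
func waitForIngesterReady(ctx context.Context, url string) error {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
		if err != nil {
			return err
		}
		resp, err := http.DefaultClient.Do(req)
		if err == nil {
			resp.Body.Close()
			if resp.StatusCode == http.StatusNoContent {
				return nil // ingester is ready
			}
		}

		select {
		case <-ctx.Done():
			return fmt.Errorf("ingester not ready: %w", ctx.Err())
		case <-ticker.C:
			// retry
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	if err := waitForIngesterReady(ctx, "http://localhost:80/ready"); err != nil {
		fmt.Println(err)
	}
}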