
Allow ingester initialization to continue in background. #2104

Closed · wants to merge 1 commit

10 changes: 7 additions & 3 deletions pkg/cortex/modules.go
@@ -293,11 +293,15 @@ func (t *Cortex) initIngester(cfg *Config) (err error) {
cfg.Ingester.TSDBConfig = cfg.TSDB
cfg.Ingester.ShardByAllLabels = cfg.Distributor.ShardByAllLabels

t.ingester, err = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
if err != nil {
return
var errfut *ingester.ErrorFuture
t.ingester, errfut = ingester.New(cfg.Ingester, cfg.IngesterClient, t.overrides, t.store, prometheus.DefaultRegisterer)
Contributor:
A difference I see is that currently Cortex exits if the initialization fails. After this PR it doesn't: if ingester initialization fails, Cortex will keep running but won't work.

Contributor (author):
That is a valid point, and I can address that.
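
To make that concrete, here is a rough sketch (not part of this diff; the placement and names are assumptions) of how initIngester could keep the old fail-fast behavior: watch the future in a background goroutine and abort the process if initialization ultimately fails.

```go
// Hypothetical addition inside initIngester, right after the ingester.New call.
// Assumes "context" and "os" are imported; level and util.Logger are assumed
// to be available here as in the rest of Cortex.
go func() {
	if done, err := errfut.WaitAndGet(context.Background()); done && err != nil {
		level.Error(util.Logger).Log("msg", "ingester initialization failed", "err", err)
		os.Exit(1)
	}
}()
```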

if done, err := errfut.Get(); done && err != nil {
Contributor:
Futures feel needlessly complicated here, and an anti-pattern in Go. Concurrency should be left to the caller, so I believe the module init function should call ingester.New in a goroutine that waits for an error response.
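
For illustration only, here is a minimal, self-contained sketch of that shape; slowInit and the stub type are made-up stand-ins, not Cortex code. The constructor stays synchronous, and the caller owns the goroutine and receives the error over a channel.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type ingesterStub struct{}

// slowInit stands in for a synchronous constructor such as ingester.New:
// it blocks until initialization is done and returns a value and an error.
func slowInit(fail bool) (*ingesterStub, error) {
	time.Sleep(100 * time.Millisecond) // e.g. WAL replay
	if fail {
		return nil, errors.New("init failed")
	}
	return &ingesterStub{}, nil
}

func main() {
	type result struct {
		ing *ingesterStub
		err error
	}
	resCh := make(chan result, 1)

	// The caller owns the concurrency; no future type is needed.
	go func() {
		ing, err := slowInit(false)
		resCh <- result{ing, err}
	}()

	// Other startup work could happen here, then wait for the error response.
	res := <-resCh
	if res.err != nil {
		fmt.Println("ingester init failed:", res.err)
		return
	}
	fmt.Println("ingester ready:", res.ing != nil)
}
```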

Contributor (author):
Hmm, that is a valid point. Back to the drawing board.

Contributor:
However, by contract the ingester should be populated in the Cortex struct once initIngester() returns.
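
If the goal is to keep that contract while still moving slow work off the startup path, one possible shape (only a sketch; note that the diff as written still does all the work synchronously and returns an already-finished future) would be for New to build the Ingester up front and finish the future from a goroutine:

```go
// Hypothetical variant of New: construct the Ingester immediately so the
// caller gets a non-nil value, and finish the future from a goroutine that
// does the expensive part (e.g. WAL recovery). Error paths are elided.
func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, *ErrorFuture) {
	i := &Ingester{
		chunkStore: chunkStore,
		initDone:   NewErrorFuture(),
		// ... other cheap field setup ...
	}
	go func() {
		// Slow initialization happens here; callers observe the outcome
		// through the future (or via ReadinessHandler / Check).
		i.initDone.Finish(recoverFromWAL(i))
	}()
	return i, i.initDone
}
```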

return err
}

// If ingester initialization isn't done yet, we proceed anyway. Services that use the ingester must check its
// readiness status and wait for it to finish initializing.

client.RegisterIngesterServer(t.server.GRPC, t.ingester)
grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.ingester.ReadinessHandler))
61 changes: 61 additions & 0 deletions pkg/ingester/error_future.go
@@ -0,0 +1,61 @@
package ingester

import (
"context"
"sync"
)

// NewErrorFuture creates an unfinished future. It must eventually be finished by calling Finish.
func NewErrorFuture() *ErrorFuture {
return &ErrorFuture{
ch: make(chan struct{}),
}
}

// NewFinishedErrorFuture creates a finished future with the given (possibly nil) error.
func NewFinishedErrorFuture(err error) *ErrorFuture {
ef := NewErrorFuture()
ef.Finish(err)
return ef
}

// ErrorFuture is a future that holds an error.
type ErrorFuture struct {
mu sync.Mutex
done bool
err error
ch chan struct{} // used by waiters
}

// Get returns true if this future is done, along with the associated error. If the future is not done yet, it returns false.
func (f *ErrorFuture) Get() (bool, error) {
f.mu.Lock()
defer f.mu.Unlock()
return f.done, f.err
}

// WaitAndGet waits for the future to finish, then returns true and the error set via Finish.
// If the context is done first, it returns false and the context's error instead.
func (f *ErrorFuture) WaitAndGet(ctx context.Context) (bool, error) {
d, err := f.Get()
if d {
return true, err
}

select {
case <-f.ch:
return f.Get() // must return true now
case <-ctx.Done():
return false, ctx.Err()
}
}

// Finish marks this future as finished. It can only be called once.
func (f *ErrorFuture) Finish(err error) {
f.mu.Lock()
defer f.mu.Unlock()

f.done = true
f.err = err
close(f.ch) // panics if Finish is called twice, which is intentional.
}
90 changes: 90 additions & 0 deletions pkg/ingester/error_future_test.go
@@ -0,0 +1,90 @@
package ingester

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"golang.org/x/tools/go/ssa/interp/testdata/src/errors"
)

func TestBasicErrorFuture(t *testing.T) {
ef := NewErrorFuture()

done, err := ef.Get()
require.False(t, done)
require.NoError(t, err)

ef.Finish(nil)

done, err = ef.Get()
require.True(t, done)
require.NoError(t, err)
}

func TestAsyncErrorFutureWithSuccess(t *testing.T) {
ef := NewErrorFuture()
go func() {
time.Sleep(100 * time.Millisecond)
ef.Finish(nil)
}()

done, err := ef.WaitAndGet(context.Background())
require.True(t, done)
require.NoError(t, err)
}

func TestAsyncErrorFutureWithError(t *testing.T) {
initError := errors.New("test error")
ef := NewErrorFuture()
go func() {
time.Sleep(100 * time.Millisecond)
ef.Finish(initError)
}()

done, err := ef.WaitAndGet(context.Background())
require.True(t, done)
require.Equal(t, initError, err)
}

func TestAsyncErrorFutureWithTimeout(t *testing.T) {
ef := NewErrorFuture()
go func() {
time.Sleep(500 * time.Millisecond)
ef.Finish(nil)
}()

ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()

done, err := ef.WaitAndGet(ctx)
require.False(t, done)
require.Equal(t, context.DeadlineExceeded, err)

// wait for finish
done, err = ef.WaitAndGet(context.Background())
require.True(t, done)
require.NoError(t, err)
}

func TestDoubleFinishPanics(t *testing.T) {
ef := NewErrorFuture()
ef.Finish(nil)

// Second Finish must panic (it closes the channel again).
require.Panics(t, func() { ef.Finish(nil) })
}
43 changes: 38 additions & 5 deletions pkg/ingester/ingester.go
@@ -15,6 +15,7 @@ import (

"github.com/go-kit/kit/log/level"
"github.com/gogo/status"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@@ -124,6 +125,10 @@ type Ingester struct {

// Prometheus block storage
TSDBState TSDBState

// Initialization status. Can be checked to see if ingester is finished with its initialization,
// and if so, whether initialization failed or not.
initDone *ErrorFuture
}

// ChunkStore is the interface we need to store chunks
@@ -132,7 +137,7 @@ type ChunkStore interface {
}

// New constructs a new Ingester.
func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, *ErrorFuture) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.MakeIngesterClient
}
@@ -162,6 +167,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
chunkStore: chunkStore,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
initDone: NewFinishedErrorFuture(nil), // initialization is finished once New is done.
}

var err error
@@ -170,7 +176,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
// The '!cfg.WALConfig.WALEnabled' argument says don't flush on shutdown if the WAL is enabled.
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.WALEnabled)
if err != nil {
return nil, err
return nil, NewFinishedErrorFuture(err)
}
i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels)

@@ -179,7 +185,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
start := time.Now()
if err := recoverFromWAL(i); err != nil {
level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String())
return nil, err
return nil, NewFinishedErrorFuture(err)
}
elapsed := time.Since(start)
level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String())
@@ -193,7 +199,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c

i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp)
if err != nil {
return nil, err
return nil, NewFinishedErrorFuture(err)
}

// Now that user states have been created, we can start the lifecycler
@@ -208,7 +214,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
i.done.Add(1)
go i.loop()

return i, nil
return i, NewFinishedErrorFuture(nil)
}

func (i *Ingester) loop() {
@@ -256,6 +262,11 @@ func (i *Ingester) Shutdown() {
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
if err := i.checkIngesterInitFinished(); err != nil {
http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}

originalState := i.lifecycler.FlushOnShutdown()
// We want to flush the chunks if transfer fails irrespective of original flag.
i.lifecycler.SetFlushOnShutdown(true)
@@ -687,6 +698,10 @@ func (i *Ingester) AllUserStats(ctx old_ctx.Context, req *client.UserStatsReques

// Check implements the grpc healthcheck
func (i *Ingester) Check(ctx old_ctx.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
if err := i.checkIngesterInitFinished(); err != nil {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
}

return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

@@ -699,9 +714,27 @@ func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, stream grpc_heal
// the addition/removal of another ingester. Returns 204 when the ingester is
// ready, 503 otherwise.
func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
if err := i.checkIngesterInitFinished(); err != nil {
http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}

if err := i.lifecycler.CheckReady(r.Context()); err == nil {
w.WriteHeader(http.StatusNoContent)
} else {
http.Error(w, "Not ready: "+err.Error(), http.StatusServiceUnavailable)
}
}

func (i *Ingester) checkIngesterInitFinished() error {
done, err := i.initDone.Get()
switch {
case !done:
return errors.New("ingester init in progress")
case err != nil:
return errors.New("ingester init failed")
default:
// done and no error, all good.
return nil
}
}
4 changes: 3 additions & 1 deletion pkg/ingester/ingester_test.go
@@ -44,7 +44,9 @@ func newTestStore(t require.TestingT, cfg Config, clientConfig client.Config, li
overrides, err := validation.NewOverrides(limits, nil)
require.NoError(t, err)

ing, err := New(cfg, clientConfig, overrides, store, nil)
ing, errfut := New(cfg, clientConfig, overrides, store, nil)
initDone, err := errfut.WaitAndGet(context.Background())
require.True(t, initDone)
require.NoError(t, err)

return store, ing