
Modules can now start additional startup tasks, while server is starting #2119

Closed · wants to merge 5 commits
2 changes: 2 additions & 0 deletions go.mod
@@ -67,7 +67,9 @@ require (
github.com/weaveworks/common v0.0.0-20200201141823-27e183090ab1
go.etcd.io/bbolt v1.3.3
go.etcd.io/etcd v0.0.0-20190709142735-eb7dd97135a5
go.uber.org/atomic v1.5.0
Review comment from a Contributor:

Instead of this additional dependency, can we use https://golang.org/pkg/sync/atomic/ for atomic loads and stores?

Reply from @pstibrany (Contributor, Author), Feb 14, 2020:

We already have this dependency, although under an old name (github.com/uber-go/atomic). I've removed the old version in PR #2127.

golang.org/x/net v0.0.0-20191112182307-2180aed22343
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/api v0.14.0
google.golang.org/grpc v1.25.1
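For reference on the thread above, a started-flag built only on the standard library would look roughly like this. This is a sketch, not code from the PR; sync/atomic at the time had no Bool type, so an int32 stands in for one:

package main

import (
	"fmt"
	"sync/atomic"
)

// started stays 0 until startup completes, then flips to 1.
var started int32

func markStarted() { atomic.StoreInt32(&started, 1) }

func isStarted() bool { return atomic.LoadInt32(&started) == 1 }

func main() {
	fmt.Println(isStarted()) // false
	markStarted()
	fmt.Println(isStarted()) // true
}

go.uber.org/atomic wraps the same primitives in a typed API (atomic.NewBool, Store, Load), which is what the ingester changes below use.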
44 changes: 43 additions & 1 deletion pkg/cortex/cortex.go
@@ -1,6 +1,7 @@
package cortex

import (
"context"
"flag"
"fmt"
"os"
@@ -10,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"gopkg.in/yaml.v2"

@@ -251,6 +253,25 @@ func (t *Cortex) init(cfg *Config, m moduleName) error {
return t.initModule(cfg, m)
}

// start calls the start functions of the target module and all its dependencies. The functions are called concurrently.
func (t *Cortex) start(target moduleName) error {
deps := orderedDeps(target)
deps = append(deps, target) // target is not part of dependencies, make sure we call its start function too

grp, ctx := errgroup.WithContext(context.Background())

for _, m := range deps {
start := modules[m].start
if start != nil {
grp.Go(func() error {
return start(t, ctx)
})
}
}

return grp.Wait()
}

func (t *Cortex) initModule(cfg *Config, m moduleName) error {
level.Info(util.Logger).Log("msg", "initialising", "module", m)
if modules[m].init != nil {
@@ -263,7 +284,28 @@ func (t *Cortex) initModule(cfg *Config, m moduleName) error {

// Run starts Cortex running, and blocks until a signal is received.
func (t *Cortex) Run() error {
return t.server.Run()
// All modules are initialized; now call their start functions as well.
// We do this at the same time as starting the server:
// startup tasks can take a long time, and we want metrics
// to be collectable in the meantime.
errs := make(chan error, 1)
go func() {
err := t.start(t.target)
if err != nil {
errs <- err
}
}()

go func() {
errs <- t.server.Run()
}()

err := <-errs

// If we received an error from the start functions, stop the server.
// If it was the server itself that signalled the error, this is a no-op.
t.server.Stop()
return err
}

// Stop gracefully stops a Cortex.
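One property of the errgroup wiring above is worth spelling out: errgroup.WithContext cancels the shared context as soon as any start function returns an error, so a slow-starting module can abort early. A runnable sketch with invented names (failFast and slowStart are illustrations, not Cortex code):

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func failFast(ctx context.Context) error {
	return errors.New("start failed")
}

func slowStart(ctx context.Context) error {
	select {
	case <-time.After(30 * time.Second): // long startup work
		return nil
	case <-ctx.Done(): // a sibling's start failed; give up early
		return ctx.Err()
	}
}

func main() {
	grp, ctx := errgroup.WithContext(context.Background())
	grp.Go(func() error { return failFast(ctx) })
	grp.Go(func() error { return slowStart(ctx) })
	// Wait returns the first error ("start failed") almost immediately,
	// because cancelling ctx unblocks slowStart.
	fmt.Println(grp.Wait())
}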
39 changes: 31 additions & 8 deletions pkg/cortex/modules.go
@@ -1,6 +1,7 @@
package cortex

import (
"context"
"errors"
"fmt"
"net/http"
@@ -306,6 +307,14 @@ func (t *Cortex) initQuerierChunkStore(cfg *Config) error {
return fmt.Errorf("unknown storage engine '%s'", cfg.Storage.Engine)
}

func (t *Cortex) startQuerierChunkStore(ctx context.Context) error {
if bq, ok := t.querierChunkStore.(*querier.BlockQuerier); ok {
// Blocks querier needs to be "started" before it can be used
return bq.Start(ctx)
}
return nil
}

func (t *Cortex) stopQuerierChunkStore() error {
return nil
}
@@ -331,6 +340,10 @@ func (t *Cortex) initIngester(cfg *Config) (err error) {
return
}

func (t *Cortex) startIngester(ctx context.Context) (err error) {
return t.ingester.Start(ctx)
}

func (t *Cortex) stopIngester() error {
t.ingester.Shutdown()
return nil
@@ -538,8 +551,16 @@ func (t *Cortex) stopMemberlistKV() (err error) {

type module struct {
deps []moduleName
init func(t *Cortex, cfg *Config) error
stop func(t *Cortex) error

// Lifecycle:
//
// 1. init is called first for all modules sequentially.
// 2. After all modules are initialized, start functions are called concurrently.
// If any start function returns an error, the server is stopped. When start is called, HTTP requests are already being served.
// 3. stop is called on Cortex shutdown.
init func(t *Cortex, cfg *Config) error
start func(t *Cortex, ctx context.Context) error
stop func(t *Cortex) error
}

var modules = map[moduleName]module{
@@ -581,9 +602,10 @@ var modules = map[moduleName]module{
},

Ingester: {
deps: []moduleName{Overrides, Store, Server, RuntimeConfig, MemberlistKV},
init: (*Cortex).initIngester,
stop: (*Cortex).stopIngester,
deps: []moduleName{Overrides, Store, Server, RuntimeConfig, MemberlistKV},
init: (*Cortex).initIngester,
start: (*Cortex).startIngester,
stop: (*Cortex).stopIngester,
},

Querier: {
@@ -593,9 +615,10 @@
},

QuerierChunkStore: {
deps: []moduleName{Store},
init: (*Cortex).initQuerierChunkStore,
stop: (*Cortex).stopQuerierChunkStore,
deps: []moduleName{Store},
init: (*Cortex).initQuerierChunkStore,
start: (*Cortex).startQuerierChunkStore,
stop: (*Cortex).stopQuerierChunkStore,
},

QueryFrontend: {
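Putting the three hooks together: a compressed, runnable re-creation of the lifecycle described in the module struct comment above, with invented module contents (not Cortex code):

package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// module mirrors the struct above, with hypothetical hook bodies.
type module struct {
	init  func() error
	start func(context.Context) error
	stop  func() error
}

func main() {
	mods := []module{
		{
			init:  func() error { fmt.Println("init A"); return nil },
			start: func(ctx context.Context) error { fmt.Println("start A"); return nil },
			stop:  func() error { fmt.Println("stop A"); return nil },
		},
		{init: func() error { fmt.Println("init B"); return nil }}, // nil hooks are skipped
	}

	// 1. init: sequential, in dependency order.
	for _, m := range mods {
		if m.init != nil {
			if err := m.init(); err != nil {
				panic(err)
			}
		}
	}

	// 2. start: concurrent; the first error cancels the rest.
	grp, ctx := errgroup.WithContext(context.Background())
	for _, m := range mods {
		if start := m.start; start != nil {
			grp.Go(func() error { return start(ctx) })
		}
	}
	if err := grp.Wait(); err != nil {
		panic(err)
	}

	// 3. stop: on shutdown.
	for _, m := range mods {
		if m.stop != nil {
			_ = m.stop()
		}
	}
}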
68 changes: 67 additions & 1 deletion pkg/ingester/ingester.go
@@ -2,21 +2,23 @@ package ingester

import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"sync"
"time"

"github.com/go-kit/kit/log/level"
"github.com/gogo/status"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

cortex_chunk "github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ingester/client"
@@ -35,6 +37,8 @@ const (
var (
// This is initialised if the WAL is enabled; records are then fetched from this pool.
recordPool sync.Pool

errIngesterNotStarted = errors.New("ingester not started")
)

// Config for an Ingester.
@@ -121,6 +125,9 @@ type Ingester struct {

// Prometheus block storage
TSDBState TSDBState

// Has start been called and finished successfully?
startDone *atomic.Bool
}

// ChunkStore is the interface we need to store chunks
@@ -159,6 +166,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c
chunkStore: chunkStore,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes, cfg.ConcurrentFlushes),
startDone: atomic.NewBool(false),
}

var err error
@@ -208,6 +216,23 @@
return i, nil
}

func (i *Ingester) Start(ctx context.Context) error {
if i.cfg.TSDBEnabled {
err := i.startV2(ctx)
if err == nil {
i.startDone.Store(true)
}
return err
}

i.startDone.Store(true)
return nil
}

func (i *Ingester) ingesterStarted() bool {
return i.startDone.Load()
}

func (i *Ingester) loop() {
defer i.done.Done()

@@ -270,6 +295,10 @@ func (i *Ingester) StopIncomingRequests() {

// Push implements client.IngesterServer
func (i *Ingester) Push(ctx context.Context, req *client.WriteRequest) (*client.WriteResponse, error) {
if !i.ingesterStarted() {
return nil, status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return i.v2Push(ctx, req)
}
@@ -406,6 +435,10 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,

// Query implements service.IngesterServer
func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
if !i.ingesterStarted() {
return nil, status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return i.v2Query(ctx, req)
}
@@ -469,6 +502,10 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client

// QueryStream implements service.IngesterServer
func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_QueryStreamServer) error {
if !i.ingesterStarted() {
return status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return fmt.Errorf("Unimplemented for V2")
}
@@ -545,6 +582,10 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_

// LabelValues returns all label values that are associated with a given label name.
func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesRequest) (*client.LabelValuesResponse, error) {
if !i.ingesterStarted() {
return nil, status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return i.v2LabelValues(ctx, req)
}
@@ -566,6 +607,10 @@ func (i *Ingester) LabelValues(ctx context.Context, req *client.LabelValuesReque

// LabelNames return all the label names.
func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest) (*client.LabelNamesResponse, error) {
if !i.ingesterStarted() {
return nil, status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return i.v2LabelNames(ctx, req)
}
@@ -587,6 +632,10 @@ func (i *Ingester) LabelNames(ctx context.Context, req *client.LabelNamesRequest

// MetricsForLabelMatchers returns all the metrics which match a set of matchers.
func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.MetricsForLabelMatchersRequest) (*client.MetricsForLabelMatchersResponse, error) {
if !i.ingesterStarted() {
return nil, status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return i.v2MetricsForLabelMatchers(ctx, req)
}
@@ -630,6 +679,10 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr

// UserStats returns ingestion statistics for the current user.
func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UserStatsResponse, error) {
if !i.ingesterStarted() {
return nil, status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return i.v2UserStats(ctx, req)
}
@@ -655,6 +708,10 @@ func (i *Ingester) UserStats(ctx context.Context, req *client.UserStatsRequest)

// AllUserStats returns ingestion statistics for all users known to this ingester.
func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsRequest) (*client.UsersStatsResponse, error) {
if !i.ingesterStarted() {
return nil, status.Error(codes.Unavailable, errIngesterNotStarted.Error())
}

if i.cfg.TSDBEnabled {
return i.v2AllUserStats(ctx, req)
}
@@ -684,6 +741,10 @@ func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsReques

// Check implements the grpc healthcheck
func (i *Ingester) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
if !i.ingesterStarted() {
return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING}, nil
}

return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil
}

@@ -696,6 +757,11 @@ func (i *Ingester) Watch(in *grpc_health_v1.HealthCheckRequest, stream grpc_heal
// the addition/removal of another ingester. Returns 204 when the ingester is
// ready, 500 otherwise.
func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
if !i.ingesterStarted() {
http.Error(w, "Not ready: "+errIngesterNotStarted.Error(), http.StatusServiceUnavailable)
return
}

if err := i.lifecycler.CheckReady(r.Context()); err == nil {
w.WriteHeader(http.StatusNoContent)
} else {
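On the caller side, the new guards surface as gRPC Unavailable errors until Start finishes. A hypothetical client-side check, not part of this PR, showing how such an error would typically be classified as retryable:

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// shouldRetry reports whether an RPC error is the transient
// "not started yet" kind that a client can back off and retry.
func shouldRetry(err error) bool {
	return status.Code(err) == codes.Unavailable
}

func main() {
	err := status.Error(codes.Unavailable, "ingester not started")
	fmt.Println(shouldRetry(err)) // true
}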
3 changes: 3 additions & 0 deletions pkg/ingester/ingester_test.go
@@ -45,6 +45,9 @@ func newTestStore(t require.TestingT, cfg Config, clientConfig client.Config, li
ing, err := New(cfg, clientConfig, overrides, store, nil)
require.NoError(t, err)

err = ing.Start(context.Background())
require.NoError(t, err)

return store, ing
}
