Flusher target to flush WAL #2075

Merged
merged 10 commits into from
Mar 10, 2020
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## master / unreleased

* [FEATURE] Flusher target to flush the WAL.
  * `-flusher.wal-dir` for the WAL directory to recover from.
  * `-flusher.concurrent-flushes` for the number of concurrent flushes.
  * `-flusher.flush-op-timeout` for the duration after which a flush operation times out.

## 0.7.0-rc.0 / 2020-03-09

11 changes: 11 additions & 0 deletions docs/configuration/arguments.md
@@ -352,6 +352,17 @@ It also talks to a KVStore and has its own copies of the same flags used by the
- `-ingester.recover-from-wal`
Set this to `true` to recover data from an existing WAL. The data is recovered even if WAL is disabled and this is set to `true`. The WAL dir needs to be set for this.

#### Flusher

- `-flusher.wal-dir`
Directory from which the WAL data is recovered. Defaults to `wal`.

- `-flusher.concurrent-flushes`
Number of concurrent flushes. Defaults to 50.

- `-flusher.flush-op-timeout`
Duration after which an individual flush operation times out. Defaults to 2 minutes.
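
The three flags above can be exercised in isolation. The following is a minimal sketch, assuming a stand-alone `flusherConfig` type and a `parseFlusherFlags` helper (both hypothetical names, not part of Cortex); the flag names and defaults are taken from this PR.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

// flusherConfig is a hypothetical stand-in for the flusher options;
// only the flag names and defaults come from the PR.
type flusherConfig struct {
	WALDir            string
	ConcurrentFlushes int
	FlushOpTimeout    time.Duration
}

// registerFlags binds the three options to a FlagSet with their defaults.
func (cfg *flusherConfig) registerFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.WALDir, "flusher.wal-dir", "wal", "Directory to read WAL from.")
	f.IntVar(&cfg.ConcurrentFlushes, "flusher.concurrent-flushes", 50, "Number of concurrent flushes.")
	f.DurationVar(&cfg.FlushOpTimeout, "flusher.flush-op-timeout", 2*time.Minute, "Timeout for individual flush operations.")
}

// parseFlusherFlags parses args; flags that are not given keep their defaults.
func parseFlusherFlags(args []string) (flusherConfig, error) {
	var cfg flusherConfig
	fs := flag.NewFlagSet("flusher-example", flag.ContinueOnError)
	cfg.registerFlags(fs)
	err := fs.Parse(args)
	return cfg, err
}

func main() {
	cfg, err := parseFlusherFlags([]string{
		"-flusher.wal-dir=/data/wal",
		"-flusher.flush-op-timeout=30s",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.WALDir, cfg.ConcurrentFlushes, cfg.FlushOpTimeout)
}
```

Here `-flusher.concurrent-flushes` is left unset, so it keeps its default of 50.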

## Runtime Configuration file

Cortex has a concept of a "runtime config" file, which is simply a file that is reloaded while Cortex is running. Some Cortex components use it to let operators change aspects of the Cortex configuration without restarting it. The file is specified with the `-runtime-config.file=<filename>` flag, and the reload period (which defaults to 10 seconds) can be changed with the `-runtime-config.reload-period=<duration>` flag. Previously this mechanism was used only by limits overrides, and the flags were called `-limits.per-user-override-config=<filename>` and `-limits.per-user-override-period=10s` respectively. These are still used if `-runtime-config.file=<filename>` is not specified.
13 changes: 13 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -79,6 +79,19 @@ Where default_value is the value to use if the environment variable is undefined
# The ingester_config configures the Cortex ingester.
[ingester: <ingester_config>]

flusher:
  # Directory to read WAL from.
  # CLI flag: -flusher.wal-dir
  [wal_dir: <string> | default = "wal"]

  # Number of concurrent goroutines flushing to dynamodb.
  # CLI flag: -flusher.concurrent-flushes
  [concurrent_flushes: <int> | default = 50]

  # Timeout for individual flush operations.
  # CLI flag: -flusher.flush-op-timeout
  [flush_op_timeout: <duration> | default = 2m0s]

# The storage_config configures where Cortex stores the data (chunks storage
# engine).
[storage: <storage_config>]
19 changes: 16 additions & 3 deletions pkg/cortex/cortex.go
@@ -30,6 +30,7 @@ import (
	"github.com/cortexproject/cortex/pkg/configs/api"
	"github.com/cortexproject/cortex/pkg/configs/db"
	"github.com/cortexproject/cortex/pkg/distributor"
	"github.com/cortexproject/cortex/pkg/flusher"
	"github.com/cortexproject/cortex/pkg/ingester"
	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/cortexproject/cortex/pkg/querier"
@@ -74,6 +75,7 @@ type Config struct {
	Querier        querier.Config     `yaml:"querier,omitempty"`
	IngesterClient client.Config      `yaml:"ingester_client,omitempty"`
	Ingester       ingester.Config    `yaml:"ingester,omitempty"`
	Flusher        flusher.Config     `yaml:"flusher,omitempty"`
	Storage        storage.Config     `yaml:"storage,omitempty"`
	ChunkStore     chunk.StoreConfig  `yaml:"chunk_store,omitempty"`
	Schema         chunk.SchemaConfig `yaml:"schema,omitempty" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation)
@@ -110,6 +112,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
	c.Querier.RegisterFlags(f)
	c.IngesterClient.RegisterFlags(f)
	c.Ingester.RegisterFlags(f)
	c.Flusher.RegisterFlags(f)
	c.Storage.RegisterFlags(f)
	c.ChunkStore.RegisterFlags(f)
	c.Schema.RegisterFlags(f)
@@ -177,6 +180,7 @@ type Cortex struct {
	overrides   *validation.Overrides
	distributor *distributor.Distributor
	ingester    *ingester.Ingester
	flusher     *flusher.Flusher
	store       chunk.Store
	worker      frontend.Worker
	frontend    *frontend.Frontend
@@ -318,7 +322,11 @@ func (t *Cortex) Run() error {
 		// let's find out which module failed
 		for m, s := range t.serviceMap {
 			if s == service {
-				level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase())
+				if service.FailureCase() == util.ErrStopCortex {
+					level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase())
+				} else {
+					level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase())
+				}
 				return
 			}
 		}
@@ -354,8 +362,13 @@ func (t *Cortex) Run() error {
 	// if any service failed, report that as an error to caller
 	if err == nil {
 		if failed := sm.ServicesByState()[services.Failed]; len(failed) > 0 {
-			// Details were reported via failure listener before
-			err = errors.New("failed services")
+			for _, f := range failed {
+				if f.FailureCase() != util.ErrStopCortex {
+					// Details were reported via failure listener before
+					err = errors.New("failed services")
+					break
+				}
+			}
 		}
 	}
 	return err
5 changes: 2 additions & 3 deletions pkg/cortex/module_service_wrapper.go
@@ -85,9 +85,8 @@ func (w *moduleServiceWrapper) stop(_ error) error {

 	level.Debug(util.Logger).Log("msg", "stopping", "module", w.module)

-	w.service.StopAsync()
-	err := w.service.AwaitTerminated(context.Background())
-	if err != nil {
+	err := services.StopAndAwaitTerminated(context.Background(), w.service)
+	if err != nil && err != util.ErrStopCortex {
 		level.Warn(util.Logger).Log("msg", "error stopping module", "module", w.module, "err", err)
 	} else {
 		level.Info(util.Logger).Log("msg", "module stopped", "module", w.module)
22 changes: 22 additions & 0 deletions pkg/cortex/modules.go
@@ -27,6 +27,7 @@ import (
	"github.com/cortexproject/cortex/pkg/configs/api"
	"github.com/cortexproject/cortex/pkg/configs/db"
	"github.com/cortexproject/cortex/pkg/distributor"
	"github.com/cortexproject/cortex/pkg/flusher"
	"github.com/cortexproject/cortex/pkg/ingester"
	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/cortexproject/cortex/pkg/querier"
@@ -53,6 +54,7 @@ const (
	Server         moduleName = "server"
	Distributor    moduleName = "distributor"
	Ingester       moduleName = "ingester"
	Flusher        moduleName = "flusher"
	Querier        moduleName = "querier"
	StoreQueryable moduleName = "store-queryable"
	QueryFrontend  moduleName = "query-frontend"
@@ -272,6 +274,21 @@ func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) {
	return t.ingester, nil
}

func (t *Cortex) initFlusher(cfg *Config) (serv services.Service, err error) {
	t.flusher, err = flusher.New(
		cfg.Flusher,
		cfg.Ingester,
		cfg.IngesterClient,
		t.store,
		prometheus.DefaultRegisterer,
	)
	if err != nil {
		return
	}

	return t.flusher, nil
}

func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) {
	if cfg.Storage.Engine == storage.StorageEngineTSDB {
		return nil, nil
@@ -549,6 +566,11 @@ var modules = map[moduleName]module{
		wrappedService: (*Cortex).initIngester,
	},

	Flusher: {
		deps:           []moduleName{Store, Server},
		wrappedService: (*Cortex).initFlusher,
	},

	Querier: {
		deps:           []moduleName{Distributor, Store, Ring, Server, StoreQueryable},
		wrappedService: (*Cortex).initQuerier,
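
The `deps` field in the module table implies that `Store` and `Server` are initialised before `Flusher`. How Cortex resolves that order is not shown in this diff; the following is only a sketch of one common approach (depth-first traversal), with a hypothetical `moduleDeps` table that contains just the three modules named in the `Flusher` entry and ignores any dependencies those modules have themselves.

```go
package main

import "fmt"

// moduleDeps is a hypothetical, reduced dependency table for illustration.
var moduleDeps = map[string][]string{
	"server":  nil,
	"store":   nil,
	"flusher": {"store", "server"},
}

// initOrder returns the dependencies of target (depth-first, each module once)
// followed by target itself.
func initOrder(target string, deps map[string][]string) []string {
	var order []string
	seen := map[string]bool{}

	var visit func(name string)
	visit = func(name string) {
		if seen[name] {
			return
		}
		seen[name] = true
		for _, dep := range deps[name] {
			visit(dep)
		}
		// A module is appended only after all of its dependencies.
		order = append(order, name)
	}

	visit(target)
	return order
}

func main() {
	fmt.Println(initOrder("flusher", moduleDeps))
}
```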
93 changes: 93 additions & 0 deletions pkg/flusher/flusher.go
@@ -0,0 +1,93 @@
package flusher

import (
	"context"
	"flag"
	"time"

	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"

	"github.com/cortexproject/cortex/pkg/ingester"
	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/services"
)

// Config for a Flusher.
type Config struct {
	WALDir            string        `yaml:"wal_dir,omitempty"`
	ConcurrentFlushes int           `yaml:"concurrent_flushes,omitempty"`
	FlushOpTimeout    time.Duration `yaml:"flush_op_timeout,omitempty"`
}

// RegisterFlags adds the flags required to configure the Flusher to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&cfg.WALDir, "flusher.wal-dir", "wal", "Directory to read WAL from.")
	f.IntVar(&cfg.ConcurrentFlushes, "flusher.concurrent-flushes", 50, "Number of concurrent goroutines flushing to dynamodb.")
	f.DurationVar(&cfg.FlushOpTimeout, "flusher.flush-op-timeout", 2*time.Minute, "Timeout for individual flush operations.")
}

// Flusher is designed to be used as a job to flush the chunks from the WAL on disk.
type Flusher struct {
	services.Service

	cfg            Config
	ingesterConfig ingester.Config
	clientConfig   client.Config
	chunkStore     ingester.ChunkStore
	registerer     prometheus.Registerer
}

const (
	postFlushSleepTime = 1 * time.Minute
)

// New constructs a new Flusher. Once started as a service, it replays the WAL,
// flushes the recovered data, and then stops.
// The returned Flusher has no other operations.
func New(
	cfg Config,
	ingesterConfig ingester.Config,
	clientConfig client.Config,
	chunkStore ingester.ChunkStore,
	registerer prometheus.Registerer,
) (*Flusher, error) {

	// The flusher settings override the corresponding ingester settings.
	ingesterConfig.WALConfig.Dir = cfg.WALDir
	ingesterConfig.ConcurrentFlushes = cfg.ConcurrentFlushes
	ingesterConfig.FlushOpTimeout = cfg.FlushOpTimeout

	f := &Flusher{
		cfg:            cfg,
		ingesterConfig: ingesterConfig,
		clientConfig:   clientConfig,
		chunkStore:     chunkStore,
		registerer:     registerer,
	}
	f.Service = services.NewBasicService(nil, f.running, nil)
	return f, nil
}

func (f *Flusher) running(ctx context.Context) error {
	ing, err := ingester.NewForFlusher(f.ingesterConfig, f.clientConfig, f.chunkStore, f.registerer)
	if err != nil {
		return errors.Wrap(err, "create ingester")
	}

	// Starting the ingester replays the WAL and starts the flush loops.
	if err := services.StartAndAwaitRunning(ctx, ing); err != nil {
		return errors.Wrap(err, "start and await running ingester")
	}

	ing.Flush()

	// Sleeping to give a chance to Prometheus
	// to collect the metrics.
	level.Info(util.Logger).Log("msg", "sleeping to give chance for collection of metrics", "duration", postFlushSleepTime.String())
	time.Sleep(postFlushSleepTime)

	if err := services.StopAndAwaitTerminated(ctx, ing); err != nil {
		return errors.Wrap(err, "stop and await terminated ingester")
	}
	// The flush is done: return the sentinel as a hint to stop Cortex entirely.
	return util.ErrStopCortex
}
54 changes: 52 additions & 2 deletions pkg/ingester/ingester.go
@@ -208,12 +208,57 @@ func (i *Ingester) starting(ctx context.Context) error {
		return errors.Wrap(err, "failed to start lifecycler")
	}

	i.startFlushLoops()

	return nil
}

func (i *Ingester) startFlushLoops() {
	i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
	for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
		i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength)
		go i.flushLoop(j)
	}
}

// NewForFlusher constructs a new Ingester to be used by flusher target.
// Compared to the 'New' method:
//   * Always replays the WAL.
//   * Does not start the lifecycler.
//   * No ingester v2.
func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) {
	if cfg.ingesterClientFactory == nil {
		cfg.ingesterClientFactory = client.MakeIngesterClient
	}

	i := &Ingester{
		cfg:          cfg,
		clientConfig: clientConfig,
		metrics:      newIngesterMetrics(registerer, true),
		chunkStore:   chunkStore,
		flushQueues:  make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
		wal:          &noopWAL{},
	}

	i.Service = services.NewBasicService(i.startingForFlusher, i.loop, i.stopping)
	return i, nil
}

func (i *Ingester) startingForFlusher(ctx context.Context) error {
	level.Info(util.Logger).Log("msg", "recovering from WAL")

	// We recover from WAL always.
	start := time.Now()
	if err := recoverFromWAL(i); err != nil {
		level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String(), "err", err)
		return err
	}
	elapsed := time.Since(start)

	level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String())
	i.metrics.walReplayDuration.Set(elapsed.Seconds())

	i.startFlushLoops()
	return nil
}
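
`startFlushLoops` follows a common worker-pool shape: register all workers on a wait group first, then create one queue per worker and start its goroutine. The sketch below illustrates that shape only; it uses buffered channels instead of Cortex's priority queues, and the `miniFlusher` type and `flushAll` helper are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// miniFlusher is a hypothetical stand-in: one queue and one goroutine per slot.
type miniFlusher struct {
	queues []chan string
	done   sync.WaitGroup

	mu      sync.Mutex
	flushed []string
}

// startFlushLoops registers every worker on the WaitGroup before starting any,
// then creates each queue before launching the goroutine that reads it.
func (f *miniFlusher) startFlushLoops() {
	f.done.Add(len(f.queues))
	for j := range f.queues {
		f.queues[j] = make(chan string, 16)
		go f.flushLoop(j)
	}
}

// flushLoop drains queue j until it is closed.
func (f *miniFlusher) flushLoop(j int) {
	defer f.done.Done()
	for item := range f.queues[j] {
		f.mu.Lock()
		f.flushed = append(f.flushed, item)
		f.mu.Unlock()
	}
}

// stop closes every queue and waits for all loops to finish draining.
func (f *miniFlusher) stop() {
	for _, q := range f.queues {
		close(q)
	}
	f.done.Wait()
}

// flushAll spreads items over the queues and returns how many were flushed.
// concurrency must be greater than zero.
func flushAll(concurrency int, items []string) int {
	f := &miniFlusher{queues: make([]chan string, concurrency)}
	f.startFlushLoops()
	for i, item := range items {
		f.queues[i%concurrency] <- item
	}
	f.stop()
	return len(f.flushed)
}

func main() {
	fmt.Println(flushAll(4, []string{"a", "b", "c", "d", "e", "f"}))
}
```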

@@ -245,8 +290,13 @@ func (i *Ingester) stopping(_ error) error {
 	// This will prevent us accepting any more samples
 	i.stopIncomingRequests()

-	// Next initiate our graceful exit from the ring.
-	return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
+	// Lifecycler can be nil if the ingester is for a flusher.
+	if i.lifecycler != nil {
+		// Next initiate our graceful exit from the ring.
+		return services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
+	}
+
+	return nil
 }

// ShutdownHandler triggers the following set of operations in order:
5 changes: 3 additions & 2 deletions pkg/ingester/series_map.go
@@ -23,8 +23,9 @@ type seriesMap struct {
 type shard struct {
 	mtx sync.Mutex
 	m   map[model.Fingerprint]*memorySeries
-	//nolint:structcheck,unused // Align this struct.
-	pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*memorySeries{})]byte
+
+	// Align this struct.
+	_ [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*memorySeries{})]byte
 }

// fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer.
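
The change to `shard` replaces a named, unused `pad` field (which needed a `nolint` directive) with a blank `_` field, which linters do not report as unused. The padding arithmetic can be checked in isolation. In this sketch, `paddedShard` and `shardSize` are hypothetical names, and `cacheLineSize = 64` is an assumption, since the value Cortex uses is not shown in the diff.

```go
package main

import (
	"fmt"
	"sync"
	"unsafe"
)

// cacheLineSize is assumed to be 64 bytes for this sketch.
const cacheLineSize = 64

// paddedShard mirrors the layout idea of shard with simplified field types.
type paddedShard struct {
	mtx sync.Mutex
	m   map[uint64]string

	// Blank field: fills the struct up to cacheLineSize bytes and can never
	// be referenced, so no "unused field" lint suppression is needed.
	_ [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[uint64]string{})]byte
}

// shardSize reports the total size of paddedShard in bytes.
func shardSize() uintptr {
	return unsafe.Sizeof(paddedShard{})
}

func main() {
	fmt.Println(shardSize())
}
```

The padding fixes the size of each element, so adjacent shards in a slice do not share the same 64-byte span; it does not by itself guarantee that the slice starts on a cache-line boundary.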
6 changes: 6 additions & 0 deletions pkg/util/errors.go
@@ -0,0 +1,6 @@
package util

import "errors"

// ErrStopCortex is the error returned by a service as a hint to stop the Cortex server entirely.
var ErrStopCortex = errors.New("stop cortex")