diff --git a/CHANGELOG.md b/CHANGELOG.md index 8429bef8bd7..6d8dddb747b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## master / unreleased +* [FEATURE] Flusher target to flush the WAL. + * `-flusher.wal-dir` for the WAL directory to recover from. + * `-flusher.concurrent-flushes` for number of concurrent flushes. + * `-flusher.flush-op-timeout` is duration after which a flush should timeout. ## 0.7.0-rc.0 / 2020-03-09 diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 94acfd86878..2c4b155c7d1 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -352,6 +352,17 @@ It also talks to a KVStore and has it's own copies of the same flags used by the - `-ingester.recover-from-wal` Set this to `true` to recover data from an existing WAL. The data is recovered even if WAL is disabled and this is set to `true`. The WAL dir needs to be set for this. +#### Flusher + +- `-flusher.wal-dir` + Directory where the WAL data should be recovered from. + +- `-flusher.concurrent-flushes` + Number of concurrent flushes. + +- `-flusher.flush-op-timeout` + Duration after which a flush should timeout. + ## Runtime Configuration file Cortex has a concept of "runtime config" file, which is simply a file that is reloaded while Cortex is running. It is used by some Cortex components to allow operator to change some aspects of Cortex configuration without restarting it. File is specified by using `-runtime-config.file=` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=` is not specified. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index f982bc25378..c4f264619ae 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -79,6 +79,19 @@ Where default_value is the value to use if the environment variable is undefined # The ingester_config configures the Cortex ingester. [ingester: ] +flusher: + # Directory to read WAL from. + # CLI flag: -flusher.wal-dir + [wal_dir: | default = "wal"] + + # Number of concurrent goroutines flushing to dynamodb. + # CLI flag: -flusher.concurrent-flushes + [concurrent_flushes: | default = 50] + + # Timeout for individual flush operations. + # CLI flag: -flusher.flush-op-timeout + [flush_op_timeout: | default = 2m0s] + # The storage_config configures where Cortex stores the data (chunks storage # engine). [storage: ] diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index ad16128079e..f43c3457e23 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -30,6 +30,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs/api" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier" @@ -74,6 +75,7 @@ type Config struct { Querier querier.Config `yaml:"querier,omitempty"` IngesterClient client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` + Flusher flusher.Config `yaml:"flusher,omitempty"` Storage storage.Config `yaml:"storage,omitempty"` ChunkStore chunk.StoreConfig `yaml:"chunk_store,omitempty"` Schema chunk.SchemaConfig `yaml:"schema,omitempty" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation) @@ -110,6 +112,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Querier.RegisterFlags(f) c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) + c.Flusher.RegisterFlags(f) c.Storage.RegisterFlags(f) c.ChunkStore.RegisterFlags(f) c.Schema.RegisterFlags(f) @@ -177,6 +180,7 @@ type Cortex struct { overrides *validation.Overrides distributor *distributor.Distributor ingester *ingester.Ingester + flusher *flusher.Flusher store chunk.Store worker frontend.Worker frontend *frontend.Frontend @@ -318,7 +322,11 @@ func (t *Cortex) Run() error { // let's find out which module failed for m, s := range t.serviceMap { if s == service { - level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase()) + if service.FailureCase() == util.ErrStopCortex { + level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase()) + } else { + level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase()) + } return } } @@ -354,8 +362,13 @@ func (t *Cortex) Run() error { // if any service failed, report that as an error to caller if err == nil { if failed := sm.ServicesByState()[services.Failed]; len(failed) > 0 { - // Details were reported via failure listener before - err = errors.New("failed services") + for _, f := range failed { + if f.FailureCase() != util.ErrStopCortex { + // Details were reported via failure listener before + err = errors.New("failed services") + break + } + } } } return err diff --git a/pkg/cortex/module_service_wrapper.go b/pkg/cortex/module_service_wrapper.go index 0d8e1d820b6..fca7070c1f1 100644 --- a/pkg/cortex/module_service_wrapper.go +++ b/pkg/cortex/module_service_wrapper.go @@ -85,9 +85,8 @@ func (w *moduleServiceWrapper) stop(_ error) error { level.Debug(util.Logger).Log("msg", "stopping", "module", w.module) - w.service.StopAsync() - err := w.service.AwaitTerminated(context.Background()) - if err != nil { + err := services.StopAndAwaitTerminated(context.Background(), w.service) + if err != nil && err != util.ErrStopCortex { level.Warn(util.Logger).Log("msg", "error stopping module", "module", w.module, "err", err) } else { level.Info(util.Logger).Log("msg", "module stopped", "module", w.module) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 55d87a9cf07..1a44fe5512a 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -27,6 +27,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs/api" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier" @@ -53,6 +54,7 @@ const ( Server moduleName = "server" Distributor moduleName = "distributor" Ingester moduleName = "ingester" + Flusher moduleName = "flusher" Querier moduleName = "querier" StoreQueryable moduleName = "store-queryable" QueryFrontend moduleName = "query-frontend" @@ -272,6 +274,21 @@ func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) { return t.ingester, nil } +func (t *Cortex) initFlusher(cfg *Config) (serv services.Service, err error) { + t.flusher, err = flusher.New( + cfg.Flusher, + cfg.Ingester, + cfg.IngesterClient, + t.store, + prometheus.DefaultRegisterer, + ) + if err != nil { + return + } + + return t.flusher, nil +} + func (t *Cortex) initStore(cfg *Config) (serv services.Service, err error) { if cfg.Storage.Engine == storage.StorageEngineTSDB { return nil, nil @@ -549,6 +566,11 @@ var modules = map[moduleName]module{ wrappedService: (*Cortex).initIngester, }, + Flusher: { + deps: []moduleName{Store, Server}, + wrappedService: (*Cortex).initFlusher, + }, + Querier: { deps: []moduleName{Distributor, Store, Ring, Server, StoreQueryable}, wrappedService: (*Cortex).initQuerier, diff --git a/pkg/flusher/flusher.go b/pkg/flusher/flusher.go new file mode 100644 index 00000000000..c5f7fe71cf0 --- /dev/null +++ b/pkg/flusher/flusher.go @@ -0,0 +1,93 @@ +package flusher + +import ( + "context" + "flag" + "time" + + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// Config for an Ingester. +type Config struct { + WALDir string `yaml:"wal_dir,omitempty"` + ConcurrentFlushes int `yaml:"concurrent_flushes,omitempty"` + FlushOpTimeout time.Duration `yaml:"flush_op_timeout,omitempty"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.WALDir, "flusher.wal-dir", "wal", "Directory to read WAL from.") + f.IntVar(&cfg.ConcurrentFlushes, "flusher.concurrent-flushes", 50, "Number of concurrent goroutines flushing to dynamodb.") + f.DurationVar(&cfg.FlushOpTimeout, "flusher.flush-op-timeout", 2*time.Minute, "Timeout for individual flush operations.") +} + +// Flusher is designed to be used as a job to flush the chunks from the WAL on disk. +type Flusher struct { + services.Service + + cfg Config + ingesterConfig ingester.Config + clientConfig client.Config + chunkStore ingester.ChunkStore + registerer prometheus.Registerer +} + +const ( + postFlushSleepTime = 1 * time.Minute +) + +// New constructs a new Flusher and flushes the data from the WAL. +// The returned Flusher has no other operations. +func New( + cfg Config, + ingesterConfig ingester.Config, + clientConfig client.Config, + chunkStore ingester.ChunkStore, + registerer prometheus.Registerer, +) (*Flusher, error) { + + ingesterConfig.WALConfig.Dir = cfg.WALDir + ingesterConfig.ConcurrentFlushes = cfg.ConcurrentFlushes + ingesterConfig.FlushOpTimeout = cfg.FlushOpTimeout + + f := &Flusher{ + cfg: cfg, + ingesterConfig: ingesterConfig, + clientConfig: clientConfig, + chunkStore: chunkStore, + registerer: registerer, + } + f.Service = services.NewBasicService(nil, f.running, nil) + return f, nil +} + +func (f *Flusher) running(ctx context.Context) error { + ing, err := ingester.NewForFlusher(f.ingesterConfig, f.clientConfig, f.chunkStore, f.registerer) + if err != nil { + return errors.Wrap(err, "create ingester") + } + + if err := services.StartAndAwaitRunning(ctx, ing); err != nil { + return errors.Wrap(err, "start and await running ingester") + } + + ing.Flush() + + // Sleeping to give a chance to Prometheus + // to collect the metrics. + level.Info(util.Logger).Log("msg", "sleeping to give chance for collection of metrics", "duration", postFlushSleepTime.String()) + time.Sleep(postFlushSleepTime) + + if err := services.StopAndAwaitTerminated(ctx, ing); err != nil { + return errors.Wrap(err, "stop and await terminated ingester") + } + return util.ErrStopCortex +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index fd4408ecb9e..8b9ff09b29d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -208,12 +208,57 @@ func (i *Ingester) starting(ctx context.Context) error { return errors.Wrap(err, "failed to start lifecycler") } + i.startFlushLoops() + + return nil +} + +func (i *Ingester) startFlushLoops() { i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) for j := 0; j < i.cfg.ConcurrentFlushes; j++ { i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength) go i.flushLoop(j) } +} + +// NewForFlusher constructs a new Ingester to be used by flusher target. +// Compared to the 'New' method: +// * Always replays the WAL. +// * Does not start the lifecycler. +// * No ingester v2. +func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) { + if cfg.ingesterClientFactory == nil { + cfg.ingesterClientFactory = client.MakeIngesterClient + } + + i := &Ingester{ + cfg: cfg, + clientConfig: clientConfig, + metrics: newIngesterMetrics(registerer, true), + chunkStore: chunkStore, + flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), + wal: &noopWAL{}, + } + + i.Service = services.NewBasicService(i.startingForFlusher, i.loop, i.stopping) + return i, nil +} + +func (i *Ingester) startingForFlusher(ctx context.Context) error { + level.Info(util.Logger).Log("msg", "recovering from WAL") + + // We recover from WAL always. + start := time.Now() + if err := recoverFromWAL(i); err != nil { + level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) + return err + } + elapsed := time.Since(start) + + level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) + i.metrics.walReplayDuration.Set(elapsed.Seconds()) + i.startFlushLoops() return nil } @@ -245,8 +290,13 @@ func (i *Ingester) stopping(_ error) error { // This will prevent us accepting any more samples i.stopIncomingRequests() - // Next initiate our graceful exit from the ring. - return services.StopAndAwaitTerminated(context.Background(), i.lifecycler) + // Lifecycler can be nil if the ingester is for a flusher. + if i.lifecycler != nil { + // Next initiate our graceful exit from the ring. + return services.StopAndAwaitTerminated(context.Background(), i.lifecycler) + } + + return nil } // ShutdownHandler triggers the following set of operations in order: diff --git a/pkg/ingester/series_map.go b/pkg/ingester/series_map.go index dd4dbeacfdb..7fd97f32bbe 100644 --- a/pkg/ingester/series_map.go +++ b/pkg/ingester/series_map.go @@ -23,8 +23,9 @@ type seriesMap struct { type shard struct { mtx sync.Mutex m map[model.Fingerprint]*memorySeries - //nolint:structcheck,unused // Align this struct. - pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*memorySeries{})]byte + + // Align this struct. + _ [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*memorySeries{})]byte } // fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer. diff --git a/pkg/util/errors.go b/pkg/util/errors.go new file mode 100644 index 00000000000..b830c0bc59e --- /dev/null +++ b/pkg/util/errors.go @@ -0,0 +1,6 @@ +package util + +import "errors" + +// ErrStopCortex is the error returned by a service as a hint to stop the Cortex server entirely. +var ErrStopCortex = errors.New("stop cortex")