From 750d9355a677d092343d6b1aa19edc9f1e825859 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Mon, 20 Jan 2020 11:03:22 +0530 Subject: [PATCH 1/6] Flusher target to flush WAL Signed-off-by: Ganesh Vernekar --- CHANGELOG.md | 5 +- cmd/cortex/main.go | 21 +- docs/configuration/arguments.md | 11 + docs/configuration/config-file-reference.md | 13 ++ pkg/cortex/cortex.go | 4 + pkg/cortex/modules.go | 44 ++++ pkg/flusher/flusher.go | 247 ++++++++++++++++++++ pkg/ingester/flush.go | 24 +- pkg/ingester/ingester.go | 33 ++- pkg/ingester/ingester_v2.go | 4 +- pkg/ingester/mapper_test.go | 14 +- pkg/ingester/metrics.go | 6 +- pkg/ingester/series.go | 44 ++-- pkg/ingester/series_map.go | 26 +-- pkg/ingester/transfer.go | 28 +-- pkg/ingester/user_state.go | 69 +++--- pkg/ingester/user_state_test.go | 2 +- pkg/ingester/wal.go | 58 ++--- 18 files changed, 498 insertions(+), 155 deletions(-) create mode 100644 pkg/flusher/flusher.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fcc82f9f4d..84efb66f111 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,10 @@ * `--store.min-chunk-age` has been removed * `--querier.query-store-after` has been added in it's place. * [FEATURE] Added user sub rings to distribute users to a subset of ingesters. #1947 - * `--experimental.distributor.user-subring-size` +* [FEATURE] Flusher target to flush the WAL. + * `-flusher.wal-dir` for the WAL directory to recover from. + * `-flusher.concurrent-flushes` for number of concurrent flushes. + * `-flusher.flush-op-timeout` is duration after which a flush should timeout. * [ENHANCEMENT] Experimental TSDB: Export TSDB Syncer metrics from Compactor component, they are prefixed with `cortex_compactor_`. #2023 * [ENHANCEMENT] Experimental TSDB: Added dedicated flag `-experimental.tsdb.bucket-store.tenant-sync-concurrency` to configure the maximum number of concurrent tenants for which blocks are synched. #2026 * [ENHANCEMENT] Experimental TSDB: Expose metrics for objstore operations (prefixed with `cortex__thanos_objstore_`, component being one of `ingester`, `querier` and `compactor`). #2027 diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index bd45e3a0539..52027c1201f 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -7,6 +7,7 @@ import ( "math/rand" "os" "runtime" + "sync" "time" "github.com/go-kit/kit/log/level" @@ -99,12 +100,26 @@ func main() { level.Info(util.Logger).Log("msg", "Starting Cortex", "version", version.Info()) - if err := t.Run(); err != nil { - level.Error(util.Logger).Log("msg", "error running Cortex", "err", err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + if err := t.Run(); err != nil { + level.Error(util.Logger).Log("msg", "error running Cortex", "err", err) + } + }() + + if cfg.Target.IsJob() { + err = t.Stop() + } + + wg.Wait() + + if !cfg.Target.IsJob() { + err = t.Stop() } runtime.KeepAlive(ballast) - err = t.Stop() util.CheckFatal("initializing cortex", err) } diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md index 27633155677..77ddf5b79dc 100644 --- a/docs/configuration/arguments.md +++ b/docs/configuration/arguments.md @@ -323,6 +323,17 @@ It also talks to a KVStore and has it's own copies of the same flags used by the - `-ingester.recover-from-wal` Set this to `true` to recover data from an existing WAL. The data is recovered even if WAL is disabled and this is set to `true`. The WAL dir needs to be set for this. +#### Flusher + +- `-flusher.wal-dir` + Directory where the WAL data should be recovered from. 
+ +- `-flusher.concurrent-flushes` + Number of concurrent flushes. + +- `-flusher.flush-op-timeout` + Duration after which a flush should timeout. + ## Runtime Configuration file Cortex has a concept of "runtime config" file, which is simply a file that is reloaded while Cortex is running. It is used by some Cortex components to allow operator to change some aspects of Cortex configuration without restarting it. File is specified by using `-runtime-config.file=` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=` is not specified. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 4ac0514f684..920e97d8141 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -53,6 +53,19 @@ Supported contents and default values of the config file: # The ingester_config configures the Cortex ingester. [ingester: ] +flusher: + # Directory to read WAL from. + # CLI flag: -flusher.wal-dir + [wal_dir: | default = "wal"] + + # Number of concurrent goroutines flushing to dynamodb. + # CLI flag: -flusher.concurrent-flushes + [concurrent_flushes: | default = 50] + + # Timeout for individual flush operations. + # CLI flag: -flusher.flush-op-timeout + [flush_op_timeout: | default = 2m0s] + # The storage_config configures where Cortex stores the data (chunks storage # engine). [storage: ] diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index f8a37431f6e..4e3ec1295ec 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -24,6 +24,7 @@ import ( config_client "github.com/cortexproject/cortex/pkg/configs/client" "github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier" @@ -66,6 +67,7 @@ type Config struct { Querier querier.Config `yaml:"querier,omitempty"` IngesterClient client.Config `yaml:"ingester_client,omitempty"` Ingester ingester.Config `yaml:"ingester,omitempty"` + Flusher flusher.Config `yaml:"flusher,omitempty"` Storage storage.Config `yaml:"storage,omitempty"` ChunkStore chunk.StoreConfig `yaml:"chunk_store,omitempty"` Schema chunk.SchemaConfig `yaml:"schema,omitempty" doc:"hidden"` // Doc generation tool doesn't support it because part of the SchemaConfig doesn't support CLI flags (needs manual documentation) @@ -101,6 +103,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) { c.Querier.RegisterFlags(f) c.IngesterClient.RegisterFlags(f) c.Ingester.RegisterFlags(f) + c.Flusher.RegisterFlags(f) c.Storage.RegisterFlags(f) c.ChunkStore.RegisterFlags(f) c.Schema.RegisterFlags(f) @@ -164,6 +167,7 @@ type Cortex struct { overrides *validation.Overrides distributor *distributor.Distributor ingester *ingester.Ingester + flusher *flusher.Flusher store chunk.Store worker frontend.Worker frontend *frontend.Frontend diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index f7405df8fb9..2002bc2a23e 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -25,6 +25,7 @@ import ( "github.com/cortexproject/cortex/pkg/configs/api" 
"github.com/cortexproject/cortex/pkg/configs/db" "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/flusher" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/querier" @@ -47,6 +48,7 @@ const ( Server Distributor Ingester + Flusher Querier QueryFrontend Store @@ -74,6 +76,8 @@ func (m moduleName) String() string { return "store" case Ingester: return "ingester" + case Flusher: + return "flusher" case Querier: return "querier" case QueryFrontend: @@ -115,6 +119,9 @@ func (m *moduleName) Set(s string) error { case "ingester": *m = Ingester return nil + case "flusher": + *m = Flusher + return nil case "querier": *m = Querier return nil @@ -156,6 +163,15 @@ func (m *moduleName) UnmarshalYAML(unmarshal func(interface{}) error) error { return m.Set(s) } +func (m moduleName) IsJob() bool { + switch m { + case Flusher: + return true + } + + return false +} + func (t *Cortex) initServer(cfg *Config) (err error) { t.server, err = server.New(cfg.Server) return @@ -311,6 +327,28 @@ func (t *Cortex) stopIngester() error { return nil } +func (t *Cortex) initFlusher(cfg *Config) (err error) { + // By the end of this call, the chunks should be recovered + // from the WAL and flushed. + t.flusher, err = flusher.New( + cfg.Flusher, + cfg.Ingester, + cfg.IngesterClient, + t.store, + prometheus.DefaultRegisterer, + ) + if err != nil { + return + } + + t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.flusher.ReadinessHandler)) + return +} + +func (t *Cortex) stopFlusher() error { + return t.flusher.Close() +} + func (t *Cortex) initStore(cfg *Config) (err error) { if cfg.Storage.Engine == storage.StorageEngineTSDB { return nil @@ -520,6 +558,12 @@ var modules = map[moduleName]module{ stop: (*Cortex).stopIngester, }, + Flusher: { + deps: []moduleName{Store, Server}, + init: (*Cortex).initFlusher, + stop: (*Cortex).stopFlusher, + }, + Querier: { deps: []moduleName{Distributor, Store, Ring, Server}, init: (*Cortex).initQuerier, diff --git a/pkg/flusher/flusher.go b/pkg/flusher/flusher.go new file mode 100644 index 00000000000..796042c3c98 --- /dev/null +++ b/pkg/flusher/flusher.go @@ -0,0 +1,247 @@ +package flusher + +import ( + "context" + "flag" + "net/http" + "sync" + "time" + + "github.com/go-kit/kit/log/level" + ot "github.com/opentracing/opentracing-go" + "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/ingester" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" +) + +const ( + maxFlushRetries = 15 +) + +// Config for an Ingester. +type Config struct { + WALDir string `yaml:"wal_dir,omitempty"` + ConcurrentFlushes int `yaml:"concurrent_flushes,omitempty"` + FlushOpTimeout time.Duration `yaml:"flush_op_timeout,omitempty"` +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *Config) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.WALDir, "flusher.wal-dir", "wal", "Directory to read WAL from.") + f.IntVar(&cfg.ConcurrentFlushes, "flusher.concurrent-flushes", 50, "Number of concurrent goroutines flushing to dynamodb.") + f.DurationVar(&cfg.FlushOpTimeout, "flusher.flush-op-timeout", 2*time.Minute, "Timeout for individual flush operations.") +} + +// Flusher deals with "in flight" chunks. Based on Prometheus 1.x +// MemorySeriesStorage. 
+type Flusher struct { + cfg Config + + chunkStore ingester.ChunkStore + + // One queue per flush thread. Fingerprint is used to + // pick a queue. + seriesFlushedCounter prometheus.Counter + chunksFlushedCounter prometheus.Counter +} + +// New constructs a new Ingester. +func New( + cfg Config, + ingesterConfig ingester.Config, + clientConfig client.Config, + chunkStore ingester.ChunkStore, + registerer prometheus.Registerer, +) (*Flusher, error) { + f := &Flusher{ + cfg: cfg, + chunkStore: chunkStore, + } + + f.registerMetrics(registerer) + + metrics := ingester.NewIngesterMetrics(nil, true) + userStates := ingester.NewUserStates(nil, ingesterConfig, metrics) + + level.Info(util.Logger).Log("msg", "recovering from WAL") + + start := time.Now() + if err := ingester.RecoverFromWAL(cfg.WALDir, userStates); err != nil { + level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) + return nil, err + } + elapsed := time.Since(start) + + level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) + + err := f.flushAllUsers(userStates) + + // Sleeping to give a chance to Prometheus + // to collect the metrics. + time.Sleep(1 * time.Minute) + + return f, err +} + +func (f *Flusher) registerMetrics(registerer prometheus.Registerer) { + f.seriesFlushedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_flusher_series_flushed_total", + Help: "Total number of series flushed.", + }) + f.chunksFlushedCounter = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "cortex_flusher_chunks_flushed_total", + Help: "Total number of chunks flushed.", + }) + + if registerer != nil { + registerer.MustRegister( + f.seriesFlushedCounter, + f.chunksFlushedCounter, + ) + } +} + +func (f *Flusher) flushAllUsers(userStates *ingester.UserStates) error { + level.Info(util.Logger).Log("msg", "flushing all series") + var ( + errChan = make(chan error, f.cfg.ConcurrentFlushes) + flushDataPool = sync.Pool{ + New: func() interface{} { + return &flushData{} + }, + } + flushDataChan = make(chan *flushData, 2*f.cfg.ConcurrentFlushes) + wg sync.WaitGroup + capturedErr error + ) + + wg.Add(f.cfg.ConcurrentFlushes) + for i := 0; i < f.cfg.ConcurrentFlushes; i++ { + go f.flushUserSeries(flushDataChan, errChan, &flushDataPool, &wg) + } + +Loop: + for userID, state := range userStates.Copy() { + for pair := range state.IterateSeries() { + select { + case capturedErr = <-errChan: + break Loop + default: + } + + fd := flushDataPool.Get().(*flushData) + fd.userID = userID + fd.pair = pair + flushDataChan <- fd + } + } + + close(flushDataChan) + wg.Wait() + + // In case there was an error, drain the channel. 
+ for range flushDataChan { + } + + if capturedErr != nil { + level.Error(util.Logger).Log("msg", "error while flushing", "err", capturedErr) + return capturedErr + } + + select { + case err := <-errChan: + level.Error(util.Logger).Log("msg", "error while flushing", "err", err) + return err + default: + level.Info(util.Logger).Log("msg", "flushing done") + return nil + } +} + +type flushData struct { + userID string + pair ingester.FingerprintSeriesPair +} + +func (f *Flusher) flushUserSeries(flushDataChan <-chan *flushData, errChan chan<- error, flushDataPool *sync.Pool, wg *sync.WaitGroup) { + defer wg.Done() + + for fd := range flushDataChan { + err := f.flushSeries(fd) + flushDataPool.Put(fd) + if err != nil { + errChan <- err + return + } + } +} + +func (f *Flusher) flushSeries(fd *flushData) error { + fp := fd.pair.Fingerprint + series := fd.pair.Series + userID := fd.userID + + // shouldFlushSeries() has told us we have at least one chunk + chunkDescs := series.GetChunks() + if len(chunkDescs) == 0 { + return nil + } + + // flush the chunks without locking the series, as we don't want to hold the series lock for the duration of the dynamo/s3 rpcs. + ctx, cancel := context.WithTimeout(context.Background(), f.cfg.FlushOpTimeout) + defer cancel() // releases resources if slowOperation completes before timeout elapses + + sp, ctx := ot.StartSpanFromContext(ctx, "flushUserSeries") + defer sp.Finish() + sp.SetTag("organization", userID) + + util.Event().Log("msg", "flush chunks", "userID", userID, "numChunks", len(chunkDescs), "firstTime", chunkDescs[0].FirstTime, "fp", fp, "series", series.Metric(), "nlabels", len(series.Metric())) + + wireChunks := make([]chunk.Chunk, 0, len(chunkDescs)) + for _, chunkDesc := range chunkDescs { + c := chunk.NewChunk(userID, fp, series.Metric(), chunkDesc.C, chunkDesc.FirstTime, chunkDesc.LastTime) + if err := c.Encode(); err != nil { + return err + } + wireChunks = append(wireChunks, c) + } + + backoff := util.NewBackoff(ctx, util.BackoffConfig{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: maxFlushRetries, + }) + var err error + for backoff.Ongoing() { + err = f.chunkStore.Put(ctx, wireChunks) + if err == nil { + break + } + backoff.Wait() + } + if err != nil { + return err + } + + for _, chunkDesc := range chunkDescs { + utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size() + util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", series.Metric(), "nlabels", len(series.Metric()), "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime) + } + + f.seriesFlushedCounter.Inc() + f.chunksFlushedCounter.Add(float64(len(wireChunks))) + + return nil +} + +// ReadinessHandler returns 204 always. 
+func (f *Flusher) ReadinessHandler(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) +} + +func (f *Flusher) Close() error { + return nil +} diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 7ae3909ca7c..27896d29be4 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -120,13 +120,13 @@ func (i *Ingester) sweepUsers(immediate bool) { oldest := model.Time(0) - for id, state := range i.userStates.cp() { - for pair := range state.fpToSeries.iter() { - state.fpLocker.Lock(pair.fp) - i.sweepSeries(id, pair.fp, pair.series, immediate) - i.removeFlushedChunks(state, pair.fp, pair.series) - first := pair.series.firstUnflushedChunkTime() - state.fpLocker.Unlock(pair.fp) + for id, state := range i.userStates.Copy() { + for pair := range state.fpToSeries.Iter() { + state.fpLocker.Lock(pair.Fingerprint) + i.sweepSeries(id, pair.Fingerprint, pair.Series, immediate) + i.removeFlushedChunks(state, pair.Fingerprint, pair.Series) + first := pair.Series.firstUnflushedChunkTime() + state.fpLocker.Unlock(pair.Fingerprint) if first > 0 && (oldest == 0 || first < oldest) { oldest = first @@ -174,7 +174,7 @@ func (f flushReason) String() string { // // NB we don't close the head chunk here, as the series could wait in the queue // for some time, and we want to encourage chunks to be as full as possible. -func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) { +func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *MemorySeries, immediate bool) { if len(series.chunkDescs) <= 0 { return } @@ -192,7 +192,7 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memo } } -func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, immediate bool) flushReason { +func (i *Ingester) shouldFlushSeries(series *MemorySeries, fp model.Fingerprint, immediate bool) flushReason { if len(series.chunkDescs) == 0 { return noFlush } @@ -211,7 +211,7 @@ func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale()) } -func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, lastValueIsStale bool) flushReason { +func (i *Ingester) shouldFlushChunk(c *Desc, fp model.Fingerprint, lastValueIsStale bool) flushReason { if c.flushed { // don't flush chunks we've already flushed return noFlush } @@ -361,7 +361,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. 
} // must be called under fpLocker lock -func (i *Ingester) removeFlushedChunks(userState *userState, fp model.Fingerprint, series *memorySeries) { +func (i *Ingester) removeFlushedChunks(userState *UserState, fp model.Fingerprint, series *MemorySeries) { now := model.Now() for len(series.chunkDescs) > 0 { if series.chunkDescs[0].flushed && now.Sub(series.chunkDescs[0].LastUpdate) > i.cfg.RetainPeriod { @@ -377,7 +377,7 @@ func (i *Ingester) removeFlushedChunks(userState *userState, fp model.Fingerprin } } -func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fingerprint, metric labels.Labels, chunkDescs []*desc) error { +func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fingerprint, metric labels.Labels, chunkDescs []*Desc) error { wireChunks := make([]chunk.Chunk, 0, len(chunkDescs)) for _, chunkDesc := range chunkDescs { c := chunk.NewChunk(userID, fp, metric, chunkDesc.C, chunkDesc.FirstTime, chunkDesc.LastTime) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index e62ea3d0f88..37041da3f9b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -97,7 +97,7 @@ type Ingester struct { cfg Config clientConfig client.Config - metrics *ingesterMetrics + metrics *Metrics chunkStore ChunkStore lifecycler *ring.Lifecycler @@ -108,7 +108,7 @@ type Ingester struct { done sync.WaitGroup userStatesMtx sync.RWMutex // protects userStates and stopped - userStates *userStates + userStates *UserStates stopped bool // protected by userStatesMtx // One queue per flush thread. Fingerprint is used to @@ -157,7 +157,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c i := &Ingester{ cfg: cfg, clientConfig: clientConfig, - metrics: newIngesterMetrics(registerer, true), + metrics: NewIngesterMetrics(registerer, true), limits: limits, chunkStore: chunkStore, quit: make(chan struct{}), @@ -176,22 +176,31 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c if cfg.WALConfig.recover { level.Info(util.Logger).Log("msg", "recovering from WAL") + + // Use a local userStates, so we don't need to worry about locking. + userStates := NewUserStates(i.limiter, i.cfg, i.metrics) + start := time.Now() - if err := recoverFromWAL(i); err != nil { + if err := RecoverFromWAL(i.cfg.WALConfig.dir, userStates); err != nil { level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) return nil, err } elapsed := time.Since(start) + + i.userStatesMtx.Lock() + i.userStates = userStates + i.userStatesMtx.Unlock() + level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) i.metrics.walReplayDuration.Set(elapsed.Seconds()) } // If the WAL recover happened, then the userStates would already be set. 
if i.userStates == nil { - i.userStates = newUserStates(i.limiter, cfg, i.metrics) + i.userStates = NewUserStates(i.limiter, cfg, i.metrics) } - i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp) + i.wal, err = newWAL(cfg.WALConfig, i.userStates.Copy) if err != nil { return nil, err } @@ -335,7 +344,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, labels.removeBlanks() var ( - state *userState + state *UserState fp model.Fingerprint ) i.userStatesMtx.RLock() @@ -437,7 +446,7 @@ func (i *Ingester) Query(ctx old_ctx.Context, req *client.QueryRequest) (*client result := &client.QueryResponse{} numSeries, numSamples := 0, 0 maxSamplesPerQuery := i.limits.MaxSamplesPerQuery(userID) - err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { + err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *MemorySeries) error { values, err := series.samplesForRange(from, through) if err != nil { return err @@ -500,8 +509,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // can iteratively merge them with entries coming from the chunk store. But // that would involve locking all the series & sorting, so until we have // a better solution in the ingesters I'd rather take the hit in the queriers. - err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { - chunks := make([]*desc, 0, len(series.chunkDescs)) + err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *MemorySeries) error { + chunks := make([]*Desc, 0, len(series.chunkDescs)) for _, chunk := range series.chunkDescs { if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) { chunks = append(chunks, chunk.slice(from, through)) @@ -611,7 +620,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx old_ctx.Context, req *client.Metr lss := map[model.Fingerprint]labels.Labels{} for _, matchers := range matchersSet { - if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error { + if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *MemorySeries) error { if _, ok := lss[fp]; !ok { lss[fp] = series.metric } @@ -664,7 +673,7 @@ func (i *Ingester) AllUserStats(ctx old_ctx.Context, req *client.UserStatsReques i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - users := i.userStates.cp() + users := i.userStates.Copy() response := &client.UsersStatsResponse{ Stats: make([]*client.UserIDStatsResponse, 0, len(users)), diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 463f34198f1..02454dc514f 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -80,7 +80,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, i := &Ingester{ cfg: cfg, clientConfig: clientConfig, - metrics: newIngesterMetrics(registerer, false), + metrics: NewIngesterMetrics(registerer, false), limits: limits, chunkStore: nil, quit: make(chan struct{}), @@ -109,7 +109,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Init the limter and instantiate the user states which depend on it i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels) - i.userStates = 
newUserStates(i.limiter, cfg, i.metrics) + i.userStates = NewUserStates(i.limiter, cfg, i.metrics) // Scan and open TSDB's that already exist on disk if err := i.openExistingTSDB(context.Background()); err != nil { diff --git a/pkg/ingester/mapper_test.go b/pkg/ingester/mapper_test.go index bffe3cc473d..73debec1cdb 100644 --- a/pkg/ingester/mapper_test.go +++ b/pkg/ingester/mapper_test.go @@ -66,12 +66,12 @@ func TestFPMapper(t *testing.T) { // cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve // the collision. - sm.put(fp1, &memorySeries{metric: cm11.copyValuesAndSort()}) + sm.put(fp1, &MemorySeries{metric: cm11.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) // The mapped cm12 is added to sm, too. That should not change the outcome. - sm.put(model.Fingerprint(1), &memorySeries{metric: cm12.copyValuesAndSort()}) + sm.put(model.Fingerprint(1), &MemorySeries{metric: cm12.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) @@ -80,27 +80,27 @@ func TestFPMapper(t *testing.T) { assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) // Add cm13 to sm. Should not change anything. - sm.put(model.Fingerprint(2), &memorySeries{metric: cm13.copyValuesAndSort()}) + sm.put(model.Fingerprint(2), &MemorySeries{metric: cm13.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) // Now add cm21 and cm22 in the same way, checking the mapped FPs. assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) - sm.put(fp2, &memorySeries{metric: cm21.copyValuesAndSort()}) + sm.put(fp2, &MemorySeries{metric: cm21.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) - sm.put(model.Fingerprint(3), &memorySeries{metric: cm22.copyValuesAndSort()}) + sm.put(model.Fingerprint(3), &MemorySeries{metric: cm22.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) // Map cm31, resulting in a mapping straight away. assertFingerprintEqual(t, mapper.mapFP(fp3, cm31), model.Fingerprint(4)) - sm.put(model.Fingerprint(4), &memorySeries{metric: cm31.copyValuesAndSort()}) + sm.put(model.Fingerprint(4), &MemorySeries{metric: cm31.copyValuesAndSort()}) // Map cm32, which is now mapped for two reasons... assertFingerprintEqual(t, mapper.mapFP(fp3, cm32), model.Fingerprint(5)) - sm.put(model.Fingerprint(5), &memorySeries{metric: cm32.copyValuesAndSort()}) + sm.put(model.Fingerprint(5), &MemorySeries{metric: cm32.copyValuesAndSort()}) // Now check ALL the mappings, just to be sure. assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 7512d36a332..1e69413af39 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -15,7 +15,7 @@ const ( memSeriesRemovedTotalHelp = "The total number of series that were removed per user." 
) -type ingesterMetrics struct { +type Metrics struct { flushQueueLength prometheus.Gauge ingestedSamples prometheus.Counter ingestedSamplesFail prometheus.Counter @@ -30,8 +30,8 @@ type ingesterMetrics struct { walReplayDuration prometheus.Gauge } -func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *ingesterMetrics { - m := &ingesterMetrics{ +func NewIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *Metrics { + m := &Metrics{ flushQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "cortex_ingester_flush_queue_length", Help: "The total number of series pending in the flush queue.", diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index 5bf2f57f0eb..6e46a52c070 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -24,11 +24,11 @@ func init() { prometheus.MustRegister(createdChunks) } -type memorySeries struct { +type MemorySeries struct { metric labels.Labels // Sorted by start time, overlapping chunk ranges are forbidden. - chunkDescs []*desc + chunkDescs []*Desc // Whether the current head chunk has already been finished. If true, // the current head chunk must not be modified anymore. @@ -43,8 +43,8 @@ type memorySeries struct { // newMemorySeries returns a pointer to a newly allocated memorySeries for the // given metric. -func newMemorySeries(m labels.Labels) *memorySeries { - return &memorySeries{ +func newMemorySeries(m labels.Labels) *MemorySeries { + return &MemorySeries{ metric: m, lastTime: model.Earliest, } @@ -52,7 +52,7 @@ func newMemorySeries(m labels.Labels) *memorySeries { // add adds a sample pair to the series, possibly creating a new chunk. // The caller must have locked the fingerprint of the series. -func (s *memorySeries) add(v model.SamplePair) error { +func (s *MemorySeries) add(v model.SamplePair) error { // If sender has repeated the same timestamp, check more closely and perhaps return error. if v.Timestamp == s.lastTime { // If we don't know what the last sample value is, silently discard. @@ -126,7 +126,7 @@ func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) { // closeHead marks the head chunk closed. The caller must have locked // the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. -func (s *memorySeries) closeHead(reason flushReason) { +func (s *MemorySeries) closeHead(reason flushReason) { s.chunkDescs[0].flushReason = reason s.headChunkClosed = true } @@ -134,14 +134,14 @@ func (s *memorySeries) closeHead(reason flushReason) { // firstTime returns the earliest known time for the series. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. -func (s *memorySeries) firstTime() model.Time { +func (s *MemorySeries) firstTime() model.Time { return s.chunkDescs[0].FirstTime } // Returns time of oldest chunk in the series, that isn't flushed. If there are // no chunks, or all chunks are flushed, returns 0. // The caller must have locked the fingerprint of the memorySeries. -func (s *memorySeries) firstUnflushedChunkTime() model.Time { +func (s *MemorySeries) firstUnflushedChunkTime() model.Time { for _, c := range s.chunkDescs { if !c.flushed { return c.FirstTime @@ -154,11 +154,11 @@ func (s *memorySeries) firstUnflushedChunkTime() model.Time { // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. 
This method will panic if this // series has no chunk descriptors. -func (s *memorySeries) head() *desc { +func (s *MemorySeries) head() *Desc { return s.chunkDescs[len(s.chunkDescs)-1] } -func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.SamplePair, error) { +func (s *MemorySeries) samplesForRange(from, through model.Time) ([]model.SamplePair, error) { // Find first chunk with start time after "from". fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool { return s.chunkDescs[i].FirstTime.After(from) @@ -199,7 +199,7 @@ func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.Sample return values, nil } -func (s *memorySeries) setChunks(descs []*desc) error { +func (s *MemorySeries) setChunks(descs []*Desc) error { if len(s.chunkDescs) != 0 { return fmt.Errorf("series already has chunks") } @@ -211,11 +211,19 @@ func (s *memorySeries) setChunks(descs []*desc) error { return nil } -func (s *memorySeries) isStale() bool { +func (s *MemorySeries) GetChunks() []*Desc { + return s.chunkDescs +} + +func (s *MemorySeries) Metric() labels.Labels { + return s.metric +} + +func (s *MemorySeries) isStale() bool { return s.lastSampleValueSet && value.IsStaleNaN(float64(s.lastSampleValue)) } -type desc struct { +type Desc struct { C encoding.Chunk // nil if chunk is evicted. FirstTime model.Time // Timestamp of first sample. Populated at creation. Immutable. LastTime model.Time // Timestamp of last sample. Populated at creation & on append. @@ -224,8 +232,8 @@ type desc struct { flushed bool // set to true when flush succeeds } -func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc { - return &desc{ +func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *Desc { + return &Desc{ C: c, FirstTime: firstTime, LastTime: lastTime, @@ -236,7 +244,7 @@ func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc // Add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. -func (d *desc) add(s model.SamplePair) (encoding.Chunk, error) { +func (d *Desc) add(s model.SamplePair) (encoding.Chunk, error) { cs, err := d.C.Add(s) if err != nil { return nil, err @@ -250,8 +258,8 @@ func (d *desc) add(s model.SamplePair) (encoding.Chunk, error) { return cs, nil } -func (d *desc) slice(start, end model.Time) *desc { - return &desc{ +func (d *Desc) slice(start, end model.Time) *Desc { + return &Desc{ C: d.C.Slice(start, end), FirstTime: start, LastTime: end, diff --git a/pkg/ingester/series_map.go b/pkg/ingester/series_map.go index 15f598ccba1..de0cf2dbb20 100644 --- a/pkg/ingester/series_map.go +++ b/pkg/ingester/series_map.go @@ -21,15 +21,15 @@ type seriesMap struct { type shard struct { mtx sync.Mutex - m map[model.Fingerprint]*memorySeries + m map[model.Fingerprint]*MemorySeries // Align this struct. - pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*memorySeries{})]byte + _ [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*MemorySeries{})]byte } -// fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer. -type fingerprintSeriesPair struct { - fp model.Fingerprint - series *memorySeries +// FingerprintSeriesPair pairs a fingerprint with a memorySeries pointer. +type FingerprintSeriesPair struct { + Fingerprint model.Fingerprint + Series *MemorySeries } // newSeriesMap returns a newly allocated empty seriesMap. 
To create a seriesMap @@ -37,7 +37,7 @@ type fingerprintSeriesPair struct { func newSeriesMap() *seriesMap { shards := make([]shard, seriesMapShards) for i := 0; i < seriesMapShards; i++ { - shards[i].m = map[model.Fingerprint]*memorySeries{} + shards[i].m = map[model.Fingerprint]*MemorySeries{} } return &seriesMap{ shards: shards, @@ -46,7 +46,7 @@ func newSeriesMap() *seriesMap { // get returns a memorySeries for a fingerprint. Return values have the same // semantics as the native Go map. -func (sm *seriesMap) get(fp model.Fingerprint) (*memorySeries, bool) { +func (sm *seriesMap) get(fp model.Fingerprint) (*MemorySeries, bool) { shard := &sm.shards[util.HashFP(fp)%seriesMapShards] shard.mtx.Lock() ms, ok := shard.m[fp] @@ -55,7 +55,7 @@ func (sm *seriesMap) get(fp model.Fingerprint) (*memorySeries, bool) { } // put adds a mapping to the seriesMap. -func (sm *seriesMap) put(fp model.Fingerprint, s *memorySeries) { +func (sm *seriesMap) put(fp model.Fingerprint, s *MemorySeries) { shard := &sm.shards[util.HashFP(fp)%seriesMapShards] shard.mtx.Lock() _, ok := shard.m[fp] @@ -79,21 +79,21 @@ func (sm *seriesMap) del(fp model.Fingerprint) { } } -// iter returns a channel that produces all mappings in the seriesMap. The +// Iter returns a channel that produces all mappings in the seriesMap. The // channel will be closed once all fingerprints have been received. Not // consuming all fingerprints from the channel will leak a goroutine. The // semantics of concurrent modification of seriesMap is the similar as the one // for iterating over a map with a 'range' clause. However, if the next element // in iteration order is removed after the current element has been received // from the channel, it will still be produced by the channel. -func (sm *seriesMap) iter() <-chan fingerprintSeriesPair { - ch := make(chan fingerprintSeriesPair) +func (sm *seriesMap) Iter() <-chan FingerprintSeriesPair { + ch := make(chan FingerprintSeriesPair) go func() { for i := range sm.shards { sm.shards[i].mtx.Lock() for fp, ms := range sm.shards[i].m { sm.shards[i].mtx.Unlock() - ch <- fingerprintSeriesPair{fp, ms} + ch <- FingerprintSeriesPair{fp, ms} sm.shards[i].mtx.Lock() } sm.shards[i].mtx.Unlock() diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index f74e30eb015..8ee6fb906c7 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -67,7 +67,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e fromIngesterID := "" seriesReceived := 0 xfer := func() error { - userStates := newUserStates(i.limiter, i.cfg, i.metrics) + userStates := NewUserStates(i.limiter, i.cfg, i.metrics) for { wireSeries, err := stream.Recv() @@ -349,7 +349,7 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error } // The passed wireChunks slice is for re-use. 
-func toWireChunks(descs []*desc, wireChunks []client.Chunk) ([]client.Chunk, error) { +func toWireChunks(descs []*Desc, wireChunks []client.Chunk) ([]client.Chunk, error) { if cap(wireChunks) < len(descs) { wireChunks = make([]client.Chunk, 0, len(descs)) } @@ -372,10 +372,10 @@ func toWireChunks(descs []*desc, wireChunks []client.Chunk) ([]client.Chunk, err return wireChunks, nil } -func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { - descs := make([]*desc, 0, len(wireChunks)) +func fromWireChunks(wireChunks []client.Chunk) ([]*Desc, error) { + descs := make([]*Desc, 0, len(wireChunks)) for _, c := range wireChunks { - desc := &desc{ + desc := &Desc{ FirstTime: model.Time(c.StartTimestampMs), LastTime: model.Time(c.EndTimestampMs), LastUpdate: model.Now(), @@ -433,7 +433,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { return i.v2TransferOut(ctx) } - userStatesCopy := i.userStates.cp() + userStatesCopy := i.userStates.Copy() if len(userStatesCopy) == 0 { level.Info(util.Logger).Log("msg", "nothing to transfer") return nil @@ -459,27 +459,27 @@ func (i *Ingester) transferOut(ctx context.Context) error { var chunks []client.Chunk for userID, state := range userStatesCopy { - for pair := range state.fpToSeries.iter() { - state.fpLocker.Lock(pair.fp) + for pair := range state.fpToSeries.Iter() { + state.fpLocker.Lock(pair.Fingerprint) - if len(pair.series.chunkDescs) == 0 { // Nothing to send? - state.fpLocker.Unlock(pair.fp) + if len(pair.Series.chunkDescs) == 0 { // Nothing to send? + state.fpLocker.Unlock(pair.Fingerprint) continue } - chunks, err = toWireChunks(pair.series.chunkDescs, chunks) + chunks, err = toWireChunks(pair.Series.chunkDescs, chunks) if err != nil { - state.fpLocker.Unlock(pair.fp) + state.fpLocker.Unlock(pair.Fingerprint) return errors.Wrap(err, "toWireChunks") } err = stream.Send(&client.TimeSeriesChunk{ FromIngesterId: i.lifecycler.ID, UserId: userID, - Labels: client.FromLabelsToLabelAdapters(pair.series.metric), + Labels: client.FromLabelsToLabelAdapters(pair.Series.metric), Chunks: chunks, }) - state.fpLocker.Unlock(pair.fp) + state.fpLocker.Unlock(pair.Fingerprint) if err != nil { return errors.Wrap(err, "Send") } diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index f735fc0b013..ad011952e82 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -22,16 +22,16 @@ import ( "github.com/weaveworks/common/user" ) -// userStates holds the userState object for all users (tenants), +// UserStates holds the userState object for all users (tenants), // each one containing all the in-memory series for a given user. 
-type userStates struct { +type UserStates struct { states sync.Map limiter *SeriesLimiter cfg Config - metrics *ingesterMetrics + metrics *Metrics } -type userState struct { +type UserState struct { limiter *SeriesLimiter userID string fpLocker *fingerprintLocker @@ -62,51 +62,41 @@ type metricCounterShard struct { m map[string]int } -func newUserStates(limiter *SeriesLimiter, cfg Config, metrics *ingesterMetrics) *userStates { - return &userStates{ +func NewUserStates(limiter *SeriesLimiter, cfg Config, metrics *Metrics) *UserStates { + return &UserStates{ limiter: limiter, cfg: cfg, metrics: metrics, } } -func (us *userStates) cp() map[string]*userState { - states := map[string]*userState{} +func (us *UserStates) Copy() map[string]*UserState { + states := map[string]*UserState{} us.states.Range(func(key, value interface{}) bool { - states[key.(string)] = value.(*userState) + states[key.(string)] = value.(*UserState) return true }) return states } -func (us *userStates) gc() { +func (us *UserStates) updateRates() { us.states.Range(func(key, value interface{}) bool { - state := value.(*userState) - if state.fpToSeries.length() == 0 { - us.states.Delete(key) - } - return true - }) -} - -func (us *userStates) updateRates() { - us.states.Range(func(key, value interface{}) bool { - state := value.(*userState) + state := value.(*UserState) state.ingestedAPISamples.tick() state.ingestedRuleSamples.tick() return true }) } -func (us *userStates) get(userID string) (*userState, bool) { +func (us *UserStates) get(userID string) (*UserState, bool) { state, ok := us.states.Load(userID) if !ok { return nil, ok } - return state.(*userState), ok + return state.(*UserState), ok } -func (us *userStates) getOrCreate(userID string) *userState { +func (us *UserStates) getOrCreate(userID string) *UserState { state, ok := us.get(userID) if !ok { @@ -120,7 +110,7 @@ func (us *userStates) getOrCreate(userID string) *userState { // Speculatively create a userState object and try to store it // in the map. 
Another goroutine may have got there before // us, in which case this userState will be discarded - state = &userState{ + state = &UserState{ userID: userID, limiter: us.limiter, fpToSeries: newSeriesMap(), @@ -140,13 +130,13 @@ func (us *userStates) getOrCreate(userID string) *userState { if !ok { us.metrics.memUsers.Inc() } - state = stored.(*userState) + state = stored.(*UserState) } return state } -func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) { +func (us *UserStates) getViaContext(ctx context.Context) (*UserState, bool, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, false, fmt.Errorf("no user id") @@ -155,13 +145,13 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro return state, ok, nil } -func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) { +func (us *UserStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*UserState, model.Fingerprint, *MemorySeries, error) { state := us.getOrCreate(userID) fp, series, err := state.getSeries(labels, record) return state, fp, series, err } -func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) { +func (u *UserState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *MemorySeries, error) { rawFP := client.FastFingerprint(metric) u.fpLocker.Lock(rawFP) fp := u.mapper.mapFP(rawFP, metric) @@ -184,7 +174,7 @@ func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerpr return fp, series, nil } -func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *Record, recovery bool) (*memorySeries, error) { +func (u *UserState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *Record, recovery bool) (*MemorySeries, error) { // There's theoretically a relatively harmless race here if multiple // goroutines get the length of the series map at the same time, then // all proceed to add a new series. This is likely not worth addressing, @@ -226,7 +216,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab return series, nil } -func (u *userState) canAddSeriesFor(metric string) error { +func (u *UserState) canAddSeriesFor(metric string) error { shard := &u.seriesInMetric[util.HashFP(model.Fingerprint(fnv1a.HashString64(string(metric))))%metricCounterShards] shard.mtx.Lock() defer shard.mtx.Unlock() @@ -240,7 +230,7 @@ func (u *userState) canAddSeriesFor(metric string) error { return nil } -func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) { +func (u *UserState) removeSeries(fp model.Fingerprint, metric labels.Labels) { u.fpToSeries.del(fp) u.index.Delete(labels.Labels(metric), fp) @@ -272,8 +262,8 @@ func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) { // - The `send` callback is called at certain intervals specified by batchSize // with no locks held, and is intended to be used by the caller to send the // built batches. 
-func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, - add func(context.Context, model.Fingerprint, *memorySeries) error, +func (u *UserState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, + add func(context.Context, model.Fingerprint, *MemorySeries) error, send func(context.Context) error, batchSize int, ) error { log, ctx := spanlogger.New(ctx, "forSeriesMatching") @@ -328,3 +318,14 @@ outer: } return nil } + +// IterateSeries returns a channel that produces all mappings in the seriesMap. The +// channel will be closed once all fingerprints have been received. Not +// consuming all fingerprints from the channel will leak a goroutine. The +// semantics of concurrent modification of seriesMap is the similar as the one +// for iterating over a map with a 'range' clause. However, if the next element +// in iteration order is removed after the current element has been received +// from the channel, it will still be produced by the channel. +func (u *UserState) IterateSeries() <-chan FingerprintSeriesPair { + return u.fpToSeries.Iter() +} diff --git a/pkg/ingester/user_state_test.go b/pkg/ingester/user_state_test.go index 5df173dcf12..5f5fb6dac4f 100644 --- a/pkg/ingester/user_state_test.go +++ b/pkg/ingester/user_state_test.go @@ -53,7 +53,7 @@ func TestForSeriesMatchingBatching(t *testing.T) { total, batch, batches := 0, 0, 0 err = instance.forSeriesMatching(ctx, tc.matchers, - func(_ context.Context, _ model.Fingerprint, s *memorySeries) error { + func(_ context.Context, _ model.Fingerprint, s *MemorySeries) error { batch++ return nil }, diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index 031c0194ae7..34e61518ffa 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -65,7 +65,7 @@ type walWrapper struct { wait sync.WaitGroup wal *wal.WAL - getUserStates func() map[string]*userState + getUserStates func() map[string]*UserState // Checkpoint metrics. checkpointDeleteFail prometheus.Counter @@ -76,7 +76,7 @@ type walWrapper struct { } // newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL. -func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, error) { +func newWAL(cfg WALConfig, userStatesFunc func() map[string]*UserState) (WAL, error) { if !cfg.walEnabled { return &noopWAL{}, nil } @@ -235,10 +235,10 @@ func (w *walWrapper) performCheckpoint() (err error) { var wireChunkBuf []client.Chunk for userID, state := range w.getUserStates() { - for pair := range state.fpToSeries.iter() { - state.fpLocker.Lock(pair.fp) - wireChunkBuf, err = w.checkpointSeries(checkpoint, userID, pair.fp, pair.series, wireChunkBuf) - state.fpLocker.Unlock(pair.fp) + for pair := range state.fpToSeries.Iter() { + state.fpLocker.Lock(pair.Fingerprint) + wireChunkBuf, err = w.checkpointSeries(checkpoint, userID, pair.Fingerprint, pair.Series, wireChunkBuf) + state.fpLocker.Unlock(pair.Fingerprint) if err != nil { return err } @@ -338,7 +338,7 @@ func (w *walWrapper) deleteCheckpoints(maxIndex int) (err error) { } // checkpointSeries write the chunks of the series to the checkpoint. 
-func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerprint, series *memorySeries, wireChunks []client.Chunk) ([]client.Chunk, error) { +func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerprint, series *MemorySeries, wireChunks []client.Chunk) ([]client.Chunk, error) { var err error wireChunks, err = toWireChunks(series.chunkDescs, wireChunks[:0]) if err != nil { @@ -358,30 +358,18 @@ func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Finge return wireChunks, cp.Log(buf) } -func recoverFromWAL(ingester *Ingester) (err error) { - walDir := ingester.cfg.WALConfig.dir - // Use a local userStates, so we don't need to worry about locking. - userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics) - - defer func() { - if err == nil { - ingester.userStatesMtx.Lock() - ingester.userStates = userStates - ingester.userStatesMtx.Unlock() - } - }() - +func RecoverFromWAL(walDir string, userStates *UserStates) (err error) { lastCheckpointDir, idx, err := lastCheckpoint(walDir) if err != nil { return err } nWorkers := runtime.GOMAXPROCS(0) - stateCache := make([]map[string]*userState, nWorkers) - seriesCache := make([]map[string]map[uint64]*memorySeries, nWorkers) + stateCache := make([]map[string]*UserState, nWorkers) + seriesCache := make([]map[string]map[uint64]*MemorySeries, nWorkers) for i := 0; i < nWorkers; i++ { - stateCache[i] = make(map[string]*userState) - seriesCache[i] = make(map[string]map[uint64]*memorySeries) + stateCache[i] = make(map[string]*UserState) + seriesCache[i] = make(map[string]map[uint64]*MemorySeries) } if idx >= 0 { @@ -432,8 +420,8 @@ func segmentsExist(dir string) (bool, error) { } // processCheckpoint loads the chunks of the series present in the last checkpoint. 
-func processCheckpoint(name string, userStates *userStates, nWorkers int, - stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error { +func processCheckpoint(name string, userStates *UserStates, nWorkers int, + stateCache []map[string]*UserState, seriesCache []map[string]map[uint64]*MemorySeries) error { reader, closer, err := newWalReader(name, -1) if err != nil { @@ -457,7 +445,7 @@ func processCheckpoint(name string, userStates *userStates, nWorkers int, wg.Add(nWorkers) for i := 0; i < nWorkers; i++ { inputs[i] = make(chan *Series, 300) - go func(input <-chan *Series, stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries) { + go func(input <-chan *Series, stateCache map[string]*UserState, seriesCache map[string]map[uint64]*MemorySeries) { processCheckpointRecord(userStates, seriesPool, stateCache, seriesCache, input, errChan) wg.Done() }(inputs[i], stateCache[i], seriesCache[i]) @@ -523,15 +511,15 @@ func copyLabelAdapters(las []client.LabelAdapter) []client.LabelAdapter { return las } -func processCheckpointRecord(userStates *userStates, seriesPool *sync.Pool, stateCache map[string]*userState, - seriesCache map[string]map[uint64]*memorySeries, seriesChan <-chan *Series, errChan chan error) { +func processCheckpointRecord(userStates *UserStates, seriesPool *sync.Pool, stateCache map[string]*UserState, + seriesCache map[string]map[uint64]*MemorySeries, seriesChan <-chan *Series, errChan chan error) { var la []client.LabelAdapter for s := range seriesChan { state, ok := stateCache[s.UserId] if !ok { state = userStates.getOrCreate(s.UserId) stateCache[s.UserId] = state - seriesCache[s.UserId] = make(map[uint64]*memorySeries) + seriesCache[s.UserId] = make(map[uint64]*MemorySeries) } la = la[:0] @@ -570,8 +558,8 @@ type samplesWithUserID struct { } // processWAL processes the records in the WAL concurrently. 
-func processWAL(name string, startSegment int, userStates *userStates, nWorkers int, - stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error { +func processWAL(name string, startSegment int, userStates *UserStates, nWorkers int, + stateCache []map[string]*UserState, seriesCache []map[string]map[uint64]*MemorySeries) error { reader, closer, err := newWalReader(name, startSegment) if err != nil { @@ -596,7 +584,7 @@ func processWAL(name string, startSegment int, userStates *userStates, nWorkers shards[i] = &samplesWithUserID{} go func(input <-chan *samplesWithUserID, output chan<- *samplesWithUserID, - stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries) { + stateCache map[string]*UserState, seriesCache map[string]map[uint64]*MemorySeries) { processWALSamples(userStates, stateCache, seriesCache, input, output, errChan) wg.Done() }(inputs[i], outputs[i], stateCache[i], seriesCache[i]) @@ -704,7 +692,7 @@ Loop: } } -func processWALSamples(userStates *userStates, stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries, +func processWALSamples(userStates *UserStates, stateCache map[string]*UserState, seriesCache map[string]map[uint64]*MemorySeries, input <-chan *samplesWithUserID, output chan<- *samplesWithUserID, errChan chan error) { defer close(output) @@ -714,7 +702,7 @@ func processWALSamples(userStates *userStates, stateCache map[string]*userState, if !ok { state = userStates.getOrCreate(samples.userID) stateCache[samples.userID] = state - seriesCache[samples.userID] = make(map[uint64]*memorySeries) + seriesCache[samples.userID] = make(map[uint64]*MemorySeries) } sc := seriesCache[samples.userID] for i := range samples.samples { From 9ccc629cff97ff054ea754f759a593f344a953a5 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 13 Feb 2020 17:34:06 +0530 Subject: [PATCH 2/6] Use ingester.Flush inside flusher Signed-off-by: Ganesh Vernekar --- pkg/flusher/flusher.go | 209 ++++---------------------------- pkg/ingester/flush.go | 24 ++-- pkg/ingester/ingester.go | 87 +++++++++---- pkg/ingester/ingester_v2.go | 4 +- pkg/ingester/mapper_test.go | 14 +-- pkg/ingester/metrics.go | 6 +- pkg/ingester/series.go | 44 +++---- pkg/ingester/series_map.go | 26 ++-- pkg/ingester/transfer.go | 28 ++--- pkg/ingester/user_state.go | 69 ++++++----- pkg/ingester/user_state_test.go | 2 +- pkg/ingester/wal.go | 46 +++---- 12 files changed, 212 insertions(+), 347 deletions(-) diff --git a/pkg/flusher/flusher.go b/pkg/flusher/flusher.go index 796042c3c98..68e711a3d07 100644 --- a/pkg/flusher/flusher.go +++ b/pkg/flusher/flusher.go @@ -1,26 +1,19 @@ package flusher import ( - "context" "flag" + "fmt" "net/http" - "sync" "time" "github.com/go-kit/kit/log/level" - ot "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" - "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" ) -const ( - maxFlushRetries = 15 -) - // Config for an Ingester. type Config struct { WALDir string `yaml:"wal_dir,omitempty"` @@ -35,20 +28,17 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.FlushOpTimeout, "flusher.flush-op-timeout", 2*time.Minute, "Timeout for individual flush operations.") } -// Flusher deals with "in flight" chunks. Based on Prometheus 1.x -// MemorySeriesStorage. 
+// Flusher is designed to be used as a job to flush the chunks from the WAL on disk. type Flusher struct { - cfg Config - - chunkStore ingester.ChunkStore - - // One queue per flush thread. Fingerprint is used to - // pick a queue. - seriesFlushedCounter prometheus.Counter - chunksFlushedCounter prometheus.Counter + ing *ingester.Ingester } -// New constructs a new Ingester. +const ( + postFlushSleepTime = 1 * time.Minute +) + +// New constructs a new Flusher and flushes the data from the WAL. +// The returned Flusher has no other operations. func New( cfg Config, ingesterConfig ingester.Config, @@ -56,185 +46,28 @@ func New( chunkStore ingester.ChunkStore, registerer prometheus.Registerer, ) (*Flusher, error) { - f := &Flusher{ - cfg: cfg, - chunkStore: chunkStore, - } - - f.registerMetrics(registerer) - metrics := ingester.NewIngesterMetrics(nil, true) - userStates := ingester.NewUserStates(nil, ingesterConfig, metrics) + ingesterConfig.WALConfig.Dir = cfg.WALDir - level.Info(util.Logger).Log("msg", "recovering from WAL") - - start := time.Now() - if err := ingester.RecoverFromWAL(cfg.WALDir, userStates); err != nil { - level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) + ing, err := ingester.NewForFlusher(ingesterConfig, clientConfig, chunkStore, registerer) + if err != nil { return nil, err } - elapsed := time.Since(start) - level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) + return &Flusher{ + ing: ing, + }, err +} - err := f.flushAllUsers(userStates) +func (f *Flusher) Flush() { + f.ing.Flush() // Sleeping to give a chance to Prometheus // to collect the metrics. - time.Sleep(1 * time.Minute) - - return f, err -} - -func (f *Flusher) registerMetrics(registerer prometheus.Registerer) { - f.seriesFlushedCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "cortex_flusher_series_flushed_total", - Help: "Total number of series flushed.", - }) - f.chunksFlushedCounter = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "cortex_flusher_chunks_flushed_total", - Help: "Total number of chunks flushed.", - }) - - if registerer != nil { - registerer.MustRegister( - f.seriesFlushedCounter, - f.chunksFlushedCounter, - ) - } -} - -func (f *Flusher) flushAllUsers(userStates *ingester.UserStates) error { - level.Info(util.Logger).Log("msg", "flushing all series") - var ( - errChan = make(chan error, f.cfg.ConcurrentFlushes) - flushDataPool = sync.Pool{ - New: func() interface{} { - return &flushData{} - }, - } - flushDataChan = make(chan *flushData, 2*f.cfg.ConcurrentFlushes) - wg sync.WaitGroup - capturedErr error - ) - - wg.Add(f.cfg.ConcurrentFlushes) - for i := 0; i < f.cfg.ConcurrentFlushes; i++ { - go f.flushUserSeries(flushDataChan, errChan, &flushDataPool, &wg) - } - -Loop: - for userID, state := range userStates.Copy() { - for pair := range state.IterateSeries() { - select { - case capturedErr = <-errChan: - break Loop - default: - } - - fd := flushDataPool.Get().(*flushData) - fd.userID = userID - fd.pair = pair - flushDataChan <- fd - } - } - - close(flushDataChan) - wg.Wait() - - // In case there was an error, drain the channel. 
- for range flushDataChan { - } - - if capturedErr != nil { - level.Error(util.Logger).Log("msg", "error while flushing", "err", capturedErr) - return capturedErr - } - - select { - case err := <-errChan: - level.Error(util.Logger).Log("msg", "error while flushing", "err", err) - return err - default: - level.Info(util.Logger).Log("msg", "flushing done") - return nil - } -} - -type flushData struct { - userID string - pair ingester.FingerprintSeriesPair -} - -func (f *Flusher) flushUserSeries(flushDataChan <-chan *flushData, errChan chan<- error, flushDataPool *sync.Pool, wg *sync.WaitGroup) { - defer wg.Done() - - for fd := range flushDataChan { - err := f.flushSeries(fd) - flushDataPool.Put(fd) - if err != nil { - errChan <- err - return - } - } -} - -func (f *Flusher) flushSeries(fd *flushData) error { - fp := fd.pair.Fingerprint - series := fd.pair.Series - userID := fd.userID - - // shouldFlushSeries() has told us we have at least one chunk - chunkDescs := series.GetChunks() - if len(chunkDescs) == 0 { - return nil - } + level.Info(util.Logger).Log("msg", fmt.Sprintf("sleeping for %s to give chance for collection of metrics", postFlushSleepTime.String())) + time.Sleep(postFlushSleepTime) - // flush the chunks without locking the series, as we don't want to hold the series lock for the duration of the dynamo/s3 rpcs. - ctx, cancel := context.WithTimeout(context.Background(), f.cfg.FlushOpTimeout) - defer cancel() // releases resources if slowOperation completes before timeout elapses - - sp, ctx := ot.StartSpanFromContext(ctx, "flushUserSeries") - defer sp.Finish() - sp.SetTag("organization", userID) - - util.Event().Log("msg", "flush chunks", "userID", userID, "numChunks", len(chunkDescs), "firstTime", chunkDescs[0].FirstTime, "fp", fp, "series", series.Metric(), "nlabels", len(series.Metric())) - - wireChunks := make([]chunk.Chunk, 0, len(chunkDescs)) - for _, chunkDesc := range chunkDescs { - c := chunk.NewChunk(userID, fp, series.Metric(), chunkDesc.C, chunkDesc.FirstTime, chunkDesc.LastTime) - if err := c.Encode(); err != nil { - return err - } - wireChunks = append(wireChunks, c) - } - - backoff := util.NewBackoff(ctx, util.BackoffConfig{ - MinBackoff: 1 * time.Second, - MaxBackoff: 10 * time.Second, - MaxRetries: maxFlushRetries, - }) - var err error - for backoff.Ongoing() { - err = f.chunkStore.Put(ctx, wireChunks) - if err == nil { - break - } - backoff.Wait() - } - if err != nil { - return err - } - - for _, chunkDesc := range chunkDescs { - utilization, length, size := chunkDesc.C.Utilization(), chunkDesc.C.Len(), chunkDesc.C.Size() - util.Event().Log("msg", "chunk flushed", "userID", userID, "fp", fp, "series", series.Metric(), "nlabels", len(series.Metric()), "utilization", utilization, "length", length, "size", size, "firstTime", chunkDesc.FirstTime, "lastTime", chunkDesc.LastTime) - } - - f.seriesFlushedCounter.Inc() - f.chunksFlushedCounter.Add(float64(len(wireChunks))) - - return nil + f.ing.Shutdown() } // ReadinessHandler returns 204 always. 
diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 4208317260b..fc510d6d364 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -119,13 +119,13 @@ func (i *Ingester) sweepUsers(immediate bool) { oldest := model.Time(0) - for id, state := range i.userStates.Copy() { - for pair := range state.fpToSeries.Iter() { - state.fpLocker.Lock(pair.Fingerprint) - i.sweepSeries(id, pair.Fingerprint, pair.Series, immediate) - i.removeFlushedChunks(state, pair.Fingerprint, pair.Series) - first := pair.Series.firstUnflushedChunkTime() - state.fpLocker.Unlock(pair.Fingerprint) + for id, state := range i.userStates.cp() { + for pair := range state.fpToSeries.iter() { + state.fpLocker.Lock(pair.fp) + i.sweepSeries(id, pair.fp, pair.series, immediate) + i.removeFlushedChunks(state, pair.fp, pair.series) + first := pair.series.firstUnflushedChunkTime() + state.fpLocker.Unlock(pair.fp) if first > 0 && (oldest == 0 || first < oldest) { oldest = first @@ -173,7 +173,7 @@ func (f flushReason) String() string { // // NB we don't close the head chunk here, as the series could wait in the queue // for some time, and we want to encourage chunks to be as full as possible. -func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *MemorySeries, immediate bool) { +func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *memorySeries, immediate bool) { if len(series.chunkDescs) <= 0 { return } @@ -191,7 +191,7 @@ func (i *Ingester) sweepSeries(userID string, fp model.Fingerprint, series *Memo } } -func (i *Ingester) shouldFlushSeries(series *MemorySeries, fp model.Fingerprint, immediate bool) flushReason { +func (i *Ingester) shouldFlushSeries(series *memorySeries, fp model.Fingerprint, immediate bool) flushReason { if len(series.chunkDescs) == 0 { return noFlush } @@ -210,7 +210,7 @@ func (i *Ingester) shouldFlushSeries(series *MemorySeries, fp model.Fingerprint, return i.shouldFlushChunk(series.chunkDescs[0], fp, series.isStale()) } -func (i *Ingester) shouldFlushChunk(c *Desc, fp model.Fingerprint, lastValueIsStale bool) flushReason { +func (i *Ingester) shouldFlushChunk(c *desc, fp model.Fingerprint, lastValueIsStale bool) flushReason { if c.flushed { // don't flush chunks we've already flushed return noFlush } @@ -360,7 +360,7 @@ func (i *Ingester) flushUserSeries(flushQueueIndex int, userID string, fp model. 
} // must be called under fpLocker lock -func (i *Ingester) removeFlushedChunks(userState *UserState, fp model.Fingerprint, series *MemorySeries) { +func (i *Ingester) removeFlushedChunks(userState *userState, fp model.Fingerprint, series *memorySeries) { now := model.Now() for len(series.chunkDescs) > 0 { if series.chunkDescs[0].flushed && now.Sub(series.chunkDescs[0].LastUpdate) > i.cfg.RetainPeriod { @@ -376,7 +376,7 @@ func (i *Ingester) removeFlushedChunks(userState *UserState, fp model.Fingerprin } } -func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fingerprint, metric labels.Labels, chunkDescs []*Desc) error { +func (i *Ingester) flushChunks(ctx context.Context, userID string, fp model.Fingerprint, metric labels.Labels, chunkDescs []*desc) error { wireChunks := make([]chunk.Chunk, 0, len(chunkDescs)) for _, chunkDesc := range chunkDescs { c := chunk.NewChunk(userID, fp, metric, chunkDesc.C, chunkDesc.FirstTime, chunkDesc.LastTime) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 33b1e2c8faa..2cf7c24e1a9 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -94,7 +94,7 @@ type Ingester struct { cfg Config clientConfig client.Config - metrics *Metrics + metrics *ingesterMetrics chunkStore ChunkStore lifecycler *ring.Lifecycler @@ -105,7 +105,7 @@ type Ingester struct { done sync.WaitGroup userStatesMtx sync.RWMutex // protects userStates and stopped - userStates *UserStates + userStates *userStates stopped bool // protected by userStatesMtx // One queue per flush thread. Fingerprint is used to @@ -154,7 +154,7 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c i := &Ingester{ cfg: cfg, clientConfig: clientConfig, - metrics: NewIngesterMetrics(registerer, true), + metrics: newIngesterMetrics(registerer, true), limits: limits, chunkStore: chunkStore, quit: make(chan struct{}), @@ -173,31 +173,22 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c if cfg.WALConfig.Recover { level.Info(util.Logger).Log("msg", "recovering from WAL") - - // Use a local userStates, so we don't need to worry about locking. - userStates := NewUserStates(i.limiter, i.cfg, i.metrics) - start := time.Now() - if err := RecoverFromWAL(i.cfg.WALConfig.dir, userStates); err != nil { + if err := recoverFromWAL(i); err != nil { level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) return nil, err } elapsed := time.Since(start) - - i.userStatesMtx.Lock() - i.userStates = userStates - i.userStatesMtx.Unlock() - level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) i.metrics.walReplayDuration.Set(elapsed.Seconds()) } // If the WAL recover happened, then the userStates would already be set. if i.userStates == nil { - i.userStates = NewUserStates(i.limiter, cfg, i.metrics) + i.userStates = newUserStates(i.limiter, cfg, i.metrics) } - i.wal, err = newWAL(cfg.WALConfig, i.userStates.Copy) + i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp) if err != nil { return nil, err } @@ -217,6 +208,54 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c return i, nil } +// NewForFlusher constructs a new Ingester to be used by flusher target. 
+func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) { + if cfg.ingesterClientFactory == nil { + cfg.ingesterClientFactory = client.MakeIngesterClient + } + + i := &Ingester{ + cfg: cfg, + clientConfig: clientConfig, + metrics: newIngesterMetrics(registerer, true), + chunkStore: chunkStore, + quit: make(chan struct{}), + flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), + } + + level.Info(util.Logger).Log("msg", "recovering from WAL") + + // We recover from WAL always. + start := time.Now() + if err := recoverFromWAL(i); err != nil { + level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) + return nil, err + } + elapsed := time.Since(start) + + level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) + i.metrics.walReplayDuration.Set(elapsed.Seconds()) + + // Should be a noop WAL. + cfg.WALConfig.WALEnabled = false + var err error + i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp) + if err != nil { + return nil, err + } + + i.flushQueuesDone.Add(cfg.ConcurrentFlushes) + for j := 0; j < cfg.ConcurrentFlushes; j++ { + i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength) + go i.flushLoop(j) + } + + i.done.Add(1) + go i.loop() + + return i, nil +} + func (i *Ingester) loop() { defer i.done.Done() @@ -253,8 +292,10 @@ func (i *Ingester) Shutdown() { i.wal.Stop() - // Next initiate our graceful exit from the ring. - i.lifecycler.Shutdown() + if i.lifecycler != nil { + // Next initiate our graceful exit from the ring. + i.lifecycler.Shutdown() + } } } @@ -341,7 +382,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, labels.removeBlanks() var ( - state *UserState + state *userState fp model.Fingerprint ) i.userStatesMtx.RLock() @@ -443,7 +484,7 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client result := &client.QueryResponse{} numSeries, numSamples := 0, 0 maxSamplesPerQuery := i.limits.MaxSamplesPerQuery(userID) - err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *MemorySeries) error { + err = state.forSeriesMatching(ctx, matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { values, err := series.samplesForRange(from, through) if err != nil { return err @@ -506,8 +547,8 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ // can iteratively merge them with entries coming from the chunk store. But // that would involve locking all the series & sorting, so until we have // a better solution in the ingesters I'd rather take the hit in the queriers. 
- err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *MemorySeries) error { - chunks := make([]*Desc, 0, len(series.chunkDescs)) + err = state.forSeriesMatching(stream.Context(), matchers, func(ctx context.Context, _ model.Fingerprint, series *memorySeries) error { + chunks := make([]*desc, 0, len(series.chunkDescs)) for _, chunk := range series.chunkDescs { if !(chunk.FirstTime.After(through) || chunk.LastTime.Before(from)) { chunks = append(chunks, chunk.slice(from, through)) @@ -617,7 +658,7 @@ func (i *Ingester) MetricsForLabelMatchers(ctx context.Context, req *client.Metr lss := map[model.Fingerprint]labels.Labels{} for _, matchers := range matchersSet { - if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *MemorySeries) error { + if err := state.forSeriesMatching(ctx, matchers, func(ctx context.Context, fp model.Fingerprint, series *memorySeries) error { if _, ok := lss[fp]; !ok { lss[fp] = series.metric } @@ -670,7 +711,7 @@ func (i *Ingester) AllUserStats(ctx context.Context, req *client.UserStatsReques i.userStatesMtx.RLock() defer i.userStatesMtx.RUnlock() - users := i.userStates.Copy() + users := i.userStates.cp() response := &client.UsersStatsResponse{ Stats: make([]*client.UserIDStatsResponse, 0, len(users)), diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index b675e0b1182..00ea5b242b0 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -81,7 +81,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, i := &Ingester{ cfg: cfg, clientConfig: clientConfig, - metrics: NewIngesterMetrics(registerer, false), + metrics: newIngesterMetrics(registerer, false), limits: limits, chunkStore: nil, quit: make(chan struct{}), @@ -110,7 +110,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, // Init the limter and instantiate the user states which depend on it i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels) - i.userStates = NewUserStates(i.limiter, cfg, i.metrics) + i.userStates = newUserStates(i.limiter, cfg, i.metrics) // Scan and open TSDB's that already exist on disk if err := i.openExistingTSDB(context.Background()); err != nil { diff --git a/pkg/ingester/mapper_test.go b/pkg/ingester/mapper_test.go index 73debec1cdb..bffe3cc473d 100644 --- a/pkg/ingester/mapper_test.go +++ b/pkg/ingester/mapper_test.go @@ -66,12 +66,12 @@ func TestFPMapper(t *testing.T) { // cm11 is in sm. Adding cm11 should do nothing. Mapping cm12 should resolve // the collision. - sm.put(fp1, &MemorySeries{metric: cm11.copyValuesAndSort()}) + sm.put(fp1, &memorySeries{metric: cm11.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) // The mapped cm12 is added to sm, too. That should not change the outcome. - sm.put(model.Fingerprint(1), &MemorySeries{metric: cm12.copyValuesAndSort()}) + sm.put(model.Fingerprint(1), &memorySeries{metric: cm12.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) @@ -80,27 +80,27 @@ func TestFPMapper(t *testing.T) { assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) // Add cm13 to sm. Should not change anything. 
- sm.put(model.Fingerprint(2), &MemorySeries{metric: cm13.copyValuesAndSort()}) + sm.put(model.Fingerprint(2), &memorySeries{metric: cm13.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) assertFingerprintEqual(t, mapper.mapFP(fp1, cm12), model.Fingerprint(1)) assertFingerprintEqual(t, mapper.mapFP(fp1, cm13), model.Fingerprint(2)) // Now add cm21 and cm22 in the same way, checking the mapped FPs. assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) - sm.put(fp2, &MemorySeries{metric: cm21.copyValuesAndSort()}) + sm.put(fp2, &memorySeries{metric: cm21.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) - sm.put(model.Fingerprint(3), &MemorySeries{metric: cm22.copyValuesAndSort()}) + sm.put(model.Fingerprint(3), &memorySeries{metric: cm22.copyValuesAndSort()}) assertFingerprintEqual(t, mapper.mapFP(fp2, cm21), fp2) assertFingerprintEqual(t, mapper.mapFP(fp2, cm22), model.Fingerprint(3)) // Map cm31, resulting in a mapping straight away. assertFingerprintEqual(t, mapper.mapFP(fp3, cm31), model.Fingerprint(4)) - sm.put(model.Fingerprint(4), &MemorySeries{metric: cm31.copyValuesAndSort()}) + sm.put(model.Fingerprint(4), &memorySeries{metric: cm31.copyValuesAndSort()}) // Map cm32, which is now mapped for two reasons... assertFingerprintEqual(t, mapper.mapFP(fp3, cm32), model.Fingerprint(5)) - sm.put(model.Fingerprint(5), &MemorySeries{metric: cm32.copyValuesAndSort()}) + sm.put(model.Fingerprint(5), &memorySeries{metric: cm32.copyValuesAndSort()}) // Now check ALL the mappings, just to be sure. assertFingerprintEqual(t, mapper.mapFP(fp1, cm11), fp1) diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 575bba6ad76..d4d64daa272 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -16,7 +16,7 @@ const ( memSeriesRemovedTotalHelp = "The total number of series that were removed per user." ) -type Metrics struct { +type ingesterMetrics struct { flushQueueLength prometheus.Gauge ingestedSamples prometheus.Counter ingestedSamplesFail prometheus.Counter @@ -31,8 +31,8 @@ type Metrics struct { walReplayDuration prometheus.Gauge } -func NewIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *Metrics { - m := &Metrics{ +func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *ingesterMetrics { + m := &ingesterMetrics{ flushQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "cortex_ingester_flush_queue_length", Help: "The total number of series pending in the flush queue.", diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index 6e46a52c070..5bf2f57f0eb 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -24,11 +24,11 @@ func init() { prometheus.MustRegister(createdChunks) } -type MemorySeries struct { +type memorySeries struct { metric labels.Labels // Sorted by start time, overlapping chunk ranges are forbidden. - chunkDescs []*Desc + chunkDescs []*desc // Whether the current head chunk has already been finished. If true, // the current head chunk must not be modified anymore. @@ -43,8 +43,8 @@ type MemorySeries struct { // newMemorySeries returns a pointer to a newly allocated memorySeries for the // given metric. 
-func newMemorySeries(m labels.Labels) *MemorySeries { - return &MemorySeries{ +func newMemorySeries(m labels.Labels) *memorySeries { + return &memorySeries{ metric: m, lastTime: model.Earliest, } @@ -52,7 +52,7 @@ func newMemorySeries(m labels.Labels) *MemorySeries { // add adds a sample pair to the series, possibly creating a new chunk. // The caller must have locked the fingerprint of the series. -func (s *MemorySeries) add(v model.SamplePair) error { +func (s *memorySeries) add(v model.SamplePair) error { // If sender has repeated the same timestamp, check more closely and perhaps return error. if v.Timestamp == s.lastTime { // If we don't know what the last sample value is, silently discard. @@ -126,7 +126,7 @@ func firstAndLastTimes(c encoding.Chunk) (model.Time, model.Time, error) { // closeHead marks the head chunk closed. The caller must have locked // the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. -func (s *MemorySeries) closeHead(reason flushReason) { +func (s *memorySeries) closeHead(reason flushReason) { s.chunkDescs[0].flushReason = reason s.headChunkClosed = true } @@ -134,14 +134,14 @@ func (s *MemorySeries) closeHead(reason flushReason) { // firstTime returns the earliest known time for the series. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. -func (s *MemorySeries) firstTime() model.Time { +func (s *memorySeries) firstTime() model.Time { return s.chunkDescs[0].FirstTime } // Returns time of oldest chunk in the series, that isn't flushed. If there are // no chunks, or all chunks are flushed, returns 0. // The caller must have locked the fingerprint of the memorySeries. -func (s *MemorySeries) firstUnflushedChunkTime() model.Time { +func (s *memorySeries) firstUnflushedChunkTime() model.Time { for _, c := range s.chunkDescs { if !c.flushed { return c.FirstTime @@ -154,11 +154,11 @@ func (s *MemorySeries) firstUnflushedChunkTime() model.Time { // head returns a pointer to the head chunk descriptor. The caller must have // locked the fingerprint of the memorySeries. This method will panic if this // series has no chunk descriptors. -func (s *MemorySeries) head() *Desc { +func (s *memorySeries) head() *desc { return s.chunkDescs[len(s.chunkDescs)-1] } -func (s *MemorySeries) samplesForRange(from, through model.Time) ([]model.SamplePair, error) { +func (s *memorySeries) samplesForRange(from, through model.Time) ([]model.SamplePair, error) { // Find first chunk with start time after "from". fromIdx := sort.Search(len(s.chunkDescs), func(i int) bool { return s.chunkDescs[i].FirstTime.After(from) @@ -199,7 +199,7 @@ func (s *MemorySeries) samplesForRange(from, through model.Time) ([]model.Sample return values, nil } -func (s *MemorySeries) setChunks(descs []*Desc) error { +func (s *memorySeries) setChunks(descs []*desc) error { if len(s.chunkDescs) != 0 { return fmt.Errorf("series already has chunks") } @@ -211,19 +211,11 @@ func (s *MemorySeries) setChunks(descs []*Desc) error { return nil } -func (s *MemorySeries) GetChunks() []*Desc { - return s.chunkDescs -} - -func (s *MemorySeries) Metric() labels.Labels { - return s.metric -} - -func (s *MemorySeries) isStale() bool { +func (s *memorySeries) isStale() bool { return s.lastSampleValueSet && value.IsStaleNaN(float64(s.lastSampleValue)) } -type Desc struct { +type desc struct { C encoding.Chunk // nil if chunk is evicted. FirstTime model.Time // Timestamp of first sample. 
Populated at creation. Immutable. LastTime model.Time // Timestamp of last sample. Populated at creation & on append. @@ -232,8 +224,8 @@ type Desc struct { flushed bool // set to true when flush succeeds } -func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *Desc { - return &Desc{ +func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *desc { + return &desc{ C: c, FirstTime: firstTime, LastTime: lastTime, @@ -244,7 +236,7 @@ func newDesc(c encoding.Chunk, firstTime model.Time, lastTime model.Time) *Desc // Add adds a sample pair to the underlying chunk. For safe concurrent access, // The chunk must be pinned, and the caller must have locked the fingerprint of // the series. -func (d *Desc) add(s model.SamplePair) (encoding.Chunk, error) { +func (d *desc) add(s model.SamplePair) (encoding.Chunk, error) { cs, err := d.C.Add(s) if err != nil { return nil, err @@ -258,8 +250,8 @@ func (d *Desc) add(s model.SamplePair) (encoding.Chunk, error) { return cs, nil } -func (d *Desc) slice(start, end model.Time) *Desc { - return &Desc{ +func (d *desc) slice(start, end model.Time) *desc { + return &desc{ C: d.C.Slice(start, end), FirstTime: start, LastTime: end, diff --git a/pkg/ingester/series_map.go b/pkg/ingester/series_map.go index 7b7f5102d7f..a622ac910f8 100644 --- a/pkg/ingester/series_map.go +++ b/pkg/ingester/series_map.go @@ -22,15 +22,15 @@ type seriesMap struct { type shard struct { mtx sync.Mutex - m map[model.Fingerprint]*MemorySeries + m map[model.Fingerprint]*memorySeries // Align this struct. - _ [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*MemorySeries{})]byte + _ [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(map[model.Fingerprint]*memorySeries{})]byte } -// FingerprintSeriesPair pairs a fingerprint with a memorySeries pointer. -type FingerprintSeriesPair struct { - Fingerprint model.Fingerprint - Series *MemorySeries +// fingerprintSeriesPair pairs a fingerprint with a memorySeries pointer. +type fingerprintSeriesPair struct { + fp model.Fingerprint + series *memorySeries } // newSeriesMap returns a newly allocated empty seriesMap. To create a seriesMap @@ -38,7 +38,7 @@ type FingerprintSeriesPair struct { func newSeriesMap() *seriesMap { shards := make([]shard, seriesMapShards) for i := 0; i < seriesMapShards; i++ { - shards[i].m = map[model.Fingerprint]*MemorySeries{} + shards[i].m = map[model.Fingerprint]*memorySeries{} } return &seriesMap{ shards: shards, @@ -47,7 +47,7 @@ func newSeriesMap() *seriesMap { // get returns a memorySeries for a fingerprint. Return values have the same // semantics as the native Go map. -func (sm *seriesMap) get(fp model.Fingerprint) (*MemorySeries, bool) { +func (sm *seriesMap) get(fp model.Fingerprint) (*memorySeries, bool) { shard := &sm.shards[util.HashFP(fp)%seriesMapShards] shard.mtx.Lock() ms, ok := shard.m[fp] @@ -56,7 +56,7 @@ func (sm *seriesMap) get(fp model.Fingerprint) (*MemorySeries, bool) { } // put adds a mapping to the seriesMap. -func (sm *seriesMap) put(fp model.Fingerprint, s *MemorySeries) { +func (sm *seriesMap) put(fp model.Fingerprint, s *memorySeries) { shard := &sm.shards[util.HashFP(fp)%seriesMapShards] shard.mtx.Lock() _, ok := shard.m[fp] @@ -80,21 +80,21 @@ func (sm *seriesMap) del(fp model.Fingerprint) { } } -// Iter returns a channel that produces all mappings in the seriesMap. The +// iter returns a channel that produces all mappings in the seriesMap. The // channel will be closed once all fingerprints have been received. 
Not // consuming all fingerprints from the channel will leak a goroutine. The // semantics of concurrent modification of seriesMap is the similar as the one // for iterating over a map with a 'range' clause. However, if the next element // in iteration order is removed after the current element has been received // from the channel, it will still be produced by the channel. -func (sm *seriesMap) Iter() <-chan FingerprintSeriesPair { - ch := make(chan FingerprintSeriesPair) +func (sm *seriesMap) iter() <-chan fingerprintSeriesPair { + ch := make(chan fingerprintSeriesPair) go func() { for i := range sm.shards { sm.shards[i].mtx.Lock() for fp, ms := range sm.shards[i].m { sm.shards[i].mtx.Unlock() - ch <- FingerprintSeriesPair{fp, ms} + ch <- fingerprintSeriesPair{fp, ms} sm.shards[i].mtx.Lock() } sm.shards[i].mtx.Unlock() diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index eba1300eaa2..e1a1aeb11c9 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -67,7 +67,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e fromIngesterID := "" seriesReceived := 0 xfer := func() error { - userStates := NewUserStates(i.limiter, i.cfg, i.metrics) + userStates := newUserStates(i.limiter, i.cfg, i.metrics) for { wireSeries, err := stream.Recv() @@ -349,7 +349,7 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error } // The passed wireChunks slice is for re-use. -func toWireChunks(descs []*Desc, wireChunks []client.Chunk) ([]client.Chunk, error) { +func toWireChunks(descs []*desc, wireChunks []client.Chunk) ([]client.Chunk, error) { if cap(wireChunks) < len(descs) { wireChunks = make([]client.Chunk, 0, len(descs)) } @@ -372,10 +372,10 @@ func toWireChunks(descs []*Desc, wireChunks []client.Chunk) ([]client.Chunk, err return wireChunks, nil } -func fromWireChunks(wireChunks []client.Chunk) ([]*Desc, error) { - descs := make([]*Desc, 0, len(wireChunks)) +func fromWireChunks(wireChunks []client.Chunk) ([]*desc, error) { + descs := make([]*desc, 0, len(wireChunks)) for _, c := range wireChunks { - desc := &Desc{ + desc := &desc{ FirstTime: model.Time(c.StartTimestampMs), LastTime: model.Time(c.EndTimestampMs), LastUpdate: model.Now(), @@ -433,7 +433,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { return i.v2TransferOut(ctx) } - userStatesCopy := i.userStates.Copy() + userStatesCopy := i.userStates.cp() if len(userStatesCopy) == 0 { level.Info(util.Logger).Log("msg", "nothing to transfer") return nil @@ -459,27 +459,27 @@ func (i *Ingester) transferOut(ctx context.Context) error { var chunks []client.Chunk for userID, state := range userStatesCopy { - for pair := range state.fpToSeries.Iter() { - state.fpLocker.Lock(pair.Fingerprint) + for pair := range state.fpToSeries.iter() { + state.fpLocker.Lock(pair.fp) - if len(pair.Series.chunkDescs) == 0 { // Nothing to send? - state.fpLocker.Unlock(pair.Fingerprint) + if len(pair.series.chunkDescs) == 0 { // Nothing to send? 
+ state.fpLocker.Unlock(pair.fp) continue } - chunks, err = toWireChunks(pair.Series.chunkDescs, chunks) + chunks, err = toWireChunks(pair.series.chunkDescs, chunks) if err != nil { - state.fpLocker.Unlock(pair.Fingerprint) + state.fpLocker.Unlock(pair.fp) return errors.Wrap(err, "toWireChunks") } err = stream.Send(&client.TimeSeriesChunk{ FromIngesterId: i.lifecycler.ID, UserId: userID, - Labels: client.FromLabelsToLabelAdapters(pair.Series.metric), + Labels: client.FromLabelsToLabelAdapters(pair.series.metric), Chunks: chunks, }) - state.fpLocker.Unlock(pair.Fingerprint) + state.fpLocker.Unlock(pair.fp) if err != nil { return errors.Wrap(err, "Send") } diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 065dfd55fe2..bbe3b0843ab 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -22,16 +22,16 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -// UserStates holds the userState object for all users (tenants), +// userStates holds the userState object for all users (tenants), // each one containing all the in-memory series for a given user. -type UserStates struct { +type userStates struct { states sync.Map limiter *SeriesLimiter cfg Config - metrics *Metrics + metrics *ingesterMetrics } -type UserState struct { +type userState struct { limiter *SeriesLimiter userID string fpLocker *fingerprintLocker @@ -62,41 +62,51 @@ type metricCounterShard struct { m map[string]int } -func NewUserStates(limiter *SeriesLimiter, cfg Config, metrics *Metrics) *UserStates { - return &UserStates{ +func newUserStates(limiter *SeriesLimiter, cfg Config, metrics *ingesterMetrics) *userStates { + return &userStates{ limiter: limiter, cfg: cfg, metrics: metrics, } } -func (us *UserStates) Copy() map[string]*UserState { - states := map[string]*UserState{} +func (us *userStates) cp() map[string]*userState { + states := map[string]*userState{} us.states.Range(func(key, value interface{}) bool { - states[key.(string)] = value.(*UserState) + states[key.(string)] = value.(*userState) return true }) return states } -func (us *UserStates) updateRates() { +func (us *userStates) gc() { us.states.Range(func(key, value interface{}) bool { - state := value.(*UserState) + state := value.(*userState) + if state.fpToSeries.length() == 0 { + us.states.Delete(key) + } + return true + }) +} + +func (us *userStates) updateRates() { + us.states.Range(func(key, value interface{}) bool { + state := value.(*userState) state.ingestedAPISamples.tick() state.ingestedRuleSamples.tick() return true }) } -func (us *UserStates) get(userID string) (*UserState, bool) { +func (us *userStates) get(userID string) (*userState, bool) { state, ok := us.states.Load(userID) if !ok { return nil, ok } - return state.(*UserState), ok + return state.(*userState), ok } -func (us *UserStates) getOrCreate(userID string) *UserState { +func (us *userStates) getOrCreate(userID string) *userState { state, ok := us.get(userID) if !ok { @@ -110,7 +120,7 @@ func (us *UserStates) getOrCreate(userID string) *UserState { // Speculatively create a userState object and try to store it // in the map. 
Another goroutine may have got there before // us, in which case this userState will be discarded - state = &UserState{ + state = &userState{ userID: userID, limiter: us.limiter, fpToSeries: newSeriesMap(), @@ -130,13 +140,13 @@ func (us *UserStates) getOrCreate(userID string) *UserState { if !ok { us.metrics.memUsers.Inc() } - state = stored.(*UserState) + state = stored.(*userState) } return state } -func (us *UserStates) getViaContext(ctx context.Context) (*UserState, bool, error) { +func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { return nil, false, fmt.Errorf("no user id") @@ -145,13 +155,13 @@ func (us *UserStates) getViaContext(ctx context.Context) (*UserState, bool, erro return state, ok, nil } -func (us *UserStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*UserState, model.Fingerprint, *MemorySeries, error) { +func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) { state := us.getOrCreate(userID) fp, series, err := state.getSeries(labels, record) return state, fp, series, err } -func (u *UserState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *MemorySeries, error) { +func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) { rawFP := client.FastFingerprint(metric) u.fpLocker.Lock(rawFP) fp := u.mapper.mapFP(rawFP, metric) @@ -174,7 +184,7 @@ func (u *UserState) getSeries(metric labelPairs, record *Record) (model.Fingerpr return fp, series, nil } -func (u *UserState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *Record, recovery bool) (*MemorySeries, error) { +func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *Record, recovery bool) (*memorySeries, error) { // There's theoretically a relatively harmless race here if multiple // goroutines get the length of the series map at the same time, then // all proceed to add a new series. This is likely not worth addressing, @@ -216,7 +226,7 @@ func (u *UserState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab return series, nil } -func (u *UserState) canAddSeriesFor(metric string) error { +func (u *userState) canAddSeriesFor(metric string) error { shard := &u.seriesInMetric[util.HashFP(model.Fingerprint(fnv1a.HashString64(string(metric))))%metricCounterShards] shard.mtx.Lock() defer shard.mtx.Unlock() @@ -230,7 +240,7 @@ func (u *UserState) canAddSeriesFor(metric string) error { return nil } -func (u *UserState) removeSeries(fp model.Fingerprint, metric labels.Labels) { +func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) { u.fpToSeries.del(fp) u.index.Delete(labels.Labels(metric), fp) @@ -262,8 +272,8 @@ func (u *UserState) removeSeries(fp model.Fingerprint, metric labels.Labels) { // - The `send` callback is called at certain intervals specified by batchSize // with no locks held, and is intended to be used by the caller to send the // built batches. 
-func (u *UserState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, - add func(context.Context, model.Fingerprint, *MemorySeries) error, +func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher, + add func(context.Context, model.Fingerprint, *memorySeries) error, send func(context.Context) error, batchSize int, ) error { log, ctx := spanlogger.New(ctx, "forSeriesMatching") @@ -318,14 +328,3 @@ outer: } return nil } - -// IterateSeries returns a channel that produces all mappings in the seriesMap. The -// channel will be closed once all fingerprints have been received. Not -// consuming all fingerprints from the channel will leak a goroutine. The -// semantics of concurrent modification of seriesMap is the similar as the one -// for iterating over a map with a 'range' clause. However, if the next element -// in iteration order is removed after the current element has been received -// from the channel, it will still be produced by the channel. -func (u *UserState) IterateSeries() <-chan FingerprintSeriesPair { - return u.fpToSeries.Iter() -} diff --git a/pkg/ingester/user_state_test.go b/pkg/ingester/user_state_test.go index b614cd0f6aa..7899c78c159 100644 --- a/pkg/ingester/user_state_test.go +++ b/pkg/ingester/user_state_test.go @@ -52,7 +52,7 @@ func TestForSeriesMatchingBatching(t *testing.T) { total, batch, batches := 0, 0, 0 err = instance.forSeriesMatching(ctx, tc.matchers, - func(_ context.Context, _ model.Fingerprint, s *MemorySeries) error { + func(_ context.Context, _ model.Fingerprint, s *memorySeries) error { batch++ return nil }, diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go index f545ea68e23..22e3a67c74f 100644 --- a/pkg/ingester/wal.go +++ b/pkg/ingester/wal.go @@ -65,7 +65,7 @@ type walWrapper struct { wait sync.WaitGroup wal *wal.WAL - getUserStates func() map[string]*UserState + getUserStates func() map[string]*userState // Checkpoint metrics. checkpointDeleteFail prometheus.Counter @@ -235,10 +235,10 @@ func (w *walWrapper) performCheckpoint() (err error) { var wireChunkBuf []client.Chunk for userID, state := range w.getUserStates() { - for pair := range state.fpToSeries.Iter() { - state.fpLocker.Lock(pair.Fingerprint) - wireChunkBuf, err = w.checkpointSeries(checkpoint, userID, pair.Fingerprint, pair.Series, wireChunkBuf) - state.fpLocker.Unlock(pair.Fingerprint) + for pair := range state.fpToSeries.iter() { + state.fpLocker.Lock(pair.fp) + wireChunkBuf, err = w.checkpointSeries(checkpoint, userID, pair.fp, pair.series, wireChunkBuf) + state.fpLocker.Unlock(pair.fp) if err != nil { return err } @@ -338,7 +338,7 @@ func (w *walWrapper) deleteCheckpoints(maxIndex int) (err error) { } // checkpointSeries write the chunks of the series to the checkpoint. 
-func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerprint, series *MemorySeries, wireChunks []client.Chunk) ([]client.Chunk, error) { +func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerprint, series *memorySeries, wireChunks []client.Chunk) ([]client.Chunk, error) { var err error wireChunks, err = toWireChunks(series.chunkDescs, wireChunks[:0]) if err != nil { @@ -358,10 +358,10 @@ func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Finge return wireChunks, cp.Log(buf) } -func RecoverFromWAL(ingester *Ingester) (err error) { +func recoverFromWAL(ingester *Ingester) (err error) { walDir := ingester.cfg.WALConfig.Dir // Use a local userStates, so we don't need to worry about locking. - userStates := NewUserStates(ingester.limiter, ingester.cfg, ingester.metrics) + userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics) defer func() { if err == nil { @@ -377,11 +377,11 @@ func RecoverFromWAL(ingester *Ingester) (err error) { } nWorkers := runtime.GOMAXPROCS(0) - stateCache := make([]map[string]*UserState, nWorkers) - seriesCache := make([]map[string]map[uint64]*MemorySeries, nWorkers) + stateCache := make([]map[string]*userState, nWorkers) + seriesCache := make([]map[string]map[uint64]*memorySeries, nWorkers) for i := 0; i < nWorkers; i++ { - stateCache[i] = make(map[string]*UserState) - seriesCache[i] = make(map[string]map[uint64]*MemorySeries) + stateCache[i] = make(map[string]*userState) + seriesCache[i] = make(map[string]map[uint64]*memorySeries) } if idx >= 0 { @@ -432,8 +432,8 @@ func segmentsExist(dir string) (bool, error) { } // processCheckpoint loads the chunks of the series present in the last checkpoint. -func processCheckpoint(name string, userStates *UserStates, nWorkers int, - stateCache []map[string]*UserState, seriesCache []map[string]map[uint64]*MemorySeries) error { +func processCheckpoint(name string, userStates *userStates, nWorkers int, + stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error { reader, closer, err := newWalReader(name, -1) if err != nil { @@ -457,7 +457,7 @@ func processCheckpoint(name string, userStates *UserStates, nWorkers int, wg.Add(nWorkers) for i := 0; i < nWorkers; i++ { inputs[i] = make(chan *Series, 300) - go func(input <-chan *Series, stateCache map[string]*UserState, seriesCache map[string]map[uint64]*MemorySeries) { + go func(input <-chan *Series, stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries) { processCheckpointRecord(userStates, seriesPool, stateCache, seriesCache, input, errChan) wg.Done() }(inputs[i], stateCache[i], seriesCache[i]) @@ -523,15 +523,15 @@ func copyLabelAdapters(las []client.LabelAdapter) []client.LabelAdapter { return las } -func processCheckpointRecord(userStates *UserStates, seriesPool *sync.Pool, stateCache map[string]*UserState, - seriesCache map[string]map[uint64]*MemorySeries, seriesChan <-chan *Series, errChan chan error) { +func processCheckpointRecord(userStates *userStates, seriesPool *sync.Pool, stateCache map[string]*userState, + seriesCache map[string]map[uint64]*memorySeries, seriesChan <-chan *Series, errChan chan error) { var la []client.LabelAdapter for s := range seriesChan { state, ok := stateCache[s.UserId] if !ok { state = userStates.getOrCreate(s.UserId) stateCache[s.UserId] = state - seriesCache[s.UserId] = make(map[uint64]*MemorySeries) + seriesCache[s.UserId] = make(map[uint64]*memorySeries) } la = la[:0] @@ -570,8 +570,8 
@@ type samplesWithUserID struct { } // processWAL processes the records in the WAL concurrently. -func processWAL(name string, startSegment int, userStates *UserStates, nWorkers int, - stateCache []map[string]*UserState, seriesCache []map[string]map[uint64]*MemorySeries) error { +func processWAL(name string, startSegment int, userStates *userStates, nWorkers int, + stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error { reader, closer, err := newWalReader(name, startSegment) if err != nil { @@ -596,7 +596,7 @@ func processWAL(name string, startSegment int, userStates *UserStates, nWorkers shards[i] = &samplesWithUserID{} go func(input <-chan *samplesWithUserID, output chan<- *samplesWithUserID, - stateCache map[string]*UserState, seriesCache map[string]map[uint64]*MemorySeries) { + stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries) { processWALSamples(userStates, stateCache, seriesCache, input, output, errChan) wg.Done() }(inputs[i], outputs[i], stateCache[i], seriesCache[i]) @@ -704,7 +704,7 @@ Loop: } } -func processWALSamples(userStates *UserStates, stateCache map[string]*UserState, seriesCache map[string]map[uint64]*MemorySeries, +func processWALSamples(userStates *userStates, stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries, input <-chan *samplesWithUserID, output chan<- *samplesWithUserID, errChan chan error) { defer close(output) @@ -714,7 +714,7 @@ func processWALSamples(userStates *UserStates, stateCache map[string]*UserState, if !ok { state = userStates.getOrCreate(samples.userID) stateCache[samples.userID] = state - seriesCache[samples.userID] = make(map[uint64]*MemorySeries) + seriesCache[samples.userID] = make(map[uint64]*memorySeries) } sc := seriesCache[samples.userID] for i := range samples.samples { From a20527d17ad42a952766f853c023d7e2eba96173 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 5 Mar 2020 15:50:47 +0530 Subject: [PATCH 3/6] Adapt services model for flusher Signed-off-by: Ganesh Vernekar --- cmd/cortex/main.go | 22 ++-------- pkg/cortex/cortex.go | 10 +++-- pkg/cortex/module_service_wrapper.go | 5 +-- pkg/cortex/modules.go | 2 - pkg/flusher/flusher.go | 63 +++++++++++++++++++++------- pkg/ingester/ingester.go | 4 +- pkg/util/errors.go | 6 +++ 7 files changed, 68 insertions(+), 44 deletions(-) create mode 100644 pkg/util/errors.go diff --git a/cmd/cortex/main.go b/cmd/cortex/main.go index 7f81bb7237b..6153cb023df 100644 --- a/cmd/cortex/main.go +++ b/cmd/cortex/main.go @@ -8,7 +8,6 @@ import ( "os" "runtime" "strings" - "sync" "time" "github.com/go-kit/kit/log/level" @@ -106,24 +105,9 @@ func main() { level.Info(util.Logger).Log("msg", "Starting Cortex", "version", version.Info()) - var wg sync.WaitGroup - wg.Add(1) - go func() { - defer wg.Done() - if err := t.Run(); err != nil { - level.Error(util.Logger).Log("msg", "error running Cortex", "err", err) - } - }() - - // if cfg.Target.IsJob() { - // err = t.Stop() - // } - - wg.Wait() - - // if !cfg.Target.IsJob() { - // err = t.Stop() - // } + if err := t.Run(); err != nil { + level.Error(util.Logger).Log("msg", "error running Cortex", "err", err) + } runtime.KeepAlive(ballast) } diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 37f30b86709..87911da9cef 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -86,8 +86,8 @@ type Config struct { QueryRange queryrange.Config `yaml:"query_range,omitempty"` TableManager chunk.TableManagerConfig `yaml:"table_manager,omitempty"` 
Encoding encoding.Config `yaml:"-"` // No yaml for this, it only works with flags. - TSDB tsdb.Config `yaml:"tsdb" doc:"hidden"` - Compactor compactor.Config `yaml:"compactor,omitempty" doc:"hidden"` + TSDB tsdb.Config `yaml:"tsdb"` + Compactor compactor.Config `yaml:"compactor,omitempty"` DataPurgerConfig purger.Config `yaml:"purger,omitempty"` Ruler ruler.Config `yaml:"ruler,omitempty"` @@ -322,7 +322,11 @@ func (t *Cortex) Run() error { // let's find out which module failed for m, s := range t.serviceMap { if s == service { - level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase()) + if service.FailureCase() == util.ErrStopCortex { + level.Info(util.Logger).Log("msg", "received stop signal via return error", "module", m, "error", service.FailureCase()) + } else { + level.Error(util.Logger).Log("msg", "module failed", "module", m, "error", service.FailureCase()) + } return } } diff --git a/pkg/cortex/module_service_wrapper.go b/pkg/cortex/module_service_wrapper.go index c3f060aab35..56c08e6cb57 100644 --- a/pkg/cortex/module_service_wrapper.go +++ b/pkg/cortex/module_service_wrapper.go @@ -85,9 +85,8 @@ func (w *moduleServiceWrapper) stop() error { level.Debug(util.Logger).Log("msg", "stopping", "module", w.module) - w.service.StopAsync() - err := w.service.AwaitTerminated(context.Background()) - if err != nil { + err := services.StopAndAwaitTerminated(context.Background(), w.service) + if err != nil && err != util.ErrStopCortex { level.Warn(util.Logger).Log("msg", "error stopping module", "module", w.module, "err", err) } else { level.Info(util.Logger).Log("msg", "module stopped", "module", w.module) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 2c1464ac19d..0969d3e42bc 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -383,8 +383,6 @@ func (t *Cortex) initIngester(cfg *Config) (serv services.Service, err error) { } func (t *Cortex) initFlusher(cfg *Config) (serv services.Service, err error) { - // By the end of this call, the chunks should be recovered - // from the WAL and flushed. t.flusher, err = flusher.New( cfg.Flusher, cfg.Ingester, diff --git a/pkg/flusher/flusher.go b/pkg/flusher/flusher.go index 68e711a3d07..434dfbdbddb 100644 --- a/pkg/flusher/flusher.go +++ b/pkg/flusher/flusher.go @@ -1,17 +1,20 @@ package flusher import ( + "context" "flag" "fmt" "net/http" "time" "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/cortexproject/cortex/pkg/ingester" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" ) // Config for an Ingester. @@ -30,7 +33,15 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { // Flusher is designed to be used as a job to flush the chunks from the WAL on disk. 
type Flusher struct { - ing *ingester.Ingester + services.Service + + cfg Config + ingesterConfig ingester.Config + clientConfig client.Config + chunkStore ingester.ChunkStore + registerer prometheus.Registerer + + ingester *ingester.Ingester } const ( @@ -48,33 +59,55 @@ func New( ) (*Flusher, error) { ingesterConfig.WALConfig.Dir = cfg.WALDir - - ing, err := ingester.NewForFlusher(ingesterConfig, clientConfig, chunkStore, registerer) - if err != nil { - return nil, err + ingesterConfig.ConcurrentFlushes = cfg.ConcurrentFlushes + ingesterConfig.FlushOpTimeout = cfg.FlushOpTimeout + + f := &Flusher{ + cfg: cfg, + ingesterConfig: ingesterConfig, + clientConfig: clientConfig, + chunkStore: chunkStore, + registerer: registerer, } + f.Service = services.NewBasicService(f.starting, f.running, f.stopping) + return f, nil +} - return &Flusher{ - ing: ing, - }, err +func (f *Flusher) starting(ctx context.Context) error { + // WAL replay happens here. We have it in the starting function and not New + // so that metrics can be collected in parallel with WAL replay. + var err error + f.ingester, err = ingester.NewForFlusher(f.ingesterConfig, f.clientConfig, f.chunkStore, f.registerer) + return err } -func (f *Flusher) Flush() { - f.ing.Flush() +func (f *Flusher) running(ctx context.Context) error { + if err := f.ingester.StartAsync(ctx); err != nil { + return errors.Wrap(err, "start ingester") + } + if err := f.ingester.AwaitRunning(ctx); err != nil { + return errors.Wrap(err, "awaing running ingester") + } + + f.ingester.Flush() // Sleeping to give a chance to Prometheus // to collect the metrics. level.Info(util.Logger).Log("msg", fmt.Sprintf("sleeping for %s to give chance for collection of metrics", postFlushSleepTime.String())) time.Sleep(postFlushSleepTime) - f.ing.Shutdown() + f.ingester.StopAsync() + if err := f.ingester.AwaitTerminated(ctx); err != nil { + return err + } + return util.ErrStopCortex +} +func (f *Flusher) stopping() error { + // Nothing to do here. + return nil } // ReadinessHandler returns 204 always. func (f *Flusher) ReadinessHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } - -func (f *Flusher) Close() error { - return nil -} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 078cf422cd6..455e194d19c 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -252,11 +252,11 @@ func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore return nil, err } - i.Service = services.NewBasicService(i.starting, i.loop, i.stopping) + i.Service = services.NewBasicService(i.startingForFlusher, i.loop, i.stopping) return i, nil } -func (i *Ingester) startingFlusher(ctx context.Context) error { +func (i *Ingester) startingForFlusher(ctx context.Context) error { i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) for j := 0; j < i.cfg.ConcurrentFlushes; j++ { i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength) diff --git a/pkg/util/errors.go b/pkg/util/errors.go new file mode 100644 index 00000000000..b830c0bc59e --- /dev/null +++ b/pkg/util/errors.go @@ -0,0 +1,6 @@ +package util + +import "errors" + +// ErrStopCortex is the error returned by a service as a hint to stop the Cortex server entirely. 
+var ErrStopCortex = errors.New("stop cortex") From 574c43b7c6520a48d468715e50fef8c4b0ed9ce6 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 10 Mar 2020 13:01:02 +0530 Subject: [PATCH 4/6] Fix review comments Signed-off-by: Ganesh Vernekar --- pkg/cortex/modules.go | 1 - pkg/flusher/flusher.go | 19 +++++-------------- pkg/ingester/ingester.go | 41 +++++++++++++++++++++------------------- 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 0969d3e42bc..5e7ceca0127 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -394,7 +394,6 @@ func (t *Cortex) initFlusher(cfg *Config) (serv services.Service, err error) { return } - t.server.HTTP.Path("/ready").Handler(http.HandlerFunc(t.flusher.ReadinessHandler)) return t.flusher, nil } diff --git a/pkg/flusher/flusher.go b/pkg/flusher/flusher.go index 434dfbdbddb..942d4e6ef70 100644 --- a/pkg/flusher/flusher.go +++ b/pkg/flusher/flusher.go @@ -4,7 +4,6 @@ import ( "context" "flag" "fmt" - "net/http" "time" "github.com/go-kit/kit/log/level" @@ -82,11 +81,8 @@ func (f *Flusher) starting(ctx context.Context) error { } func (f *Flusher) running(ctx context.Context) error { - if err := f.ingester.StartAsync(ctx); err != nil { - return errors.Wrap(err, "start ingester") - } - if err := f.ingester.AwaitRunning(ctx); err != nil { - return errors.Wrap(err, "awaing running ingester") + if err := services.StartAndAwaitRunning(ctx, f.ingester); err != nil { + return errors.Wrap(err, "start and await running ingester") } f.ingester.Flush() @@ -96,18 +92,13 @@ func (f *Flusher) running(ctx context.Context) error { level.Info(util.Logger).Log("msg", fmt.Sprintf("sleeping for %s to give chance for collection of metrics", postFlushSleepTime.String())) time.Sleep(postFlushSleepTime) - f.ingester.StopAsync() - if err := f.ingester.AwaitTerminated(ctx); err != nil { - return err + if err := services.StopAndAwaitTerminated(ctx, f.ingester); err != nil { + return errors.Wrap(err, "stop and await terminated ingester") } return util.ErrStopCortex } + func (f *Flusher) stopping() error { // Nothing to do here. return nil } - -// ReadinessHandler returns 204 always. -func (f *Flusher) ReadinessHandler(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusNoContent) -} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 455e194d19c..c782d5aeceb 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -208,16 +208,24 @@ func (i *Ingester) starting(ctx context.Context) error { return errors.Wrap(err, "failed to start lifecycler") } + i.startFlushLoops() + + return nil +} + +func (i *Ingester) startFlushLoops() { i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) for j := 0; j < i.cfg.ConcurrentFlushes; j++ { i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength) go i.flushLoop(j) } - - return nil } // NewForFlusher constructs a new Ingester to be used by flusher target. +// Compared to the 'New' method: +// * Always replays the WAL. +// * Does not start the lifecycler. +// * No ingester v2. 
func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore, registerer prometheus.Registerer) (*Ingester, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.MakeIngesterClient @@ -231,19 +239,6 @@ func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), } - level.Info(util.Logger).Log("msg", "recovering from WAL") - - // We recover from WAL always. - start := time.Now() - if err := recoverFromWAL(i); err != nil { - level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) - return nil, err - } - elapsed := time.Since(start) - - level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) - i.metrics.walReplayDuration.Set(elapsed.Seconds()) - // Should be a noop WAL. cfg.WALConfig.WALEnabled = false var err error @@ -257,12 +252,20 @@ func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore } func (i *Ingester) startingForFlusher(ctx context.Context) error { - i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) - for j := 0; j < i.cfg.ConcurrentFlushes; j++ { - i.flushQueues[j] = util.NewPriorityQueue(i.metrics.flushQueueLength) - go i.flushLoop(j) + level.Info(util.Logger).Log("msg", "recovering from WAL") + + // We recover from WAL always. + start := time.Now() + if err := recoverFromWAL(i); err != nil { + level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) + return err } + elapsed := time.Since(start) + + level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) + i.metrics.walReplayDuration.Set(elapsed.Seconds()) + i.startFlushLoops() return nil } From 02fddf5658f20d37089446826b2f77a0fd3074fe Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 10 Mar 2020 14:24:10 +0530 Subject: [PATCH 5/6] Ignore util.ErrStopCortex in cortex Run() return error Signed-off-by: Ganesh Vernekar --- pkg/cortex/cortex.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 1f6f6a80cc0..f43c3457e23 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -362,8 +362,13 @@ func (t *Cortex) Run() error { // if any service failed, report that as an error to caller if err == nil { if failed := sm.ServicesByState()[services.Failed]; len(failed) > 0 { - // Details were reported via failure listener before - err = errors.New("failed services") + for _, f := range failed { + if f.FailureCase() != util.ErrStopCortex { + // Details were reported via failure listener before + err = errors.New("failed services") + break + } + } } } return err From 3922604992798443e2e0430495409c3c1f2b2363 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 10 Mar 2020 16:14:35 +0530 Subject: [PATCH 6/6] Fix review comments Signed-off-by: Ganesh Vernekar --- pkg/cortex/modules.go | 9 --------- pkg/flusher/flusher.go | 31 ++++++++++--------------------- pkg/ingester/ingester.go | 9 +-------- 3 files changed, 11 insertions(+), 38 deletions(-) diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 09f616c03d6..5bdfcdbd217 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -184,15 +184,6 @@ func (m *moduleName) UnmarshalYAML(unmarshal func(interface{}) error) error { return m.Set(s) } -func (m moduleName) IsJob() bool { - switch m { - case Flusher: - return true - } - - return false -} - func (t *Cortex) 
initServer(cfg *Config) (services.Service, error) { serv, err := server.New(cfg.Server) if err != nil { diff --git a/pkg/flusher/flusher.go b/pkg/flusher/flusher.go index 370498d6806..c5f7fe71cf0 100644 --- a/pkg/flusher/flusher.go +++ b/pkg/flusher/flusher.go @@ -3,7 +3,6 @@ package flusher import ( "context" "flag" - "fmt" "time" "github.com/go-kit/kit/log/level" @@ -39,8 +38,6 @@ type Flusher struct { clientConfig client.Config chunkStore ingester.ChunkStore registerer prometheus.Registerer - - ingester *ingester.Ingester } const ( @@ -68,37 +65,29 @@ func New( chunkStore: chunkStore, registerer: registerer, } - f.Service = services.NewBasicService(f.starting, f.running, f.stopping) + f.Service = services.NewBasicService(nil, f.running, nil) return f, nil } -func (f *Flusher) starting(ctx context.Context) error { - // WAL replay happens here. We have it in the starting function and not New - // so that metrics can be collected in parallel with WAL replay. - var err error - f.ingester, err = ingester.NewForFlusher(f.ingesterConfig, f.clientConfig, f.chunkStore, f.registerer) - return err -} - func (f *Flusher) running(ctx context.Context) error { - if err := services.StartAndAwaitRunning(ctx, f.ingester); err != nil { + ing, err := ingester.NewForFlusher(f.ingesterConfig, f.clientConfig, f.chunkStore, f.registerer) + if err != nil { + return errors.Wrap(err, "create ingester") + } + + if err := services.StartAndAwaitRunning(ctx, ing); err != nil { return errors.Wrap(err, "start and await running ingester") } - f.ingester.Flush() + ing.Flush() // Sleeping to give a chance to Prometheus // to collect the metrics. - level.Info(util.Logger).Log("msg", fmt.Sprintf("sleeping for %s to give chance for collection of metrics", postFlushSleepTime.String())) + level.Info(util.Logger).Log("msg", "sleeping to give chance for collection of metrics", "duration", postFlushSleepTime.String()) time.Sleep(postFlushSleepTime) - if err := services.StopAndAwaitTerminated(ctx, f.ingester); err != nil { + if err := services.StopAndAwaitTerminated(ctx, ing); err != nil { return errors.Wrap(err, "stop and await terminated ingester") } return util.ErrStopCortex } - -func (f *Flusher) stopping(_ error) error { - // Nothing to do here. - return nil -} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index fc6a1f03300..8b9ff09b29d 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -237,14 +237,7 @@ func NewForFlusher(cfg Config, clientConfig client.Config, chunkStore ChunkStore metrics: newIngesterMetrics(registerer, true), chunkStore: chunkStore, flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), - } - - // Should be a noop WAL. - cfg.WALConfig.WALEnabled = false - var err error - i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp) - if err != nil { - return nil, err + wal: &noopWAL{}, } i.Service = services.NewBasicService(i.startingForFlusher, i.loop, i.stopping)
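
The patches above converge on a simple pattern: a one-shot "job" target is modelled as a services.BasicService whose running function does its work once and then returns util.ErrStopCortex, which the module service wrapper and Cortex's Run() treat as a request for a clean shutdown rather than a failed service. Below is a minimal, hypothetical sketch of that pattern, not part of these patches; only services.NewBasicService, StartAsync, AwaitTerminated, FailureCase and util.ErrStopCortex are taken from the diffs above, everything else (names, output) is illustrative.

package main

import (
	"context"
	"fmt"

	"github.com/cortexproject/cortex/pkg/util"
	"github.com/cortexproject/cortex/pkg/util/services"
)

// oneShotJob mirrors the flusher target: all of the work happens in
// running(), after which util.ErrStopCortex asks the server to exit cleanly.
type oneShotJob struct {
	services.Service
}

func newOneShotJob() *oneShotJob {
	j := &oneShotJob{}
	// No starting or stopping hooks are needed for a pure one-shot job,
	// mirroring NewBasicService(nil, f.running, nil) in the final flusher.
	j.Service = services.NewBasicService(nil, j.running, nil)
	return j
}

func (j *oneShotJob) running(ctx context.Context) error {
	// Placeholder for the real work (the flusher replays the WAL, calls
	// Flush on the embedded ingester and then waits for metrics scraping).
	fmt.Println("doing one-shot work")

	// Returning this sentinel moves the service into the Failed state with
	// util.ErrStopCortex as its FailureCase, which the caller checks below.
	return util.ErrStopCortex
}

func main() {
	job := newOneShotJob()

	if err := job.StartAsync(context.Background()); err != nil {
		panic(err)
	}

	// AwaitTerminated returns once running() has finished; the error is
	// ignored here because a one-shot job "fails" with the sentinel on
	// purpose. cortex.Run() and module_service_wrapper.go make the same
	// distinction via FailureCase().
	_ = job.AwaitTerminated(context.Background())

	if job.FailureCase() == util.ErrStopCortex {
		fmt.Println("job finished, shutting down cleanly")
	}
}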