diff --git a/Makefile b/Makefile
index 4cb9016e1c2..2a760fd3803 100644
--- a/Makefile
+++ b/Makefile
@@ -49,12 +49,14 @@ $(foreach exe, $(EXES), $(eval $(call dep_exe, $(exe))))
 # Manually declared dependencies And what goes into each exe
 pkg/ingester/client/cortex.pb.go: pkg/ingester/client/cortex.proto
+pkg/ingester/wal.pb.go: pkg/ingester/wal.proto
 pkg/ring/ring.pb.go: pkg/ring/ring.proto
 pkg/querier/frontend/frontend.pb.go: pkg/querier/frontend/frontend.proto
 pkg/querier/queryrange/queryrange.pb.go: pkg/querier/queryrange/queryrange.proto
 pkg/chunk/storage/caching_index_client.pb.go: pkg/chunk/storage/caching_index_client.proto
 pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
 pkg/ruler/rules/rules.pb.go: pkg/ruler/rules/rules.proto
+
 all: $(UPTODATE_FILES)
 test: protos
 mod-check: protos
diff --git a/docs/configuration/arguments.md b/docs/configuration/arguments.md
index d129adbe712..c6d236d448e 100644
--- a/docs/configuration/arguments.md
+++ b/docs/configuration/arguments.md
@@ -305,6 +305,24 @@ It also talks to a KVStore and has it's own copies of the same flags used by the ingester.
 Where you don't want to cache every chunk written by ingesters, but you do want to take advantage of chunk write deduplication, this option will make ingesters write a placeholder to the cache for each chunk. Make sure you configure ingesters with a different cache to queriers, which need the whole value.
 
+#### WAL
+
+- `--ingester.wal-dir`
+
+  Directory where the WAL data should be stored and/or recovered from.
+
+- `--ingester.wal-enabled`
+
+  Setting this to `true` enables writing to the WAL during ingestion.
+
+- `--ingester.checkpoint-enabled`
+
+  Set this to `true` to enable checkpointing of in-memory chunks to disk. This is optional, but it helps speed up the replay process.
+
+- `--ingester.checkpoint-duration`
+
+  The interval at which checkpoints should be created.
+
+- `--ingester.recover-from-wal`
+
+  Set this to `true` to recover data from an existing WAL. The data is recovered even if the WAL is disabled, as long as this flag is set to `true`. The WAL directory (`--ingester.wal-dir`) needs to be set for this.
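+
+  For example, a typical combination of these flags might look like the following (the directory is only illustrative and must point to a persistent volume):
+
+  ```
+  --ingester.wal-enabled=true
+  --ingester.checkpoint-enabled=true
+  --ingester.checkpoint-duration=30m
+  --ingester.recover-from-wal=true
+  --ingester.wal-dir=/data/wal
+  ```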
+
 ## Runtime Configuration file
 
 Cortex has a concept of "runtime config" file, which is simply a file that is reloaded while Cortex is running. It is used by some Cortex components to allow operator to change some aspects of Cortex configuration without restarting it. File is specified by using `-runtime-config.file=` flag and reload period (which defaults to 10 seconds) can be changed by `-runtime-config.reload-period=` flag. Previously this mechanism was only used by limits overrides, and flags were called `-limits.per-user-override-config=` and `-limits.per-user-override-period=10s` respectively. These are still used, if `-runtime-config.file=` is not specified.
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 50235707e78..d295e67409f 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -323,6 +323,27 @@ ring:
 
 The `ingester_config` configures the Cortex ingester.
 
 ```yaml
+walconfig:
+  # Enable writing of ingested data into WAL.
+  # CLI flag: -ingester.wal-enabled
+  [wal_enabled: <boolean> | default = false]
+
+  # Enable checkpointing of in-memory chunks.
+  # CLI flag: -ingester.checkpoint-enabled
+  [checkpoint_enabled: <boolean> | default = false]
+
+  # Recover data from existing WAL irrespective of WAL enabled/disabled.
+  # CLI flag: -ingester.recover-from-wal
+  [recover_from_wal: <boolean> | default = false]
+
+  # Directory to store the WAL and/or recover from WAL.
+  # CLI flag: -ingester.wal-dir
+  [wal_dir: <string> | default = "wal"]
+
+  # Interval at which checkpoints should be created.
+  # CLI flag: -ingester.checkpoint-duration
+  [checkpoint_duration: <duration> | default = 30m0s]
+
 lifecycler:
   ring:
     kvstore:
diff --git a/docs/guides/ingesters-with-wal.md b/docs/guides/ingesters-with-wal.md
new file mode 100644
index 00000000000..2c9fe28bffe
--- /dev/null
+++ b/docs/guides/ingesters-with-wal.md
@@ -0,0 +1,76 @@
+---
+title: "Ingesters with WAL"
+linkTitle: "Ingesters with WAL"
+weight: 5
+slug: ingesters-with-wal
+---
+
+Currently, ingesters running in the chunks storage mode store all their data in memory. If there is a crash, that data can be lost. The WAL helps fill this gap in reliability.
+
+To use the WAL, some changes need to be made to the deployment.
+
+## Changes to deployment
+
+1. Since ingesters need to have the same persistent volume across restarts/rollouts, all the ingesters should be run as a [StatefulSet](https://kubernetes.io/docs/concepts/workloads/controllers/statefulset/) with fixed volumes.
+
+2. The following flags need to be set (see the example StatefulSet snippet after this list):
+    * `--ingester.wal-dir` to the directory where the WAL data should be stored and/or recovered from. Note that this should be on the mounted volume.
+    * `--ingester.wal-enabled` to `true`, which enables writing to the WAL during ingestion.
+    * `--ingester.checkpoint-enabled` to `true` to enable checkpointing of in-memory chunks to disk. This is optional, but it helps speed up the replay process.
+    * `--ingester.checkpoint-duration` to the interval at which checkpoints should be created. The default is `30m`; depending on the number of series, it can be brought down to `15m` if there are fewer series per ingester (say 1M).
+    * `--ingester.recover-from-wal` to `true` to recover data from an existing WAL. The data is recovered even if the WAL is disabled, as long as this flag is set to `true`. The WAL directory needs to be set for this.
+        * If you are going to enable the WAL, it is advisable to always set this to `true`.
+    * `--ingester.tokens-file-path` should be set to the file path where the tokens should be stored. Note that this should be on the mounted volume. The reason this is required is described below.
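+
+As an illustration only (the container name, volume name and mount path below are placeholders, not part of Cortex), the relevant part of an ingester StatefulSet spec could look like this:
+
+```yaml
+# Sketch of the ingester container inside a StatefulSet; adjust names, paths and durations to your setup.
+containers:
+  - name: ingester
+    args:
+      - -target=ingester
+      - -ingester.wal-enabled=true
+      - -ingester.checkpoint-enabled=true
+      - -ingester.checkpoint-duration=15m
+      - -ingester.recover-from-wal=true
+      - -ingester.wal-dir=/data/wal              # must be on the mounted volume
+      - -ingester.tokens-file-path=/data/tokens  # must be on the mounted volume
+    volumeMounts:
+      - name: ingester-pv
+        mountPath: /data
+```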
+
+## Changes in lifecycle when WAL is enabled
+
+1. Flushing of data to the chunk store during rollouts or scale down is disabled. This is because during a rollout of a StatefulSet there are no ingesters that are simultaneously leaving and joining; rather, the same ingester is shut down and brought back again with an updated config. Hence flushing is skipped and the data is recovered from the WAL.
+
+2. As there are no transfers between ingesters, the tokens are stored on and recovered from disk between rollouts/restarts. This is [not a new thing](https://github.com/cortexproject/cortex/pull/1750) but it is effective when using StatefulSets.
+
+## Migrating from stateless deployments
+
+The ingester _deployment without WAL_ and _StatefulSet with WAL_ should be scaled down and up respectively, in sync and without transfer of data between them, to ensure that any ingestion after the migration is immediately reliable.
+
+Let's take an example of 4 ingesters. The migration would look something like this:
+
+1. Bring up one stateful ingester `ingester-0` and wait till it's ready (accepting read and write requests).
+2. Scale down the old ingester deployment to 3 and wait till the leaving ingester flushes all its data to the chunk store.
+3. Once that ingester has disappeared from `kc get pods ...`, add another stateful ingester and wait till it's ready. This ensures no transfer happens. Now you have `ingester-0 ingester-1`.
+4. Repeat step 2 to remove another ingester from the old deployment.
+5. Repeat step 3 to add another stateful ingester. Now you have `ingester-0 ingester-1 ingester-2`.
+6. Repeat steps 4 and 5, and you will finally have `ingester-0 ingester-1 ingester-2 ingester-3`.
+
+## How to scale up/down
+
+### Scale up
+
+Scaling up is the same as what you would do without the WAL or StatefulSets. Nothing changes here.
+
+### Scale down
+
+Since Kubernetes doesn't differentiate between a rollout and a scale down when sending a signal, the flushing of chunks is disabled by default. Hence the only thing to take care of during scale down is the flushing of chunks.
+
+There are 2 ways to do it, with the latter being a fallback option.
+
+**First option**
+
+Say you have 4 ingesters `ingester-0 ingester-1 ingester-2 ingester-3` and you want to scale down to 2 ingesters. The ingesters that will be shut down according to StatefulSet rules are `ingester-3` and then `ingester-2`.
+
+Hence, before actually scaling down in Kubernetes, port forward those ingesters and hit the [`/shutdown`](https://github.com/cortexproject/cortex/pull/1746) endpoint. This will flush the chunks and shut down the ingesters (while also removing them from the ring).
+
+After hitting the endpoint for `ingester-2 ingester-3`, scale down the ingesters to 2.
+
+PS: Given you have to scale down 1 ingester at a time, you can pipeline the shutdown and scale-down process instead of hitting the shutdown endpoint for all to-be-scaled-down ingesters at the same time.
+
+**Fallback option**
+
+There is a [flush mode ingester](https://github.com/cortexproject/cortex/pull/1747) in progress, and following recent discussions there will be a separate target called flusher in its place.
+
+You can run it as a Kubernetes job which will:
+* Attach to the volume of the scaled-down ingester
+* Recover from the WAL
+* And flush all the chunks.
+
+This job needs to be run for every ingester for which you missed hitting the shutdown endpoint in the first option.
+
+More info about the flusher target will be added once it's upstream.
\ No newline at end of file
diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index eea70985655..0eaec36d44d 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -188,7 +188,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 	if !canJoinDistributorsRing {
 		ingestionRateStrategy = newInfiniteIngestionRateStrategy()
 	} else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
-		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey)
+		distributorsRing, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ring.DistributorRingKey, true)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 65c112114fc..2c8a32327dd 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -35,8 +35,14 @@ const (
 	queryStreamBatchSize = 128
 )
 
+var (
+	// This is initialised only if the WAL is enabled; records are fetched from this pool.
+	recordPool sync.Pool
+)
+
 // Config for an Ingester.
type Config struct { + WALConfig WALConfig LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"` // Config for transferring chunks. Zero or negative = no retries. @@ -70,6 +76,7 @@ type Config struct { // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlags(f) + cfg.WALConfig.RegisterFlags(f) f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. Negative value or zero disables hand-over.") f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-period", 1*time.Minute, "Period with which to attempt to flush chunks.") @@ -109,6 +116,9 @@ type Ingester struct { flushQueues []*util.PriorityQueue flushQueuesDone sync.WaitGroup + // This should never be nil. + wal WAL + // Hook for injecting behaviour from tests. preFlushUserSeries func() @@ -131,6 +141,19 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c return NewV2(cfg, clientConfig, limits, registerer) } + if cfg.WALConfig.walEnabled { + // If WAL is enabled, we don't transfer out the data to any ingester. + // Either the next ingester which takes it's place should recover from WAL + // or the data has to be flushed during scaledown. + cfg.MaxTransferRetries = 0 + + recordPool = sync.Pool{ + New: func() interface{} { + return &Record{} + }, + } + } + i := &Ingester{ cfg: cfg, clientConfig: clientConfig, @@ -142,14 +165,36 @@ func New(cfg Config, clientConfig client.Config, limits *validation.Overrides, c } var err error - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey) + // During WAL recovery, it will create new user states which requires the limiter. + // Hence initialise the limiter before creating the WAL. + // The '!cfg.WALConfig.walEnabled' argument says don't flush on shutdown if the WAL is enabled. + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WALConfig.walEnabled) if err != nil { return nil, err } - - // Init the limter and instantiate the user states which depend on it i.limiter = NewSeriesLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor, cfg.ShardByAllLabels) - i.userStates = newUserStates(i.limiter, cfg, i.metrics) + + if cfg.WALConfig.recover { + level.Info(util.Logger).Log("msg", "recovering from WAL") + start := time.Now() + if err := recoverFromWAL(i); err != nil { + level.Error(util.Logger).Log("msg", "failed to recover from WAL", "time", time.Since(start).String()) + return nil, err + } + elapsed := time.Since(start) + level.Info(util.Logger).Log("msg", "recovery from WAL completed", "time", elapsed.String()) + i.metrics.walReplayDuration.Set(elapsed.Seconds()) + } + + // If the WAL recover happened, then the userStates would already be set. + if i.userStates == nil { + i.userStates = newUserStates(i.limiter, cfg, i.metrics) + } + + i.wal, err = newWAL(cfg.WALConfig, i.userStates.cp) + if err != nil { + return nil, err + } // Now that user states have been created, we can start the lifecycler i.lifecycler.Start() @@ -200,6 +245,8 @@ func (i *Ingester) Shutdown() { close(i.quit) i.done.Wait() + i.wal.Stop() + // Next initiate our graceful exit from the ring. i.lifecycler.Shutdown() } @@ -209,7 +256,11 @@ func (i *Ingester) Shutdown() { // * Change the state of ring to stop accepting writes. // * Flush all the chunks. 
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { + originalState := i.lifecycler.FlushOnShutdown() + // We want to flush the chunks if transfer fails irrespective of original flag. + i.lifecycler.SetFlushOnShutdown(true) i.Shutdown() + i.lifecycler.SetFlushOnShutdown(originalState) w.WriteHeader(http.StatusNoContent) } @@ -232,11 +283,25 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. if err != nil { return nil, fmt.Errorf("no user id") } + var lastPartialErr *validationError + var record *Record + if i.cfg.WALConfig.walEnabled { + record = recordPool.Get().(*Record) + record.UserId = userID + // Assuming there is not much churn in most cases, there is no use + // keeping the record.Labels slice hanging around. + record.Labels = nil + if cap(record.Samples) < len(req.Timeseries) { + record.Samples = make([]Sample, 0, len(req.Timeseries)) + } else { + record.Samples = record.Samples[:0] + } + } for _, ts := range req.Timeseries { for _, s := range ts.Samples { - err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source) + err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record) if err == nil { continue } @@ -254,10 +319,19 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. if lastPartialErr != nil { return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError() } + + if record != nil { + // Log the record only if there was no error in ingestion. + if err := i.wal.Log(record); err != nil { + return nil, err + } + recordPool.Put(record) + } + return &client.WriteResponse{}, nil } -func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error { +func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error { labels.removeBlanks() var ( @@ -274,7 +348,8 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, if i.stopped { return fmt.Errorf("ingester stopping") } - state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels) + + state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record) if err != nil { if ve, ok := err.(*validationError); ok { state.discardedSamples.WithLabelValues(ve.errorType).Inc() @@ -310,6 +385,14 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, return err } + if record != nil { + record.Samples = append(record.Samples, Sample{ + Fingerprint: uint64(fp), + Timestamp: uint64(timestamp), + Value: float64(value), + }) + } + memoryChunks.Add(float64(len(series.chunkDescs) - prevNumChunks)) i.metrics.ingestedSamples.Inc() switch source { @@ -430,7 +513,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_ } numSeries++ - wireChunks, err := toWireChunks(chunks) + wireChunks, err := toWireChunks(chunks, nil) if err != nil { return err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 93f8897aebb..2c9a74cc799 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -177,10 +177,7 @@ func pushTestSamples(t *testing.T, ing *Ingester, numSeries, samplesPerSeries, o return userIDs, testData } -func TestIngesterAppend(t *testing.T) { - 
store, ing := newDefaultTestStore(t) - userIDs, testData := pushTestSamples(t, ing, 10, 1000, 0) - +func retrieveTestSamples(t *testing.T, ing *Ingester, userIDs []string, testData map[string]model.Matrix) { // Read samples back via ingester queries. for _, userID := range userIDs { ctx := user.InjectOrgID(context.Background(), userID) @@ -198,6 +195,12 @@ func TestIngesterAppend(t *testing.T) { require.NoError(t, err) assert.Equal(t, testData[userID].String(), res.String()) } +} + +func TestIngesterAppend(t *testing.T) { + store, ing := newDefaultTestStore(t) + userIDs, testData := pushTestSamples(t, ing, 10, 1000, 0) + retrieveTestSamples(t, ing, userIDs, testData) // Read samples back via chunk store. ing.Shutdown() @@ -307,22 +310,22 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) { {Name: model.MetricNameLabel, Value: "testmetric"}, } ctx := context.Background() - err := ing.append(ctx, userID, m, 1, 0, client.API) + err := ing.append(ctx, userID, m, 1, 0, client.API, nil) require.NoError(t, err) // Two times exactly the same sample (noop). - err = ing.append(ctx, userID, m, 1, 0, client.API) + err = ing.append(ctx, userID, m, 1, 0, client.API, nil) require.NoError(t, err) // Earlier sample than previous one. - err = ing.append(ctx, userID, m, 0, 0, client.API) + err = ing.append(ctx, userID, m, 0, 0, client.API, nil) require.Contains(t, err.Error(), "sample timestamp out of order") errResp, ok := err.(*validationError) require.True(t, ok) require.Equal(t, errResp.code, 400) // Same timestamp as previous sample, but different value. - err = ing.append(ctx, userID, m, 1, 1, client.API) + err = ing.append(ctx, userID, m, 1, 1, client.API, nil) require.Contains(t, err.Error(), "sample with repeated timestamp but different value") errResp, ok = err.(*validationError) require.True(t, ok) @@ -340,7 +343,7 @@ func TestIngesterAppendBlankLabel(t *testing.T) { {Name: "bar", Value: ""}, } ctx := user.InjectOrgID(context.Background(), userID) - err := ing.append(ctx, userID, lp, 1, 0, client.API) + err := ing.append(ctx, userID, lp, 1, 0, client.API, nil) require.NoError(t, err) res, _, err := runTestQuery(ctx, t, ing, labels.MatchEqual, labels.MetricName, "testmetric") diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index c4ddca2a12c..ada51bec5a0 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -63,7 +63,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, limits: limits, chunkStore: nil, quit: make(chan struct{}), - + wal: &noopWAL{}, TSDBState: TSDBState{ dbs: make(map[string]*tsdb.DB), bucket: bucketClient, @@ -81,7 +81,7 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, }, i.numSeriesInTSDB)) } - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey) + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true) if err != nil { return nil, err } diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go index 7a97258b97b..09bccdd373b 100644 --- a/pkg/ingester/metrics.go +++ b/pkg/ingester/metrics.go @@ -29,6 +29,7 @@ type ingesterMetrics struct { memUsers prometheus.Gauge memSeriesCreatedTotal *prometheus.CounterVec memSeriesRemovedTotal *prometheus.CounterVec + walReplayDuration prometheus.Gauge } func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithTSDB bool) *ingesterMetrics { @@ -83,6 +84,10 @@ func newIngesterMetrics(r 
prometheus.Registerer, registerMetricsConflictingWithT Name: memSeriesRemovedTotalName, Help: memSeriesRemovedTotalHelp, }, []string{"user"}), + walReplayDuration: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "cortex_ingester_wal_replay_duration_seconds", + Help: "Time taken to replay the checkpoint and the WAL.", + }), } if r != nil { @@ -96,6 +101,7 @@ func newIngesterMetrics(r prometheus.Registerer, registerMetricsConflictingWithT m.queriedChunks, m.memSeries, m.memUsers, + m.walReplayDuration, ) if registerMetricsConflictingWithTSDB { diff --git a/pkg/ingester/query_test.go b/pkg/ingester/query_test.go index 7f46c774e57..cc93aad14d9 100644 --- a/pkg/ingester/query_test.go +++ b/pkg/ingester/query_test.go @@ -55,7 +55,7 @@ func BenchmarkQueryStream(b *testing.B) { {Name: "cpu", Value: cpus[i%numCPUs]}, } - state, fp, series, err := ing.userStates.getOrCreateSeries(ctx, "1", labels) + state, fp, series, err := ing.userStates.getOrCreateSeries(ctx, "1", labels, nil) require.NoError(b, err) for j := 0; j < numSamples; j++ { diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 05d3dad6ea5..7c4ceddfa3e 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -97,7 +97,7 @@ func (i *Ingester) TransferChunks(stream client.Ingester_TransferChunksServer) e return errors.Wrap(err, "TransferChunks: fromWireChunks") } - state, fp, series, err := userStates.getOrCreateSeries(stream.Context(), wireSeries.UserId, wireSeries.Labels) + state, fp, series, err := userStates.getOrCreateSeries(stream.Context(), wireSeries.UserId, wireSeries.Labels, nil) if err != nil { return errors.Wrapf(err, "TransferChunks: getOrCreateSeries: user %s series %s", wireSeries.UserId, wireSeries.Labels) } @@ -349,8 +349,12 @@ func (i *Ingester) TransferTSDB(stream client.Ingester_TransferTSDBServer) error return nil } -func toWireChunks(descs []*desc) ([]client.Chunk, error) { - wireChunks := make([]client.Chunk, 0, len(descs)) +// The passed wireChunks slice is for re-use. 
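+// If its capacity is not enough for the given descs, a new slice is allocated;
+// callers should reuse the returned slice on subsequent calls to avoid repeated allocations.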
+func toWireChunks(descs []*desc, wireChunks []client.Chunk) ([]client.Chunk, error) { + if cap(wireChunks) < len(descs) { + wireChunks = make([]client.Chunk, 0, len(descs)) + } + wireChunks = wireChunks[:0] for _, d := range descs { wireChunk := client.Chunk{ StartTimestampMs: int64(d.FirstTime), @@ -454,6 +458,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { return errors.Wrap(err, "TransferChunks") } + var chunks []client.Chunk for userID, state := range userStatesCopy { for pair := range state.fpToSeries.iter() { state.fpLocker.Lock(pair.fp) @@ -463,7 +468,7 @@ func (i *Ingester) transferOut(ctx context.Context) error { continue } - chunks, err := toWireChunks(pair.series.chunkDescs) + chunks, err = toWireChunks(pair.series.chunkDescs, chunks) if err != nil { state.fpLocker.Unlock(pair.fp) return errors.Wrap(err, "toWireChunks") diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 89732c5aed8..f735fc0b013 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -106,17 +106,7 @@ func (us *userStates) get(userID string) (*userState, bool) { return state.(*userState), ok } -func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) { - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, false, fmt.Errorf("no user id") - } - state, ok := us.get(userID) - return state, ok, nil -} - -func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter) (*userState, model.Fingerprint, *memorySeries, error) { - +func (us *userStates) getOrCreate(userID string) *userState { state, ok := us.get(userID) if !ok { @@ -153,11 +143,25 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe state = stored.(*userState) } - fp, series, err := state.getSeries(labels) + return state +} + +func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) { + userID, err := user.ExtractOrgID(ctx) + if err != nil { + return nil, false, fmt.Errorf("no user id") + } + state, ok := us.get(userID) + return state, ok, nil +} + +func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) { + state := us.getOrCreate(userID) + fp, series, err := state.getSeries(labels, record) return state, fp, series, err } -func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeries, error) { +func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) { rawFP := client.FastFingerprint(metric) u.fpLocker.Lock(rawFP) fp := u.mapper.mapFP(rawFP, metric) @@ -171,38 +175,55 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri return fp, series, nil } + series, err := u.createSeriesWithFingerprint(fp, metric, record, false) + if err != nil { + u.fpLocker.Unlock(fp) + return 0, nil, err + } + + return fp, series, nil +} + +func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *Record, recovery bool) (*memorySeries, error) { // There's theoretically a relatively harmless race here if multiple // goroutines get the length of the series map at the same time, then // all proceed to add a new series. This is likely not worth addressing, // as this should happen rarely (all samples from one push are added // serially), and the overshoot in allowed series would be minimal. 
- err := u.limiter.AssertMaxSeriesPerUser(u.userID, u.fpToSeries.length()) - if err != nil { - u.fpLocker.Unlock(fp) - return fp, nil, makeLimitError(perUserSeriesLimit, err) + + if !recovery { + if err := u.limiter.AssertMaxSeriesPerUser(u.userID, u.fpToSeries.length()); err != nil { + return nil, makeLimitError(perUserSeriesLimit, err) + } } metricName, err := extract.MetricNameFromLabelAdapters(metric) if err != nil { - u.fpLocker.Unlock(fp) - return fp, nil, err + return nil, err } - // Check if the per-metric limit has been exceeded - err = u.canAddSeriesFor(string(metricName)) - if err != nil { - u.fpLocker.Unlock(fp) - return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err) + if !recovery { + // Check if the per-metric limit has been exceeded + if err = u.canAddSeriesFor(string(metricName)); err != nil { + return nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err) + } } u.memSeriesCreatedTotal.Inc() u.memSeries.Inc() + if record != nil { + record.Labels = append(record.Labels, Labels{ + Fingerprint: uint64(fp), + Labels: metric, + }) + } + labels := u.index.Add(metric, fp) - series = newMemorySeries(labels) + series := newMemorySeries(labels) u.fpToSeries.put(fp, series) - return fp, series, nil + return series, nil } func (u *userState) canAddSeriesFor(metric string) error { diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go new file mode 100644 index 00000000000..031c0194ae7 --- /dev/null +++ b/pkg/ingester/wal.go @@ -0,0 +1,808 @@ +package ingester + +import ( + "flag" + "fmt" + "io" + "io/ioutil" + "math" + "os" + "path/filepath" + "runtime" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log/level" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/wal" + + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" +) + +// WALConfig is config for the Write Ahead Log. +type WALConfig struct { + walEnabled bool `yaml:"wal_enabled,omitempty"` + checkpointEnabled bool `yaml:"checkpoint_enabled,omitempty"` + recover bool `yaml:"recover_from_wal,omitempty"` + dir string `yaml:"wal_dir,omitempty"` + checkpointDuration time.Duration `yaml:"checkpoint_duration,omitempty"` + metricsRegisterer prometheus.Registerer +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.") + f.BoolVar(&cfg.recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.") + f.BoolVar(&cfg.walEnabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.") + f.BoolVar(&cfg.checkpointEnabled, "ingester.checkpoint-enabled", false, "Enable checkpointing of in-memory chunks.") + f.DurationVar(&cfg.checkpointDuration, "ingester.checkpoint-duration", 30*time.Minute, "Interval at which checkpoints should be created.") +} + +// WAL interface allows us to have a no-op WAL when the WAL is disabled. +type WAL interface { + // Log marshalls the records and writes it into the WAL. + Log(*Record) error + // Stop stops all the WAL operations. 
+ Stop() +} + +type noopWAL struct{} + +func (noopWAL) Log(*Record) error { return nil } +func (noopWAL) Stop() {} + +type walWrapper struct { + cfg WALConfig + quit chan struct{} + wait sync.WaitGroup + + wal *wal.WAL + getUserStates func() map[string]*userState + + // Checkpoint metrics. + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter + checkpointDuration prometheus.Summary +} + +// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL. +func newWAL(cfg WALConfig, userStatesFunc func() map[string]*userState) (WAL, error) { + if !cfg.walEnabled { + return &noopWAL{}, nil + } + + var walRegistry prometheus.Registerer + if cfg.metricsRegisterer != nil { + walRegistry = prometheus.WrapRegistererWith(prometheus.Labels{"kind": "wal"}, cfg.metricsRegisterer) + } + tsdbWAL, err := wal.NewSize(util.Logger, walRegistry, cfg.dir, wal.DefaultSegmentSize/4, true) + if err != nil { + return nil, err + } + + w := &walWrapper{ + cfg: cfg, + quit: make(chan struct{}), + wal: tsdbWAL, + getUserStates: userStatesFunc, + } + + w.checkpointDeleteFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ingester_checkpoint_deletions_failed_total", + Help: "Total number of checkpoint deletions that failed.", + }) + w.checkpointDeleteTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ingester_checkpoint_deletions_total", + Help: "Total number of checkpoint deletions attempted.", + }) + w.checkpointCreationFail = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ingester_checkpoint_creations_failed_total", + Help: "Total number of checkpoint creations that failed.", + }) + w.checkpointCreationTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "ingester_checkpoint_creations_total", + Help: "Total number of checkpoint creations attempted.", + }) + w.checkpointDuration = prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "ingester_checkpoint_duration_seconds", + Help: "Time taken to create a checkpoint.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }) + if cfg.metricsRegisterer != nil { + cfg.metricsRegisterer.MustRegister( + w.checkpointDeleteFail, + w.checkpointDeleteTotal, + w.checkpointCreationFail, + w.checkpointCreationTotal, + w.checkpointDuration, + ) + } + + w.wait.Add(1) + go w.run() + return w, nil +} + +func (w *walWrapper) Stop() { + close(w.quit) + w.wait.Wait() + w.wal.Close() +} + +func (w *walWrapper) Log(record *Record) error { + select { + case <-w.quit: + return nil + default: + if record == nil { + return nil + } + buf, err := proto.Marshal(record) + if err != nil { + return err + } + return w.wal.Log(buf) + } +} + +func (w *walWrapper) run() { + defer w.wait.Done() + + if !w.cfg.checkpointEnabled { + return + } + + ticker := time.NewTicker(w.cfg.checkpointDuration) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + start := time.Now() + level.Info(util.Logger).Log("msg", "starting checkpoint") + if err := w.performCheckpoint(); err != nil { + level.Error(util.Logger).Log("msg", "error checkpointing series", "err", err) + continue + } + elapsed := time.Since(start) + level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String()) + w.checkpointDuration.Observe(elapsed.Seconds()) + case <-w.quit: + level.Info(util.Logger).Log("msg", "creating checkpoint before shutdown") + if err := w.performCheckpoint(); err != nil { + 
level.Error(util.Logger).Log("msg", "error checkpointing series during shutdown", "err", err) + } + return + } + } +} + +const checkpointPrefix = "checkpoint." + +func (w *walWrapper) performCheckpoint() (err error) { + if !w.cfg.checkpointEnabled { + return nil + } + + w.checkpointCreationTotal.Inc() + defer func() { + if err != nil { + w.checkpointCreationFail.Inc() + } + }() + + if w.getUserStates == nil { + return errors.New("function to get user states not initialised") + } + + _, lastSegment, err := w.wal.Segments() + if err != nil { + return err + } + + _, lastCh, err := lastCheckpoint(w.wal.Dir()) + if err != nil { + return err + } + + // Checkpoint is named after the last WAL segment present so that when replaying the WAL + // we can start from that particular WAL segment. + checkpointDir := filepath.Join(w.wal.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", lastSegment)) + level.Info(util.Logger).Log("msg", "attempting checkpoint for", "dir", checkpointDir) + checkpointDirTemp := checkpointDir + ".tmp" + + if err := os.MkdirAll(checkpointDirTemp, 0777); err != nil { + return errors.Wrap(err, "create checkpoint dir") + } + checkpoint, err := wal.New(nil, nil, checkpointDirTemp, true) + if err != nil { + return errors.Wrap(err, "open checkpoint") + } + defer func() { + checkpoint.Close() + os.RemoveAll(checkpointDirTemp) + }() + + var wireChunkBuf []client.Chunk + for userID, state := range w.getUserStates() { + for pair := range state.fpToSeries.iter() { + state.fpLocker.Lock(pair.fp) + wireChunkBuf, err = w.checkpointSeries(checkpoint, userID, pair.fp, pair.series, wireChunkBuf) + state.fpLocker.Unlock(pair.fp) + if err != nil { + return err + } + } + } + + if err := checkpoint.Close(); err != nil { + return errors.Wrap(err, "close checkpoint") + } + if err := fileutil.Replace(checkpointDirTemp, checkpointDir); err != nil { + return errors.Wrap(err, "rename checkpoint directory") + } + + // The last segment might still have been active during the checkpointing, + // hence delete only the segments before that. + if err := w.wal.Truncate(lastSegment - 1); err != nil { + // It is fine to have old WAL segments hanging around if deletion failed. + // We can try again next time. + level.Error(util.Logger).Log("msg", "error deleting old WAL segments", "err", err) + } + + if lastCh >= 0 { + if err := w.deleteCheckpoints(lastCh); err != nil { + // It is fine to have old checkpoints hanging around if deletion failed. + // We can try again next time. + level.Error(util.Logger).Log("msg", "error deleting old checkpoint", "err", err) + } + } + + return nil +} + +// lastCheckpoint returns the directory name and index of the most recent checkpoint. +// If dir does not contain any checkpoints, -1 is returned as index. +func lastCheckpoint(dir string) (string, int, error) { + dirs, err := ioutil.ReadDir(dir) + if err != nil { + return "", -1, err + } + var ( + maxIdx = -1 + checkpointDir string + ) + // There may be multiple checkpoints left, so select the one with max index. 
+ for i := 0; i < len(dirs); i++ { + di := dirs[i] + + if !strings.HasPrefix(di.Name(), checkpointPrefix) { + continue + } + if !di.IsDir() { + return "", -1, fmt.Errorf("checkpoint %s is not a directory", di.Name()) + } + idx, err := strconv.Atoi(di.Name()[len(checkpointPrefix):]) + if err != nil { + continue + } + if idx > maxIdx { + checkpointDir = di.Name() + maxIdx = idx + } + } + if maxIdx >= 0 { + return filepath.Join(dir, checkpointDir), maxIdx, nil + } + return "", -1, nil +} + +// deleteCheckpoints deletes all checkpoints in a directory which is <= maxIndex. +func (w *walWrapper) deleteCheckpoints(maxIndex int) (err error) { + w.checkpointDeleteTotal.Inc() + defer func() { + if err != nil { + w.checkpointDeleteFail.Inc() + } + }() + + var errs tsdb_errors.MultiError + + files, err := ioutil.ReadDir(w.wal.Dir()) + if err != nil { + return err + } + for _, fi := range files { + if !strings.HasPrefix(fi.Name(), checkpointPrefix) { + continue + } + index, err := strconv.Atoi(fi.Name()[len(checkpointPrefix):]) + if err != nil || index >= maxIndex { + continue + } + if err := os.RemoveAll(filepath.Join(w.wal.Dir(), fi.Name())); err != nil { + errs.Add(err) + } + } + return errs.Err() +} + +// checkpointSeries write the chunks of the series to the checkpoint. +func (w *walWrapper) checkpointSeries(cp *wal.WAL, userID string, fp model.Fingerprint, series *memorySeries, wireChunks []client.Chunk) ([]client.Chunk, error) { + var err error + wireChunks, err = toWireChunks(series.chunkDescs, wireChunks[:0]) + if err != nil { + return wireChunks, err + } + + buf, err := proto.Marshal(&Series{ + UserId: userID, + Fingerprint: uint64(fp), + Labels: client.FromLabelsToLabelAdapters(series.metric), + Chunks: wireChunks, + }) + if err != nil { + return wireChunks, err + } + + return wireChunks, cp.Log(buf) +} + +func recoverFromWAL(ingester *Ingester) (err error) { + walDir := ingester.cfg.WALConfig.dir + // Use a local userStates, so we don't need to worry about locking. + userStates := newUserStates(ingester.limiter, ingester.cfg, ingester.metrics) + + defer func() { + if err == nil { + ingester.userStatesMtx.Lock() + ingester.userStates = userStates + ingester.userStatesMtx.Unlock() + } + }() + + lastCheckpointDir, idx, err := lastCheckpoint(walDir) + if err != nil { + return err + } + + nWorkers := runtime.GOMAXPROCS(0) + stateCache := make([]map[string]*userState, nWorkers) + seriesCache := make([]map[string]map[uint64]*memorySeries, nWorkers) + for i := 0; i < nWorkers; i++ { + stateCache[i] = make(map[string]*userState) + seriesCache[i] = make(map[string]map[uint64]*memorySeries) + } + + if idx >= 0 { + // Checkpoint exists. 
+ level.Info(util.Logger).Log("msg", "recovering from checkpoint", "checkpoint", lastCheckpointDir) + start := time.Now() + if err := processCheckpoint(lastCheckpointDir, userStates, nWorkers, stateCache, seriesCache); err != nil { + return err + } + elapsed := time.Since(start) + level.Info(util.Logger).Log("msg", "recovered from checkpoint", "time", elapsed.String()) + } else { + level.Info(util.Logger).Log("msg", "no checkpoint found") + } + + if segExists, err := segmentsExist(walDir); err == nil && !segExists { + level.Info(util.Logger).Log("msg", "no segments found, skipping recover from segments") + return nil + } + + level.Info(util.Logger).Log("msg", "recovering from WAL", "dir", walDir, "start_segment", idx) + start := time.Now() + if err := processWAL(walDir, idx, userStates, nWorkers, stateCache, seriesCache); err != nil { + return err + } + elapsed := time.Since(start) + level.Info(util.Logger).Log("msg", "recovered from WAL", "time", elapsed.String()) + + return nil +} + +// segmentsExist is a stripped down version of +// https://github.com/prometheus/prometheus/blob/4c648eddf47d7e07fbc74d0b18244402200dca9e/tsdb/wal/wal.go#L739-L760. +func segmentsExist(dir string) (bool, error) { + files, err := fileutil.ReadDir(dir) + if err != nil { + return false, err + } + for _, fn := range files { + if _, err := strconv.Atoi(fn); err == nil { + // First filename which is a number. + // This is how Prometheus stores and this + // is how it checks too. + return true, nil + } + } + return false, nil +} + +// processCheckpoint loads the chunks of the series present in the last checkpoint. +func processCheckpoint(name string, userStates *userStates, nWorkers int, + stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error { + + reader, closer, err := newWalReader(name, -1) + if err != nil { + return err + } + defer closer.Close() + + var ( + inputs = make([]chan *Series, nWorkers) + // errChan is to capture the errors from goroutine. + // The channel size is nWorkers+1 to not block any worker if all of them error out. + errChan = make(chan error, nWorkers) + wg = sync.WaitGroup{} + seriesPool = &sync.Pool{ + New: func() interface{} { + return &Series{} + }, + } + ) + + wg.Add(nWorkers) + for i := 0; i < nWorkers; i++ { + inputs[i] = make(chan *Series, 300) + go func(input <-chan *Series, stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries) { + processCheckpointRecord(userStates, seriesPool, stateCache, seriesCache, input, errChan) + wg.Done() + }(inputs[i], stateCache[i], seriesCache[i]) + } + + var capturedErr error +Loop: + for reader.Next() { + s := seriesPool.Get().(*Series) + if err := proto.Unmarshal(reader.Record(), s); err != nil { + // We don't return here in order to close/drain all the channels and + // make sure all goroutines exit. + capturedErr = err + break Loop + } + // The yoloString from the unmarshal of LabelAdapter gets corrupted + // when travelling through the channel. Hence making a copy of that. + // This extra alloc during the read path is fine as it's only 1 time + // and saves extra allocs during write path by having LabelAdapter. + s.Labels = copyLabelAdapters(s.Labels) + + select { + case capturedErr = <-errChan: + // Exit early on an error. + // Only acts upon the first error received. 
+ break Loop + default: + mod := s.Fingerprint % uint64(nWorkers) + inputs[mod] <- s + } + } + + for i := 0; i < nWorkers; i++ { + close(inputs[i]) + } + wg.Wait() + // If any worker errored out, some input channels might not be empty. + // Hence drain them. + for i := 0; i < nWorkers; i++ { + for range inputs[i] { + } + } + + if capturedErr != nil { + return capturedErr + } + select { + case capturedErr = <-errChan: + return capturedErr + default: + return reader.Err() + } +} + +func copyLabelAdapters(las []client.LabelAdapter) []client.LabelAdapter { + for i := range las { + n, v := make([]byte, len(las[i].Name)), make([]byte, len(las[i].Value)) + copy(n, las[i].Name) + copy(v, las[i].Value) + las[i].Name = string(n) + las[i].Value = string(v) + } + return las +} + +func processCheckpointRecord(userStates *userStates, seriesPool *sync.Pool, stateCache map[string]*userState, + seriesCache map[string]map[uint64]*memorySeries, seriesChan <-chan *Series, errChan chan error) { + var la []client.LabelAdapter + for s := range seriesChan { + state, ok := stateCache[s.UserId] + if !ok { + state = userStates.getOrCreate(s.UserId) + stateCache[s.UserId] = state + seriesCache[s.UserId] = make(map[uint64]*memorySeries) + } + + la = la[:0] + for _, l := range s.Labels { + la = append(la, client.LabelAdapter{ + Name: string(l.Name), + Value: string(l.Value), + }) + } + series, err := state.createSeriesWithFingerprint(model.Fingerprint(s.Fingerprint), la, nil, true) + if err != nil { + errChan <- err + return + } + + descs, err := fromWireChunks(s.Chunks) + if err != nil { + errChan <- err + return + } + + if err := series.setChunks(descs); err != nil { + errChan <- err + return + } + memoryChunks.Add(float64(len(descs))) + + seriesCache[s.UserId][s.Fingerprint] = series + seriesPool.Put(s) + } +} + +type samplesWithUserID struct { + samples []Sample + userID string +} + +// processWAL processes the records in the WAL concurrently. +func processWAL(name string, startSegment int, userStates *userStates, nWorkers int, + stateCache []map[string]*userState, seriesCache []map[string]map[uint64]*memorySeries) error { + + reader, closer, err := newWalReader(name, startSegment) + if err != nil { + return err + } + defer closer.Close() + + var ( + wg sync.WaitGroup + inputs = make([]chan *samplesWithUserID, nWorkers) + outputs = make([]chan *samplesWithUserID, nWorkers) + // errChan is to capture the errors from goroutine. + // The channel size is nWorkers to not block any worker if all of them error out. + errChan = make(chan error, nWorkers) + shards = make([]*samplesWithUserID, nWorkers) + ) + + wg.Add(nWorkers) + for i := 0; i < nWorkers; i++ { + outputs[i] = make(chan *samplesWithUserID, 300) + inputs[i] = make(chan *samplesWithUserID, 300) + shards[i] = &samplesWithUserID{} + + go func(input <-chan *samplesWithUserID, output chan<- *samplesWithUserID, + stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries) { + processWALSamples(userStates, stateCache, seriesCache, input, output, errChan) + wg.Done() + }(inputs[i], outputs[i], stateCache[i], seriesCache[i]) + } + + var ( + capturedErr error + record = &Record{} + ) +Loop: + for reader.Next() { + select { + case capturedErr = <-errChan: + // Exit early on an error. + // Only acts upon the first error received. + break Loop + default: + } + if err := proto.Unmarshal(reader.Record(), record); err != nil { + // We don't return here in order to close/drain all the channels and + // make sure all goroutines exit. 
+ capturedErr = err + break Loop + } + + if len(record.Labels) > 0 { + state := userStates.getOrCreate(record.UserId) + // Create the series from labels which do not exist. + for _, labels := range record.Labels { + _, ok := state.fpToSeries.get(model.Fingerprint(labels.Fingerprint)) + if ok { + continue + } + _, err := state.createSeriesWithFingerprint(model.Fingerprint(labels.Fingerprint), labels.Labels, nil, true) + if err != nil { + // We don't return here in order to close/drain all the channels and + // make sure all goroutines exit. + capturedErr = err + break Loop + } + } + } + + // We split up the samples into chunks of 5000 samples or less. + // With O(300 * #cores) in-flight sample batches, large scrapes could otherwise + // cause thousands of very large in flight buffers occupying large amounts + // of unused memory. + for len(record.Samples) > 0 { + m := 5000 + if len(record.Samples) < m { + m = len(record.Samples) + } + for i := 0; i < nWorkers; i++ { + if len(shards[i].samples) == 0 { + // It is possible that the previous iteration did not put + // anything in this shard. In that case no need to get a new buffer. + shards[i].userID = record.UserId + continue + } + select { + case buf := <-outputs[i]: + buf.samples = buf.samples[:0] + buf.userID = record.UserId + shards[i] = buf + default: + shards[i] = &samplesWithUserID{ + userID: record.UserId, + } + } + } + for _, sam := range record.Samples[:m] { + mod := sam.Fingerprint % uint64(nWorkers) + shards[mod].samples = append(shards[mod].samples, sam) + } + for i := 0; i < nWorkers; i++ { + if len(shards[i].samples) > 0 { + inputs[i] <- shards[i] + } + } + record.Samples = record.Samples[m:] + } + } + + for i := 0; i < nWorkers; i++ { + close(inputs[i]) + for range outputs[i] { + } + } + wg.Wait() + // If any worker errored out, some input channels might not be empty. + // Hence drain them. + for i := 0; i < nWorkers; i++ { + for range inputs[i] { + } + } + + if capturedErr != nil { + return capturedErr + } + select { + case capturedErr = <-errChan: + return capturedErr + default: + return reader.Err() + } +} + +func processWALSamples(userStates *userStates, stateCache map[string]*userState, seriesCache map[string]map[uint64]*memorySeries, + input <-chan *samplesWithUserID, output chan<- *samplesWithUserID, errChan chan error) { + defer close(output) + + sp := model.SamplePair{} + for samples := range input { + state, ok := stateCache[samples.userID] + if !ok { + state = userStates.getOrCreate(samples.userID) + stateCache[samples.userID] = state + seriesCache[samples.userID] = make(map[uint64]*memorySeries) + } + sc := seriesCache[samples.userID] + for i := range samples.samples { + series, ok := sc[samples.samples[i].Fingerprint] + if !ok { + series, ok = state.fpToSeries.get(model.Fingerprint(samples.samples[i].Fingerprint)) + if !ok { + // This should ideally not happen. + // If the series was not created in recovering checkpoint or + // from the labels of any records previous to this, there + // is no way to get the labels for this fingerprint. + level.Warn(util.Logger).Log("msg", "series not found for sample during wal recovery", "userid", samples.userID, "fingerprint", model.Fingerprint(samples.samples[i].Fingerprint).String()) + continue + } + } + + sp.Timestamp = model.Time(samples.samples[i].Timestamp) + sp.Value = model.SampleValue(samples.samples[i].Value) + // There can be many out of order samples because of checkpoint and WAL overlap. + // Checking this beforehand avoids the allocation of lots of error messages. 
+ if sp.Timestamp.After(series.lastTime) { + if err := series.add(sp); err != nil { + errChan <- err + return + } + } + } + output <- samples + } +} + +// If startSegment is <0, it means all the segments. +func newWalReader(name string, startSegment int) (*wal.Reader, io.Closer, error) { + var ( + segmentReader io.ReadCloser + err error + ) + if startSegment < 0 { + segmentReader, err = wal.NewSegmentsReader(name) + if err != nil { + return nil, nil, err + } + } else { + first, last, err := SegmentRange(name) + if err != nil { + return nil, nil, err + } + if startSegment > last { + return nil, nil, errors.New("start segment is beyond the last WAL segment") + } + if first > startSegment { + startSegment = first + } + segmentReader, err = wal.NewSegmentsRangeReader(wal.SegmentRange{ + Dir: name, + First: startSegment, + Last: -1, // Till the end. + }) + if err != nil { + return nil, nil, err + } + } + return wal.NewReader(segmentReader), segmentReader, nil +} + +// SegmentRange returns the first and last segment index of the WAL in the dir. +// If https://github.com/prometheus/prometheus/pull/6477 is merged, get rid of this +// method and use from Prometheus directly. +func SegmentRange(dir string) (int, int, error) { + files, err := fileutil.ReadDir(dir) + if err != nil { + return 0, 0, err + } + first, last := math.MaxInt32, math.MinInt32 + for _, fn := range files { + k, err := strconv.Atoi(fn) + if err != nil { + continue + } + if k < first { + first = k + } + if k > last { + last = k + } + } + if first == math.MaxInt32 || last == math.MinInt32 { + return -1, -1, nil + } + return first, last, nil +} diff --git a/pkg/ingester/wal.pb.go b/pkg/ingester/wal.pb.go new file mode 100644 index 00000000000..c3fcc096001 --- /dev/null +++ b/pkg/ingester/wal.pb.go @@ -0,0 +1,1489 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: wal.proto + +package ingester + +import ( + encoding_binary "encoding/binary" + fmt "fmt" + client "github.com/cortexproject/cortex/pkg/ingester/client" + github_com_cortexproject_cortex_pkg_ingester_client "github.com/cortexproject/cortex/pkg/ingester/client" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + reflect "reflect" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Record struct { + UserId string `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + Labels []Labels `protobuf:"bytes,2,rep,name=labels,proto3" json:"labels"` + Samples []Sample `protobuf:"bytes,3,rep,name=samples,proto3" json:"samples"` +} + +func (m *Record) Reset() { *m = Record{} } +func (*Record) ProtoMessage() {} +func (*Record) Descriptor() ([]byte, []int) { + return fileDescriptor_ae6364fc8077884f, []int{0} +} +func (m *Record) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Record.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Record) XXX_Merge(src proto.Message) { + xxx_messageInfo_Record.Merge(m, src) +} +func (m *Record) XXX_Size() int { + return m.Size() +} +func (m *Record) XXX_DiscardUnknown() { + xxx_messageInfo_Record.DiscardUnknown(m) +} + +var xxx_messageInfo_Record proto.InternalMessageInfo + +func (m *Record) GetUserId() string { + if m != nil { + return m.UserId + } + return "" +} + +func (m *Record) GetLabels() []Labels { + if m != nil { + return m.Labels + } + return nil +} + +func (m *Record) GetSamples() []Sample { + if m != nil { + return m.Samples + } + return nil +} + +type Labels struct { + Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,2,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"` +} + +func (m *Labels) Reset() { *m = Labels{} } +func (*Labels) ProtoMessage() {} +func (*Labels) Descriptor() ([]byte, []int) { + return fileDescriptor_ae6364fc8077884f, []int{1} +} +func (m *Labels) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Labels) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Labels.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Labels) XXX_Merge(src proto.Message) { + xxx_messageInfo_Labels.Merge(m, src) +} +func (m *Labels) XXX_Size() int { + return m.Size() +} +func (m *Labels) XXX_DiscardUnknown() { + xxx_messageInfo_Labels.DiscardUnknown(m) +} + +var xxx_messageInfo_Labels proto.InternalMessageInfo + +func (m *Labels) GetFingerprint() uint64 { + if m != nil { + return m.Fingerprint + } + return 0 +} + +type Sample struct { + Fingerprint uint64 `protobuf:"varint,1,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + Timestamp uint64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Value float64 `protobuf:"fixed64,3,opt,name=value,proto3" json:"value,omitempty"` +} + +func (m *Sample) Reset() { *m = Sample{} } +func (*Sample) ProtoMessage() {} +func (*Sample) Descriptor() ([]byte, []int) { + return fileDescriptor_ae6364fc8077884f, []int{2} +} +func (m *Sample) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Sample) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Sample.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return 
nil, err + } + return b[:n], nil + } +} +func (m *Sample) XXX_Merge(src proto.Message) { + xxx_messageInfo_Sample.Merge(m, src) +} +func (m *Sample) XXX_Size() int { + return m.Size() +} +func (m *Sample) XXX_DiscardUnknown() { + xxx_messageInfo_Sample.DiscardUnknown(m) +} + +var xxx_messageInfo_Sample proto.InternalMessageInfo + +func (m *Sample) GetFingerprint() uint64 { + if m != nil { + return m.Fingerprint + } + return 0 +} + +func (m *Sample) GetTimestamp() uint64 { + if m != nil { + return m.Timestamp + } + return 0 +} + +func (m *Sample) GetValue() float64 { + if m != nil { + return m.Value + } + return 0 +} + +type Series struct { + UserId string `protobuf:"bytes,1,opt,name=user_id,json=userId,proto3" json:"user_id,omitempty"` + Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` + Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"` + Chunks []client.Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"` +} + +func (m *Series) Reset() { *m = Series{} } +func (*Series) ProtoMessage() {} +func (*Series) Descriptor() ([]byte, []int) { + return fileDescriptor_ae6364fc8077884f, []int{3} +} +func (m *Series) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Series) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Series.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Series) XXX_Merge(src proto.Message) { + xxx_messageInfo_Series.Merge(m, src) +} +func (m *Series) XXX_Size() int { + return m.Size() +} +func (m *Series) XXX_DiscardUnknown() { + xxx_messageInfo_Series.DiscardUnknown(m) +} + +var xxx_messageInfo_Series proto.InternalMessageInfo + +func (m *Series) GetUserId() string { + if m != nil { + return m.UserId + } + return "" +} + +func (m *Series) GetFingerprint() uint64 { + if m != nil { + return m.Fingerprint + } + return 0 +} + +func (m *Series) GetChunks() []client.Chunk { + if m != nil { + return m.Chunks + } + return nil +} + +func init() { + proto.RegisterType((*Record)(nil), "ingester.Record") + proto.RegisterType((*Labels)(nil), "ingester.Labels") + proto.RegisterType((*Sample)(nil), "ingester.Sample") + proto.RegisterType((*Series)(nil), "ingester.Series") +} + +func init() { proto.RegisterFile("wal.proto", fileDescriptor_ae6364fc8077884f) } + +var fileDescriptor_ae6364fc8077884f = []byte{ + // 415 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xbc, 0x53, 0xcd, 0xca, 0xd3, 0x40, + 0x14, 0xcd, 0x34, 0x75, 0x6a, 0xa7, 0x08, 0x3a, 0x08, 0x86, 0x22, 0xd3, 0x90, 0x55, 0x41, 0x4c, + 0x44, 0xf7, 0xa2, 0x75, 0xa3, 0xe0, 0x42, 0xd2, 0x9d, 0x0b, 0x25, 0x3f, 0xd3, 0x74, 0x6c, 0x92, + 0x09, 0x33, 0x13, 0x75, 0x29, 0xf8, 0x02, 0xbe, 0x81, 0x5b, 0x1f, 0xa5, 0xcb, 0x2e, 0x8b, 0x8b, + 0x62, 0x53, 0x04, 0x97, 0x7d, 0x04, 0xc9, 0x24, 0xd1, 0x52, 0x50, 0x3e, 0xbe, 0xc5, 0xb7, 0xcb, + 0x39, 0xf7, 0xdc, 0x73, 0xcf, 0xdc, 0xcc, 0xa0, 0xe1, 0x87, 0x20, 0x75, 0x0b, 0xc1, 0x15, 0xc7, + 0xd7, 0x59, 0x9e, 0x50, 0xa9, 0xa8, 0x18, 0xdf, 0x4f, 0x98, 0x5a, 0x96, 0xa1, 0x1b, 0xf1, 0xcc, + 0x4b, 0x78, 0xc2, 0x3d, 0x2d, 0x08, 0xcb, 0x85, 0x46, 0x1a, 0xe8, 0xaf, 0xa6, 0x71, 0xfc, 0xe4, + 0x44, 0x1e, 0x71, 0xa1, 0xe8, 0xc7, 0x42, 0xf0, 0x77, 0x34, 0x52, 0x2d, 
0xf2, 0x8a, 0x55, 0xe2, + 0x75, 0xe6, 0x5e, 0x94, 0x32, 0x9a, 0x77, 0xa5, 0xc6, 0xc1, 0xf9, 0x0c, 0x10, 0xf4, 0x69, 0xc4, + 0x45, 0x8c, 0xef, 0xa0, 0x41, 0x29, 0xa9, 0x78, 0xcb, 0x62, 0x0b, 0xd8, 0x60, 0x3a, 0xf4, 0x61, + 0x0d, 0x5f, 0xc4, 0xd8, 0x45, 0x30, 0x0d, 0x42, 0x9a, 0x4a, 0xab, 0x67, 0x9b, 0xd3, 0xd1, 0xc3, + 0x9b, 0x6e, 0x67, 0xe9, 0xbe, 0xd4, 0xfc, 0xac, 0xbf, 0xde, 0x4d, 0x0c, 0xbf, 0x55, 0xe1, 0x07, + 0x68, 0x20, 0x83, 0xac, 0x48, 0xa9, 0xb4, 0xcc, 0xf3, 0x86, 0xb9, 0x2e, 0xb4, 0x0d, 0x9d, 0xcc, + 0xf9, 0x0a, 0x10, 0x6c, 0xac, 0xb0, 0x8d, 0x46, 0x8b, 0x5a, 0x2d, 0x0a, 0xc1, 0x72, 0xa5, 0x93, + 0xf4, 0xfd, 0x53, 0x0a, 0xcb, 0xb3, 0x38, 0xb7, 0xdc, 0xf6, 0x44, 0xda, 0xe1, 0x55, 0xc0, 0xc4, + 0xec, 0x79, 0x6d, 0xff, 0x7d, 0x37, 0xb9, 0xcc, 0x7e, 0x1a, 0x9b, 0xa7, 0x71, 0x50, 0x28, 0x2a, + 0xba, 0x33, 0x39, 0x6f, 0x10, 0x6c, 0xa2, 0x5f, 0x20, 0xe0, 0x5d, 0x34, 0x54, 0x2c, 0xa3, 0x52, + 0x05, 0x59, 0x61, 0xf5, 0x74, 0xfd, 0x2f, 0x81, 0x6f, 0xa3, 0x6b, 0xef, 0x83, 0xb4, 0xa4, 0x96, + 0x69, 0x83, 0x29, 0xf0, 0x1b, 0xe0, 0xfc, 0x04, 0x08, 0xce, 0xa9, 0x60, 0x54, 0xfe, 0xfb, 0x3f, + 0x9c, 0x4d, 0xee, 0xfd, 0x6f, 0x35, 0xe6, 0x95, 0xad, 0x06, 0xdf, 0x43, 0x30, 0x5a, 0x96, 0xf9, + 0x4a, 0x5a, 0x7d, 0x3d, 0xf4, 0x46, 0x37, 0xf4, 0x59, 0xcd, 0x76, 0x77, 0xa3, 0x91, 0xcc, 0x1e, + 0x6f, 0xf6, 0xc4, 0xd8, 0xee, 0x89, 0x71, 0xdc, 0x13, 0xf0, 0xa9, 0x22, 0xe0, 0x5b, 0x45, 0xc0, + 0xba, 0x22, 0x60, 0x53, 0x11, 0xf0, 0xa3, 0x22, 0xe0, 0x57, 0x45, 0x8c, 0x63, 0x45, 0xc0, 0x97, + 0x03, 0x31, 0x36, 0x07, 0x62, 0x6c, 0x0f, 0xc4, 0x78, 0xfd, 0xe7, 0x81, 0x84, 0x50, 0x5f, 0xdb, + 0x47, 0xbf, 0x03, 0x00, 0x00, 0xff, 0xff, 0x83, 0xd9, 0xb8, 0x9a, 0x3e, 0x03, 0x00, 0x00, +} + +func (this *Record) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Record) + if !ok { + that2, ok := that.(Record) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.UserId != that1.UserId { + return false + } + if len(this.Labels) != len(that1.Labels) { + return false + } + for i := range this.Labels { + if !this.Labels[i].Equal(&that1.Labels[i]) { + return false + } + } + if len(this.Samples) != len(that1.Samples) { + return false + } + for i := range this.Samples { + if !this.Samples[i].Equal(&that1.Samples[i]) { + return false + } + } + return true +} +func (this *Labels) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Labels) + if !ok { + that2, ok := that.(Labels) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Fingerprint != that1.Fingerprint { + return false + } + if len(this.Labels) != len(that1.Labels) { + return false + } + for i := range this.Labels { + if !this.Labels[i].Equal(that1.Labels[i]) { + return false + } + } + return true +} +func (this *Sample) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*Sample) + if !ok { + that2, ok := that.(Sample) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Fingerprint != that1.Fingerprint { + return false + } + if this.Timestamp != that1.Timestamp { + return false + } + if this.Value != that1.Value { + return false + } + return true +} +func (this *Series) Equal(that interface{}) bool { + if that == nil { + 
return this == nil + } + + that1, ok := that.(*Series) + if !ok { + that2, ok := that.(Series) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.UserId != that1.UserId { + return false + } + if this.Fingerprint != that1.Fingerprint { + return false + } + if len(this.Labels) != len(that1.Labels) { + return false + } + for i := range this.Labels { + if !this.Labels[i].Equal(that1.Labels[i]) { + return false + } + } + if len(this.Chunks) != len(that1.Chunks) { + return false + } + for i := range this.Chunks { + if !this.Chunks[i].Equal(&that1.Chunks[i]) { + return false + } + } + return true +} +func (this *Record) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&ingester.Record{") + s = append(s, "UserId: "+fmt.Sprintf("%#v", this.UserId)+",\n") + if this.Labels != nil { + vs := make([]*Labels, len(this.Labels)) + for i := range vs { + vs[i] = &this.Labels[i] + } + s = append(s, "Labels: "+fmt.Sprintf("%#v", vs)+",\n") + } + if this.Samples != nil { + vs := make([]*Sample, len(this.Samples)) + for i := range vs { + vs[i] = &this.Samples[i] + } + s = append(s, "Samples: "+fmt.Sprintf("%#v", vs)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Labels) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&ingester.Labels{") + s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Sample) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 7) + s = append(s, "&ingester.Sample{") + s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") + s = append(s, "Timestamp: "+fmt.Sprintf("%#v", this.Timestamp)+",\n") + s = append(s, "Value: "+fmt.Sprintf("%#v", this.Value)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Series) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 8) + s = append(s, "&ingester.Series{") + s = append(s, "UserId: "+fmt.Sprintf("%#v", this.UserId)+",\n") + s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n") + s = append(s, "Labels: "+fmt.Sprintf("%#v", this.Labels)+",\n") + if this.Chunks != nil { + vs := make([]*client.Chunk, len(this.Chunks)) + for i := range vs { + vs[i] = &this.Chunks[i] + } + s = append(s, "Chunks: "+fmt.Sprintf("%#v", vs)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringWal(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *Record) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Record) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.UserId) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintWal(dAtA, i, uint64(len(m.UserId))) + i += copy(dAtA[i:], m.UserId) + } + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0x12 + i++ + i = encodeVarintWal(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err 
+ } + i += n + } + } + if len(m.Samples) > 0 { + for _, msg := range m.Samples { + dAtA[i] = 0x1a + i++ + i = encodeVarintWal(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *Labels) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Labels) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Fingerprint != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintWal(dAtA, i, uint64(m.Fingerprint)) + } + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0x12 + i++ + i = encodeVarintWal(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func (m *Sample) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Sample) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Fingerprint != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintWal(dAtA, i, uint64(m.Fingerprint)) + } + if m.Timestamp != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintWal(dAtA, i, uint64(m.Timestamp)) + } + if m.Value != 0 { + dAtA[i] = 0x19 + i++ + encoding_binary.LittleEndian.PutUint64(dAtA[i:], uint64(math.Float64bits(float64(m.Value)))) + i += 8 + } + return i, nil +} + +func (m *Series) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Series) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.UserId) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintWal(dAtA, i, uint64(len(m.UserId))) + i += copy(dAtA[i:], m.UserId) + } + if m.Fingerprint != 0 { + dAtA[i] = 0x10 + i++ + i = encodeVarintWal(dAtA, i, uint64(m.Fingerprint)) + } + if len(m.Labels) > 0 { + for _, msg := range m.Labels { + dAtA[i] = 0x1a + i++ + i = encodeVarintWal(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if len(m.Chunks) > 0 { + for _, msg := range m.Chunks { + dAtA[i] = 0x22 + i++ + i = encodeVarintWal(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + return i, nil +} + +func encodeVarintWal(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Record) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.UserId) + if l > 0 { + n += 1 + l + sovWal(uint64(l)) + } + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovWal(uint64(l)) + } + } + if len(m.Samples) > 0 { + for _, e := range m.Samples { + l = e.Size() + n += 1 + l + sovWal(uint64(l)) + } + } + return n +} + +func (m *Labels) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Fingerprint != 0 { + n += 1 + sovWal(uint64(m.Fingerprint)) + } + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovWal(uint64(l)) + } + } + return n +} + +func (m *Sample) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ 
= l + if m.Fingerprint != 0 { + n += 1 + sovWal(uint64(m.Fingerprint)) + } + if m.Timestamp != 0 { + n += 1 + sovWal(uint64(m.Timestamp)) + } + if m.Value != 0 { + n += 9 + } + return n +} + +func (m *Series) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.UserId) + if l > 0 { + n += 1 + l + sovWal(uint64(l)) + } + if m.Fingerprint != 0 { + n += 1 + sovWal(uint64(m.Fingerprint)) + } + if len(m.Labels) > 0 { + for _, e := range m.Labels { + l = e.Size() + n += 1 + l + sovWal(uint64(l)) + } + } + if len(m.Chunks) > 0 { + for _, e := range m.Chunks { + l = e.Size() + n += 1 + l + sovWal(uint64(l)) + } + } + return n +} + +func sovWal(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozWal(x uint64) (n int) { + return sovWal(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *Record) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Record{`, + `UserId:` + fmt.Sprintf("%v", this.UserId) + `,`, + `Labels:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Labels), "Labels", "Labels", 1), `&`, ``, 1) + `,`, + `Samples:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Samples), "Sample", "Sample", 1), `&`, ``, 1) + `,`, + `}`, + }, "") + return s +} +func (this *Labels) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Labels{`, + `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, + `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, + `}`, + }, "") + return s +} +func (this *Sample) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Sample{`, + `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, + `Timestamp:` + fmt.Sprintf("%v", this.Timestamp) + `,`, + `Value:` + fmt.Sprintf("%v", this.Value) + `,`, + `}`, + }, "") + return s +} +func (this *Series) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Series{`, + `UserId:` + fmt.Sprintf("%v", this.UserId) + `,`, + `Fingerprint:` + fmt.Sprintf("%v", this.Fingerprint) + `,`, + `Labels:` + fmt.Sprintf("%v", this.Labels) + `,`, + `Chunks:` + strings.Replace(strings.Replace(fmt.Sprintf("%v", this.Chunks), "Chunk", "client.Chunk", 1), `&`, ``, 1) + `,`, + `}`, + }, "") + return s +} +func valueToStringWal(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *Record) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Record: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Record: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UserId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen 
:= int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthWal + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthWal + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UserId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWal + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWal + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, Labels{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Samples", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWal + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWal + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Samples = append(m.Samples, Sample{}) + if err := m.Samples[len(m.Samples)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWal(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Labels) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Labels: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Labels: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) + } + m.Fingerprint = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Fingerprint |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWal + } + postIndex := 
iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWal + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWal(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Sample) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Sample: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Sample: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) + } + m.Fingerprint = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Fingerprint |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType) + } + m.Timestamp = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Timestamp |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 1 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var v uint64 + if (iNdEx + 8) > l { + return io.ErrUnexpectedEOF + } + v = uint64(encoding_binary.LittleEndian.Uint64(dAtA[iNdEx:])) + iNdEx += 8 + m.Value = float64(math.Float64frombits(v)) + default: + iNdEx = preIndex + skippy, err := skipWal(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Series) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Series: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Series: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UserId", 
wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthWal + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthWal + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UserId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Fingerprint", wireType) + } + m.Fingerprint = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Fingerprint |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWal + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWal + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Labels = append(m.Labels, github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter{}) + if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Chunks", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowWal + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthWal + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthWal + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Chunks = append(m.Chunks, client.Chunk{}) + if err := m.Chunks[len(m.Chunks)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipWal(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthWal + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipWal(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWal + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWal + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; 
shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWal + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthWal + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthWal + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowWal + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipWal(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthWal + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthWal = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowWal = fmt.Errorf("proto: integer overflow") +) diff --git a/pkg/ingester/wal.proto b/pkg/ingester/wal.proto new file mode 100644 index 00000000000..25b5361dde2 --- /dev/null +++ b/pkg/ingester/wal.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package ingester; + +option go_package = "ingester"; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "github.com/cortexproject/cortex/pkg/ingester/client/cortex.proto"; + +message Record { + string user_id = 1; + repeated Labels labels = 2 [(gogoproto.nullable) = false]; + repeated Sample samples = 3 [(gogoproto.nullable) = false]; +} + +message Labels { + uint64 fingerprint = 1; + repeated cortex.LabelPair labels = 2 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"]; +} + +message Sample { + uint64 fingerprint = 1; + uint64 timestamp = 2; + double value = 3; +} + +message Series { + string user_id = 1; + uint64 fingerprint = 2; + repeated cortex.LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"]; + repeated cortex.Chunk chunks = 4 [(gogoproto.nullable) = false]; +} diff --git a/pkg/ingester/wal_test.go b/pkg/ingester/wal_test.go new file mode 100644 index 00000000000..c0baa0989a9 --- /dev/null +++ b/pkg/ingester/wal_test.go @@ -0,0 +1,58 @@ +package ingester + +import ( + "io/ioutil" + "os" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log" + "github.com/stretchr/testify/require" +) + +func init() { + util.Logger = log.NewLogfmtLogger(os.Stdout) +} + +func TestWAL(t *testing.T) { + dirname, err := ioutil.TempDir("", "cortex-wal") + require.NoError(t, err) + + cfg := defaultIngesterTestConfig() + cfg.WALConfig.walEnabled = true + cfg.WALConfig.checkpointEnabled = true + cfg.WALConfig.recover = true + cfg.WALConfig.dir = dirname + cfg.WALConfig.checkpointDuration = 100 * time.Millisecond + + numSeries := 100 + numSamplesPerSeriesPerPush := 10 + numRestarts := 3 + + // Build an ingester, add some samples, then shut it down. 
+ _, ing := newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig()) + userIDs, testData := pushTestSamples(t, ing, numSeries, numSamplesPerSeriesPerPush, 0) + ing.Shutdown() + + for r := 0; r < numRestarts; r++ { + if r == numRestarts-1 { + cfg.WALConfig.walEnabled = false + cfg.WALConfig.checkpointEnabled = false + } + // Start a new ingester and recover the WAL. + _, ing = newTestStore(t, cfg, defaultClientTestConfig(), defaultLimitsTestConfig()) + + for i, userID := range userIDs { + testData[userID] = buildTestMatrix(numSeries, (r+1)*numSamplesPerSeriesPerPush, i) + } + // Check the samples are still there! + retrieveTestSamples(t, ing, userIDs, testData) + + if r != numRestarts-1 { + userIDs, testData = pushTestSamples(t, ing, numSeries, numSamplesPerSeriesPerPush, (r+1)*numSamplesPerSeriesPerPush) + } + + ing.Shutdown() + } +} diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index 5fda43b2274..bd1841a4368 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -119,6 +119,9 @@ type Lifecycler struct { RingName string RingKey string + // Whether to flush if transfer fails on shutdown. + flushOnShutdown bool + // We need to remember the ingester state just in case consul goes away and comes // back empty. And it changes during lifecycle of ingester. stateMtx sync.RWMutex @@ -136,7 +139,8 @@ type Lifecycler struct { } // NewLifecycler makes and starts a new Lifecycler. -func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string) (*Lifecycler, error) { +func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringName, ringKey string, flushOnShutdown bool) (*Lifecycler, error) { + addr := cfg.Addr if addr == "" { var err error @@ -166,10 +170,11 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa flushTransferer: flushTransferer, KVStore: store, - Addr: fmt.Sprintf("%s:%d", addr, port), - ID: cfg.ID, - RingName: ringName, - RingKey: ringKey, + Addr: fmt.Sprintf("%s:%d", addr, port), + ID: cfg.ID, + RingName: ringName, + RingKey: ringKey, + flushOnShutdown: flushOnShutdown, quit: make(chan struct{}), actorChan: make(chan func()), @@ -694,8 +699,19 @@ func (i *Lifecycler) updateCounters(ringDesc *Desc) { i.countersLock.Unlock() } +// FlushOnShutdown returns whether flushing on shutdown is enabled when the transfer fails. +func (i *Lifecycler) FlushOnShutdown() bool { + return i.flushOnShutdown +} + +// SetFlushOnShutdown enables/disables flush on shutdown if transfer fails. +// Passing 'true' enables it, and 'false' disables it.
+func (i *Lifecycler) SetFlushOnShutdown(flushOnShutdown bool) { + i.flushOnShutdown = flushOnShutdown +} + func (i *Lifecycler) processShutdown(ctx context.Context) { - flushRequired := true + flushRequired := i.flushOnShutdown transferStart := time.Now() if err := i.flushTransferer.TransferOut(ctx); err != nil { if err == ErrTransferDisabled { diff --git a/pkg/ring/lifecycler_test.go b/pkg/ring/lifecycler_test.go index ed4921fb5b0..3f27560488d 100644 --- a/pkg/ring/lifecycler_test.go +++ b/pkg/ring/lifecycler_test.go @@ -107,7 +107,7 @@ func TestRingNormaliseMigration(t *testing.T) { var lifecyclerConfig2 = testLifecyclerConfig(ringConfig, "ing2") lifecyclerConfig2.JoinAfter = 100 * time.Second - l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey) + l2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true) require.NoError(t, err) l2.Start() @@ -145,7 +145,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) { lifecyclerConfig1.HeartbeatPeriod = 100 * time.Millisecond lifecyclerConfig1.JoinAfter = 100 * time.Millisecond - lifecycler1, err := NewLifecycler(lifecyclerConfig1, &flushTransferer{}, "ingester", IngesterRingKey) + lifecycler1, err := NewLifecycler(lifecyclerConfig1, &flushTransferer{}, "ingester", IngesterRingKey, true) require.NoError(t, err) assert.Equal(t, 0, lifecycler1.HealthyInstancesCount()) @@ -161,7 +161,7 @@ func TestLifecycler_HealthyInstancesCount(t *testing.T) { lifecyclerConfig2.HeartbeatPeriod = 100 * time.Millisecond lifecyclerConfig2.JoinAfter = 100 * time.Millisecond - lifecycler2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey) + lifecycler2, err := NewLifecycler(lifecyclerConfig2, &flushTransferer{}, "ingester", IngesterRingKey, true) require.NoError(t, err) assert.Equal(t, 0, lifecycler2.HealthyInstancesCount()) @@ -185,7 +185,7 @@ func TestLifecycler_NilFlushTransferer(t *testing.T) { lifecyclerConfig := testLifecyclerConfig(ringConfig, "ing1") // Create a lifecycler with nil FlushTransferer to make sure it operates correctly - lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey) + lifecycler, err := NewLifecycler(lifecyclerConfig, nil, "ingester", IngesterRingKey, true) require.NoError(t, err) lifecycler.Start() @@ -208,12 +208,12 @@ func TestLifecycler_TwoRingsWithDifferentKeysOnTheSameKVStore(t *testing.T) { lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "instance-1") lifecyclerConfig2 := testLifecyclerConfig(ringConfig, "instance-2") - lifecycler1, err := NewLifecycler(lifecyclerConfig1, nil, "service-1", "ring-1") + lifecycler1, err := NewLifecycler(lifecyclerConfig1, nil, "service-1", "ring-1", true) require.NoError(t, err) lifecycler1.Start() defer lifecycler1.Shutdown() - lifecycler2, err := NewLifecycler(lifecyclerConfig2, nil, "service-2", "ring-2") + lifecycler2, err := NewLifecycler(lifecyclerConfig2, nil, "service-2", "ring-2", true) require.NoError(t, err) lifecycler2.Start() defer lifecycler2.Shutdown() @@ -249,7 +249,7 @@ func TestRingRestart(t *testing.T) { // Add an 'ingester' with normalised tokens. 
lifecyclerConfig1 := testLifecyclerConfig(ringConfig, "ing1") - l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey) + l1, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) require.NoError(t, err) l1.Start() @@ -263,7 +263,7 @@ func TestRingRestart(t *testing.T) { token := l1.tokens[0] // Add a second ingester with the same settings, so it will think it has restarted - l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey) + l2, err := NewLifecycler(lifecyclerConfig1, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) require.NoError(t, err) l2.Start() @@ -328,11 +328,11 @@ func TestCheckReady(t *testing.T) { defer r.Stop() cfg := testLifecyclerConfig(ringConfig, "ring1") cfg.MinReadyDuration = 1 * time.Nanosecond - l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey) + l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) l1.Start() require.NoError(t, err) - l1.setTokens([]uint32{1}) + l1.setTokens(Tokens([]uint32{1})) // Delete the ring key before checking ready err = l1.CheckReady(context.Background()) @@ -366,7 +366,7 @@ func TestTokensOnDisk(t *testing.T) { lifecyclerConfig.TokensFilePath = tokenDir + "/tokens" // Start first ingester. - l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey) + l1, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true) require.NoError(t, err) l1.Start() // Check this ingester joined, is active, and has 512 token. @@ -389,7 +389,7 @@ func TestTokensOnDisk(t *testing.T) { // Start new ingester at same token directory. lifecyclerConfig.ID = "ing2" - l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey) + l2, err := NewLifecycler(lifecyclerConfig, &noopFlushTransferer{}, "ingester", IngesterRingKey, true) require.NoError(t, err) l2.Start() defer l2.Shutdown() @@ -458,7 +458,7 @@ func TestJoinInLeavingState(t *testing.T) { }) require.NoError(t, err) - l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey) + l1, err := NewLifecycler(cfg, &nopFlushTransferer{}, "ingester", IngesterRingKey, true) l1.Start() require.NoError(t, err) diff --git a/pkg/ruler/ruler.go b/pkg/ruler/ruler.go index 441292e3a22..81943f0652e 100644 --- a/pkg/ruler/ruler.go +++ b/pkg/ruler/ruler.go @@ -149,7 +149,7 @@ func NewRuler(cfg Config, engine *promql.Engine, queryable promStorage.Queryable // If sharding is enabled, create/join a ring to distribute tokens to // the ruler if cfg.EnableSharding { - ruler.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, ruler, "ruler", ring.RulerRingKey) + ruler.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, ruler, "ruler", ring.RulerRingKey, true) if err != nil { return nil, err }
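The lifecycler hunks above add a trailing `flushOnShutdown` argument to `ring.NewLifecycler` and expose `FlushOnShutdown`/`SetFlushOnShutdown`, which `processShutdown` now consults instead of unconditionally flushing when the transfer out fails. Below is a minimal, hypothetical sketch of how a caller could use the new API; the `buildLifecycler` helper and the `walEnabled` flag are illustrative assumptions, not part of this diff.

```go
package example

import (
	"github.com/cortexproject/cortex/pkg/ring"
)

// buildLifecycler is a hypothetical helper showing the new constructor signature:
// the final boolean sets the initial flush-on-shutdown behaviour used when a
// transfer fails. Passing true preserves the previous always-flush behaviour.
func buildLifecycler(cfg ring.LifecyclerConfig, transferer ring.FlushTransferer, walEnabled bool) (*ring.Lifecycler, error) {
	lc, err := ring.NewLifecycler(cfg, transferer, "ingester", ring.IngesterRingKey, true)
	if err != nil {
		return nil, err
	}
	// Assumed wiring: with a WAL on disk the in-memory data survives restarts,
	// so the fallback flush after a failed transfer can be switched off at runtime.
	if walEnabled {
		lc.SetFlushOnShutdown(false)
	}
	return lc, nil
}
```

The callers changed in this diff (the ruler and the ring tests) pass `true`, so their behaviour is unchanged; only a WAL-aware ingester would be expected to turn the flush off.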