diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 6e072e70335..562eb4eeb57 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -32,9 +32,11 @@ func New() *InvertedIndex { } // Add a fingerprint under the specified labels. +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard := &ii.shards[util.HashFP(fp)%indexShards] - return shard.add(labels, fp) + return shard.add(labels, fp) // add() returns 'interned' values so the original labels are not retained } // Lookup all fingerprints for the provided matchers. @@ -108,7 +110,9 @@ func copyString(s string) string { return string([]byte(s)) } -// add metric to the index; return all the name/value pairs as strings from the index, sorted +// add metric to the index; return all the name/value pairs as a fresh +// sorted slice, referencing 'interned' strings from the index so that +// no references are retained to the memory of `metric`. func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard.mtx.Lock() defer shard.mtx.Unlock() diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ffa80ced26f..af0b42b31f4 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -295,8 +295,11 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. } var lastPartialErr *validationError + // NOTE: because we use `unsafe` in deserialisation, we must not + // retain anything from `req` past the call to ReuseSlice for _, ts := range req.Timeseries { for _, s := range ts.Samples { + // append() copies the memory in `ts.Labels` err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source) if err == nil { continue @@ -319,6 +322,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. return &client.WriteResponse{}, nil } +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error { labels.removeBlanks() @@ -336,6 +341,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, if i.stopped { return fmt.Errorf("ingester stopping") } + // getOrCreateSeries copies the memory for `labels`. state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels) if err != nil { if ve, ok := err.(*validationError); ok { diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 0999564ac5a..cdebe120a18 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -132,6 +132,8 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro return state, ok, nil } +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter) (*userState, model.Fingerprint, *memorySeries, error) { state, ok := us.get(userID) @@ -173,6 +175,8 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe return state, fp, series, err } +// NOTE: memory for `metric` is unsafe; anything retained beyond the +// life of this function must be copied func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeries, error) { rawFP := client.FastFingerprint(metric) u.fpLocker.Lock(rawFP) @@ -198,6 +202,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri return fp, nil, makeLimitError(perUserSeriesLimit, err) } + // MetricNameFromLabelAdapters returns a copy of the string in `metric` metricName, err := extract.MetricNameFromLabelAdapters(metric) if err != nil { u.fpLocker.Unlock(fp) @@ -208,13 +213,14 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri err = u.canAddSeriesFor(string(metricName)) if err != nil { u.fpLocker.Unlock(fp) - return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err) + // note we copy `metric` as retaining a reference is unsafe. + return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabelsWithCopy(metric), err) } u.memSeriesCreatedTotal.Inc() memSeries.Inc() - labels := u.index.Add(metric, fp) + labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained series = newMemorySeries(labels) u.fpToSeries.put(fp, series)