From 7ed396d33b798031645928273932e08be5631745 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 20 Jan 2020 12:05:59 +0000 Subject: [PATCH 1/2] Comment unsafe memory usage in ingester push path Signed-off-by: Bryan Boreham --- pkg/ingester/index/index.go | 8 ++++++-- pkg/ingester/ingester.go | 4 ++++ pkg/ingester/user_state.go | 7 ++++++- 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 6e072e70335..562eb4eeb57 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -32,9 +32,11 @@ func New() *InvertedIndex { } // Add a fingerprint under the specified labels. +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard := &ii.shards[util.HashFP(fp)%indexShards] - return shard.add(labels, fp) + return shard.add(labels, fp) // add() returns 'interned' values so the original labels are not retained } // Lookup all fingerprints for the provided matchers. @@ -108,7 +110,9 @@ func copyString(s string) string { return string([]byte(s)) } -// add metric to the index; return all the name/value pairs as strings from the index, sorted +// add metric to the index; return all the name/value pairs as a fresh +// sorted slice, referencing 'interned' strings from the index so that +// no references are retained to the memory of `metric`. func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard.mtx.Lock() defer shard.mtx.Unlock() diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ffa80ced26f..507d1059d98 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -295,6 +295,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. } var lastPartialErr *validationError + // NOTE: because we use `unsafe` in deserialisation, we must not + // retain anything from `req` past the call to ReuseSlice for _, ts := range req.Timeseries { for _, s := range ts.Samples { err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source) @@ -319,6 +321,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. return &client.WriteResponse{}, nil } +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error { labels.removeBlanks() diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 0999564ac5a..629861a5728 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -132,6 +132,8 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro return state, ok, nil } +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter) (*userState, model.Fingerprint, *memorySeries, error) { state, ok := us.get(userID) @@ -173,6 +175,8 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe return state, fp, series, err } +// NOTE: memory for `metric` is unsafe; anything retained beyond the +// life of this function must be copied func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeries, error) { rawFP := client.FastFingerprint(metric) u.fpLocker.Lock(rawFP) @@ -198,6 +202,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri return fp, nil, makeLimitError(perUserSeriesLimit, err) } + // MetricNameFromLabelAdapters returns a copy of the string in `metric` metricName, err := extract.MetricNameFromLabelAdapters(metric) if err != nil { u.fpLocker.Unlock(fp) @@ -214,7 +219,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri u.memSeriesCreatedTotal.Inc() memSeries.Inc() - labels := u.index.Add(metric, fp) + labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained series = newMemorySeries(labels) u.fpToSeries.put(fp, series) From 0b05de8130395ef3a216298c68c5a9a6cf6e92a2 Mon Sep 17 00:00:00 2001 From: Bryan Boreham Date: Mon, 20 Jan 2020 12:09:10 +0000 Subject: [PATCH 2/2] Copy label values to avoid unsafe memory use Signed-off-by: Bryan Boreham --- pkg/ingester/ingester.go | 2 ++ pkg/ingester/user_state.go | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 507d1059d98..af0b42b31f4 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -299,6 +299,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. // retain anything from `req` past the call to ReuseSlice for _, ts := range req.Timeseries { for _, s := range ts.Samples { + // append() copies the memory in `ts.Labels` err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source) if err == nil { continue @@ -340,6 +341,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, if i.stopped { return fmt.Errorf("ingester stopping") } + // getOrCreateSeries copies the memory for `labels`. state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels) if err != nil { if ve, ok := err.(*validationError); ok { diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index 629861a5728..cdebe120a18 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -213,7 +213,8 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri err = u.canAddSeriesFor(string(metricName)) if err != nil { u.fpLocker.Unlock(fp) - return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err) + // note we copy `metric` as retaining a reference is unsafe. + return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabelsWithCopy(metric), err) } u.memSeriesCreatedTotal.Inc()