diff --git a/pkg/ingester/errors.go b/pkg/ingester/errors.go index 8d822240ba5..c4c357e2406 100644 --- a/pkg/ingester/errors.go +++ b/pkg/ingester/errors.go @@ -4,7 +4,6 @@ import ( "fmt" "net/http" - "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/httpgrpc" ) @@ -50,11 +49,6 @@ func makeMetricLimitError(errorType string, labels labels.Labels, err error) err } } -func (e *validationError) WrapWithUser(userID string) *validationError { - e.err = wrapWithUser(e.err, userID) - return e -} - func (e *validationError) Error() string { if e.err == nil { return e.errorType @@ -65,14 +59,15 @@ func (e *validationError) Error() string { return fmt.Sprintf("%s for series %s", e.err.Error(), e.labels.String()) } -// WrappedError returns a HTTP gRPC error than is correctly forwarded over gRPC. -func (e *validationError) WrappedError() error { +// returns a HTTP gRPC error than is correctly forwarded over gRPC, with no reference to `e` retained. +func grpcForwardableError(userID string, code int, e error) error { return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{ - Code: int32(e.code), - Body: []byte(e.Error()), + Code: int32(code), + Body: []byte(wrapWithUser(e, userID).Error()), }) } +// Note: does not retain a reference to `err` func wrapWithUser(err error, userID string) error { - return errors.Wrapf(err, "user=%s", userID) + return fmt.Errorf("user=%s: %s", userID, err) } diff --git a/pkg/ingester/index/index.go b/pkg/ingester/index/index.go index 6e072e70335..562eb4eeb57 100644 --- a/pkg/ingester/index/index.go +++ b/pkg/ingester/index/index.go @@ -32,9 +32,11 @@ func New() *InvertedIndex { } // Add a fingerprint under the specified labels. +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard := &ii.shards[util.HashFP(fp)%indexShards] - return shard.add(labels, fp) + return shard.add(labels, fp) // add() returns 'interned' values so the original labels are not retained } // Lookup all fingerprints for the provided matchers. @@ -108,7 +110,9 @@ func copyString(s string) string { return string([]byte(s)) } -// add metric to the index; return all the name/value pairs as strings from the index, sorted +// add metric to the index; return all the name/value pairs as a fresh +// sorted slice, referencing 'interned' strings from the index so that +// no references are retained to the memory of `metric`. func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels { shard.mtx.Lock() defer shard.mtx.Unlock() diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2c8a32327dd..c59b7231bbf 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -277,6 +277,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. return i.v2Push(ctx, req) } + // NOTE: because we use `unsafe` in deserialisation, we must not + // retain anything from `req` past the call to ReuseSlice defer client.ReuseSlice(req.Timeseries) userID, err := user.ExtractOrgID(ctx) @@ -301,6 +303,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. for _, ts := range req.Timeseries { for _, s := range ts.Samples { + // append() copies the memory in `ts.Labels` except on the error path err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source, record) if err == nil { continue @@ -312,12 +315,14 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. continue } - return nil, wrapWithUser(err, userID) + // non-validation error: abandon this request + return nil, grpcForwardableError(userID, http.StatusInternalServerError, err) } } if lastPartialErr != nil { - return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError() + // grpcForwardableError turns the error into a string so it no longer references `req` + return &client.WriteResponse{}, grpcForwardableError(userID, lastPartialErr.code, lastPartialErr) } if record != nil { @@ -331,6 +336,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. return &client.WriteResponse{}, nil } +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum, record *Record) error { labels.removeBlanks() @@ -349,6 +356,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, return fmt.Errorf("ingester stopping") } + // getOrCreateSeries copies the memory for `labels`, except on the error path. state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels, record) if err != nil { if ve, ok := err.(*validationError); ok { diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 309a2e2ce20..de18b1497f8 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -105,6 +105,8 @@ func NewV2(cfg Config, clientConfig client.Config, limits *validation.Overrides, func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.WriteResponse, error) { var firstPartialErr error + // NOTE: because we use `unsafe` in deserialisation, we must not + // retain anything from `req` past the call to ReuseSlice defer client.ReuseSlice(req.Timeseries) userID, err := user.ExtractOrgID(ctx) diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index f735fc0b013..214cb146cac 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -155,12 +155,17 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro return state, ok, nil } +// NOTE: memory for `labels` is unsafe; anything retained beyond the +// life of this function must be copied func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter, record *Record) (*userState, model.Fingerprint, *memorySeries, error) { state := us.getOrCreate(userID) + // WARNING: `err` may have a reference to unsafe memory in `labels` fp, series, err := state.getSeries(labels, record) return state, fp, series, err } +// NOTE: memory for `metric` is unsafe; anything retained beyond the +// life of this function must be copied func (u *userState) getSeries(metric labelPairs, record *Record) (model.Fingerprint, *memorySeries, error) { rawFP := client.FastFingerprint(metric) u.fpLocker.Lock(rawFP) @@ -197,6 +202,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab } } + // MetricNameFromLabelAdapters returns a copy of the string in `metric` metricName, err := extract.MetricNameFromLabelAdapters(metric) if err != nil { return nil, err @@ -205,6 +211,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab if !recovery { // Check if the per-metric limit has been exceeded if err = u.canAddSeriesFor(string(metricName)); err != nil { + // WARNING: returns a reference to `metric` return nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err) } } @@ -219,7 +226,7 @@ func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric lab }) } - labels := u.index.Add(metric, fp) + labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained series := newMemorySeries(labels) u.fpToSeries.put(fp, series)