Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions pkg/ingester/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ func New() *InvertedIndex {
}

// Add a fingerprint under the specified labels.
// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (ii *InvertedIndex) Add(labels []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
shard := &ii.shards[util.HashFP(fp)%indexShards]
return shard.add(labels, fp)
return shard.add(labels, fp) // add() returns 'interned' values so the original labels are not retained
}

// Lookup all fingerprints for the provided matchers.
Expand Down Expand Up @@ -108,7 +110,9 @@ func copyString(s string) string {
return string([]byte(s))
}

// add metric to the index; return all the name/value pairs as strings from the index, sorted
// add metric to the index; return all the name/value pairs as a fresh
// sorted slice, referencing 'interned' strings from the index so that
// no references are retained to the memory of `metric`.
func (shard *indexShard) add(metric []client.LabelAdapter, fp model.Fingerprint) labels.Labels {
shard.mtx.Lock()
defer shard.mtx.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,8 +295,11 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
}
var lastPartialErr *validationError

// NOTE: because we use `unsafe` in deserialisation, we must not
// retain anything from `req` past the call to ReuseSlice
for _, ts := range req.Timeseries {
for _, s := range ts.Samples {
// append() copies the memory in `ts.Labels`
err := i.append(ctx, userID, ts.Labels, model.Time(s.TimestampMs), model.SampleValue(s.Value), req.Source)
if err == nil {
continue
Expand All @@ -319,6 +322,8 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
return &client.WriteResponse{}, nil
}

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
labels.removeBlanks()

Expand All @@ -336,6 +341,7 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
if i.stopped {
return fmt.Errorf("ingester stopping")
}
// getOrCreateSeries copies the memory for `labels`.
state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels)
if err != nil {
if ve, ok := err.(*validationError); ok {
Expand Down
10 changes: 8 additions & 2 deletions pkg/ingester/user_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, erro
return state, ok, nil
}

// NOTE: memory for `labels` is unsafe; anything retained beyond the
// life of this function must be copied
func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []client.LabelAdapter) (*userState, model.Fingerprint, *memorySeries, error) {

state, ok := us.get(userID)
Expand Down Expand Up @@ -173,6 +175,8 @@ func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labe
return state, fp, series, err
}

// NOTE: memory for `metric` is unsafe; anything retained beyond the
// life of this function must be copied
func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeries, error) {
rawFP := client.FastFingerprint(metric)
u.fpLocker.Lock(rawFP)
Expand All @@ -198,6 +202,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
return fp, nil, makeLimitError(perUserSeriesLimit, err)
}

// MetricNameFromLabelAdapters returns a copy of the string in `metric`
metricName, err := extract.MetricNameFromLabelAdapters(metric)
if err != nil {
u.fpLocker.Unlock(fp)
Expand All @@ -208,13 +213,14 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
err = u.canAddSeriesFor(string(metricName))
if err != nil {
u.fpLocker.Unlock(fp)
return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err)
// note we copy `metric` as retaining a reference is unsafe.
return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabelsWithCopy(metric), err)
}

u.memSeriesCreatedTotal.Inc()
memSeries.Inc()

labels := u.index.Add(metric, fp)
labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained
series = newMemorySeries(labels)
u.fpToSeries.put(fp, series)

Expand Down