diff --git a/pkg/ingester/errors.go b/pkg/ingester/errors.go new file mode 100644 index 00000000000..6cc00b978e6 --- /dev/null +++ b/pkg/ingester/errors.go @@ -0,0 +1,68 @@ +package ingester + +import ( + "fmt" + "net/http" + + "github.com/prometheus/prometheus/pkg/labels" + "github.com/weaveworks/common/httpgrpc" +) + +type validationError struct { + err error // underlying error + errorType string + code int + noReport bool // if true, error will be counted but not reported to caller + labels labels.Labels +} + +func makeLimitError(errorType string, err error) error { + return &validationError{ + errorType: errorType, + err: err, + code: http.StatusTooManyRequests, + } +} + +func makeNoReportError(errorType string) error { + return &validationError{ + errorType: errorType, + noReport: true, + } +} + +func makeMetricValidationError(errorType string, labels labels.Labels, err error) error { + return &validationError{ + errorType: errorType, + err: err, + code: http.StatusBadRequest, + labels: labels, + } +} + +func makeMetricLimitError(errorType string, labels labels.Labels, err error) error { + return &validationError{ + errorType: errorType, + err: err, + code: http.StatusTooManyRequests, + labels: labels, + } +} + +func (e *validationError) Error() string { + if e.err == nil { + return e.errorType + } + if e.labels == nil { + return e.err.Error() + } + return fmt.Sprintf("%s for series %s", e.err.Error(), e.labels.String()) +} + +// WrappedError returns an HTTP gRPC error that is correctly forwarded over gRPC. +func (e *validationError) WrappedError() error { + return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{ + Code: int32(e.code), + Body: []byte(e.Error()), + }) +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 2cbb5899d75..ba8f4597f46 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -293,7 +293,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. 
if err != nil { return nil, fmt.Errorf("no user id") } - var lastPartialErr error + var lastPartialErr *validationError for _, ts := range req.Timeseries { for _, s := range ts.Samples { @@ -303,12 +303,9 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. } i.metrics.ingestedSamplesFail.Inc() - if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok { - switch httpResp.Code { - case http.StatusBadRequest, http.StatusTooManyRequests: - lastPartialErr = err - continue - } + if ve, ok := err.(*validationError); ok { + lastPartialErr = ve + continue } return nil, err @@ -316,7 +313,10 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. } client.ReuseSlice(req.Timeseries) - return &client.WriteResponse{}, lastPartialErr + if lastPartialErr != nil { + return &client.WriteResponse{}, lastPartialErr.WrappedError() + } + return &client.WriteResponse{}, nil } func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error { @@ -338,6 +338,9 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, } state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels) if err != nil { + if ve, ok := err.(*validationError); ok { + state.discardedSamples.WithLabelValues(ve.errorType).Inc() + } state = nil // don't want to unlock the fp if there is an error return err } @@ -357,13 +360,11 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, Value: value, Timestamp: timestamp, }); err != nil { - if mse, ok := err.(*memorySeriesError); ok { - state.discardedSamples.WithLabelValues(mse.errorType).Inc() - if mse.noReport { + if ve, ok := err.(*validationError); ok { + state.discardedSamples.WithLabelValues(ve.errorType).Inc() + if ve.noReport { return nil } - // Use a dumb string template to avoid the message being parsed as a template - err = 
httpgrpc.Errorf(http.StatusBadRequest, "%s", mse.message) } return err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 02bff771827..f9574405bd8 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -317,16 +317,16 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) { // Earlier sample than previous one. err = ing.append(ctx, userID, m, 0, 0, client.API) require.Contains(t, err.Error(), "sample timestamp out of order") - errResp, ok := httpgrpc.HTTPResponseFromError(err) + errResp, ok := err.(*validationError) require.True(t, ok) - require.Equal(t, errResp.Code, int32(400)) + require.Equal(t, errResp.code, 400) // Same timestamp as previous sample, but different value. err = ing.append(ctx, userID, m, 1, 1, client.API) require.Contains(t, err.Error(), "sample with repeated timestamp but different value") - errResp, ok = httpgrpc.HTTPResponseFromError(err) + errResp, ok = err.(*validationError) require.True(t, ok) - require.Equal(t, errResp.Code, int32(400)) + require.Equal(t, errResp.code, 400) } // Test that blank labels are removed by the ingester @@ -523,9 +523,19 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) { } func BenchmarkIngesterPush(b *testing.B) { + limits := defaultLimitsTestConfig() + benchmarkIngesterPush(b, limits, false) +} + +func BenchmarkIngesterPushErrors(b *testing.B) { + limits := defaultLimitsTestConfig() + limits.MaxLocalSeriesPerMetric = 1 + benchmarkIngesterPush(b, limits, true) +} + +func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) { cfg := defaultIngesterTestConfig() clientCfg := defaultClientTestConfig() - limits := defaultLimitsTestConfig() const ( series = 100 @@ -567,7 +577,9 @@ func BenchmarkIngesterPush(b *testing.B) { allSamples[i].TimestampMs = int64(j + 1) } _, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API)) - require.NoError(b, err) + if !errorsExpected { 
+ require.NoError(b, err) + } } ing.Shutdown() } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index 5eebd7d0cca..b86dee3d13d 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -111,7 +111,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien // 400 error to the client. The client (Prometheus) will not retry on 400, and // we actually ingested all samples which haven't failed. if err == tsdb.ErrOutOfBounds || err == tsdb.ErrOutOfOrderSample || err == tsdb.ErrAmendSample { - lastPartialErr = httpgrpc.Errorf(http.StatusBadRequest, err.Error()) + lastPartialErr = err continue } @@ -135,7 +135,10 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien client.ReuseSlice(req.Timeseries) - return &client.WriteResponse{}, lastPartialErr + if lastPartialErr != nil { + return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, "%s", lastPartialErr.Error()) + } + return &client.WriteResponse{}, nil } func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*client.QueryResponse, error) { diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go index ddbc470fb02..7ce8f42f8bc 100644 --- a/pkg/ingester/series.go +++ b/pkg/ingester/series.go @@ -41,19 +41,6 @@ type memorySeries struct { lastSampleValue model.SampleValue } -type memorySeriesError struct { - message string - errorType string - noReport bool // if true, error will be counted but not reported to caller -} - -func (error *memorySeriesError) Error() string { - if error.message == "" { - return error.errorType - } - return error.message -} - // newMemorySeries returns a pointer to a newly allocated memorySeries for the // given metric. func newMemorySeries(m labels.Labels) *memorySeries { @@ -71,24 +58,20 @@ func (s *memorySeries) add(v model.SamplePair) error { // If we don't know what the last sample value is, silently discard. 
// This will mask some errors but better than complaining when we don't really know. if !s.lastSampleValueSet { - return &memorySeriesError{errorType: "duplicate-timestamp", noReport: true} + return makeNoReportError("duplicate-timestamp") } // If both timestamp and sample value are the same as for the last append, // ignore as they are a common occurrence when using client-side timestamps // (e.g. Pushgateway or federation). if v.Value.Equal(s.lastSampleValue) { - return &memorySeriesError{errorType: "duplicate-sample", noReport: true} - } - return &memorySeriesError{ - message: fmt.Sprintf("sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value), - errorType: "new-value-for-timestamp", + return makeNoReportError("duplicate-sample") } + return makeMetricValidationError("new-value-for-timestamp", s.metric, + fmt.Errorf("sample with repeated timestamp but different value; last value: %v, incoming value: %v", s.lastSampleValue, v.Value)) } if v.Timestamp < s.lastTime { - return &memorySeriesError{ - message: fmt.Sprintf("sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp), - errorType: "sample-out-of-order", - } + return makeMetricValidationError("sample-out-of-order", s.metric, + fmt.Errorf("sample timestamp out of order; last timestamp: %v, incoming timestamp: %v", s.lastTime, v.Timestamp)) } if len(s.chunkDescs) == 0 || s.headChunkClosed { diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go index e27d68411fd..0999564ac5a 100644 --- a/pkg/ingester/user_state.go +++ b/pkg/ingester/user_state.go @@ -195,8 +195,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri err := u.limiter.AssertMaxSeriesPerUser(u.userID, u.fpToSeries.length()) if err != nil { u.fpLocker.Unlock(fp) - u.discardedSamples.WithLabelValues(perUserSeriesLimit).Inc() - return fp, nil, 
httpgrpc.Errorf(http.StatusTooManyRequests, err.Error()) + return fp, nil, makeLimitError(perUserSeriesLimit, err) } metricName, err := extract.MetricNameFromLabelAdapters(metric) @@ -209,8 +208,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri err = u.canAddSeriesFor(string(metricName)) if err != nil { u.fpLocker.Unlock(fp) - u.discardedSamples.WithLabelValues(perMetricSeriesLimit).Inc() - return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s for: %s", err.Error(), metric) + return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err) } u.memSeriesCreatedTotal.Inc()