From d5da4730b16fde432b0823f876a234ebec6a304b Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Wed, 18 Dec 2019 09:49:40 +0000
Subject: [PATCH 1/3] Benchmark ingester.Push error case

Signed-off-by: Bryan Boreham
---
 pkg/ingester/ingester_test.go | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 02bff771827..bdf2c0f99b9 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -523,9 +523,19 @@ func benchmarkIngesterSeriesCreationLocking(b *testing.B, parallelism int) {
 }
 
 func BenchmarkIngesterPush(b *testing.B) {
+	limits := defaultLimitsTestConfig()
+	benchmarkIngesterPush(b, limits, false)
+}
+
+func BenchmarkIngesterPushErrors(b *testing.B) {
+	limits := defaultLimitsTestConfig()
+	limits.MaxLocalSeriesPerMetric = 1
+	benchmarkIngesterPush(b, limits, true)
+}
+
+func benchmarkIngesterPush(b *testing.B, limits validation.Limits, errorsExpected bool) {
 	cfg := defaultIngesterTestConfig()
 	clientCfg := defaultClientTestConfig()
-	limits := defaultLimitsTestConfig()
 
 	const (
 		series  = 100
@@ -567,7 +577,9 @@ func BenchmarkIngesterPush(b *testing.B) {
 				allSamples[i].TimestampMs = int64(j + 1)
 			}
 			_, err := ing.Push(ctx, client.ToWriteRequest(allLabels, allSamples, client.API))
-			require.NoError(b, err)
+			if !errorsExpected {
+				require.NoError(b, err)
+			}
 		}
 		ing.Shutdown()
 	}

From bac85f86948fcc6c2539e6fa775fa9b4c41f2c94 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Wed, 18 Dec 2019 12:47:43 +0000
Subject: [PATCH 2/3] Refactor: move memorySeriesError to more generic
 validationError

Signed-off-by: Bryan Boreham
---
 pkg/ingester/errors.go | 68 ++++++++++++++++++++++++++++++++++++++++++
 pkg/ingester/series.go | 29 ++++++++----------
 2 files changed, 74 insertions(+), 23 deletions(-)
 create mode 100644 pkg/ingester/errors.go

diff --git a/pkg/ingester/errors.go b/pkg/ingester/errors.go
new file mode 100644
index 00000000000..6cc00b978e6
--- /dev/null
+++ b/pkg/ingester/errors.go
@@ -0,0 +1,68 @@
+package ingester
+
+import (
+	"fmt"
+	"net/http"
+
+	"github.com/prometheus/prometheus/pkg/labels"
+	"github.com/weaveworks/common/httpgrpc"
+)
+
+type validationError struct {
+	err       error // underlying error
+	errorType string
+	code      int
+	noReport  bool // if true, error will be counted but not reported to caller
+	labels    labels.Labels
+}
+
+func makeLimitError(errorType string, err error) error {
+	return &validationError{
+		errorType: errorType,
+		err:       err,
+		code:      http.StatusTooManyRequests,
+	}
+}
+
+func makeNoReportError(errorType string) error {
+	return &validationError{
+		errorType: errorType,
+		noReport:  true,
+	}
+}
+
+func makeMetricValidationError(errorType string, labels labels.Labels, err error) error {
+	return &validationError{
+		errorType: errorType,
+		err:       err,
+		code:      http.StatusBadRequest,
+		labels:    labels,
+	}
+}
+
+func makeMetricLimitError(errorType string, labels labels.Labels, err error) error {
+	return &validationError{
+		errorType: errorType,
+		err:       err,
+		code:      http.StatusTooManyRequests,
+		labels:    labels,
+	}
+}
+
+func (e *validationError) Error() string {
+	if e.err == nil {
+		return e.errorType
+	}
+	if e.labels == nil {
+		return e.err.Error()
+	}
+	return fmt.Sprintf("%s for series %s", e.err.Error(), e.labels.String())
+}
+
+// WrappedError returns an HTTP gRPC error that is correctly forwarded over gRPC.
+func (e *validationError) WrappedError() error {
+	return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
+		Code: int32(e.code),
+		Body: []byte(e.Error()),
+	})
+}
diff --git a/pkg/ingester/series.go b/pkg/ingester/series.go
index ddbc470fb02..7ce8f42f8bc 100644
--- a/pkg/ingester/series.go
+++ b/pkg/ingester/series.go
@@ -41,19 +41,6 @@ type memorySeries struct {
 	lastSampleValue model.SampleValue
 }
 
-type memorySeriesError struct {
-	message   string
-	errorType string
-	noReport  bool // if true, error will be counted but not reported to caller
-}
-
-func (error *memorySeriesError) Error() string {
-	if error.message == "" {
-		return error.errorType
-	}
-	return error.message
-}
-
 // newMemorySeries returns a pointer to a newly allocated memorySeries for the
 // given metric.
 func newMemorySeries(m labels.Labels) *memorySeries {
@@ -71,24 +58,20 @@ func (s *memorySeries) add(v model.SamplePair) error {
 		// If we don't know what the last sample value is, silently discard.
 		// This will mask some errors but better than complaining when we don't really know.
 		if !s.lastSampleValueSet {
-			return &memorySeriesError{errorType: "duplicate-timestamp", noReport: true}
+			return makeNoReportError("duplicate-timestamp")
 		}
 		// If both timestamp and sample value are the same as for the last append,
 		// ignore as they are a common occurrence when using client-side timestamps
 		// (e.g. Pushgateway or federation).
 		if v.Value.Equal(s.lastSampleValue) {
-			return &memorySeriesError{errorType: "duplicate-sample", noReport: true}
-		}
-		return &memorySeriesError{
-			message:   fmt.Sprintf("sample with repeated timestamp but different value for series %v; last value: %v, incoming value: %v", s.metric, s.lastSampleValue, v.Value),
-			errorType: "new-value-for-timestamp",
+			return makeNoReportError("duplicate-sample")
 		}
+		return makeMetricValidationError("new-value-for-timestamp", s.metric,
+			fmt.Errorf("sample with repeated timestamp but different value; last value: %v, incoming value: %v", s.lastSampleValue, v.Value))
 	}
 	if v.Timestamp < s.lastTime {
-		return &memorySeriesError{
-			message:   fmt.Sprintf("sample timestamp out of order for series %v; last timestamp: %v, incoming timestamp: %v", s.metric, s.lastTime, v.Timestamp),
-			errorType: "sample-out-of-order",
-		}
+		return makeMetricValidationError("sample-out-of-order", s.metric,
+			fmt.Errorf("sample timestamp out of order; last timestamp: %v, incoming timestamp: %v", s.lastTime, v.Timestamp))
 	}
 
 	if len(s.chunkDescs) == 0 || s.headChunkClosed {

From d4cd56c12e6243e5636eb5312323180c1eaa6f06 Mon Sep 17 00:00:00 2001
From: Bryan Boreham
Date: Wed, 18 Dec 2019 12:48:43 +0000
Subject: [PATCH 3/3] Use validationError instead of httpgrpc error to reduce
 memory allocation

It's more efficient to defer constructing the httpgrpc error until the
end, because we only report the last error out of possibly hundreds.

Signed-off-by: Bryan Boreham
---
 pkg/ingester/ingester.go      | 27 ++++++++++++++-------------
 pkg/ingester/ingester_test.go |  8 ++++----
 pkg/ingester/ingester_v2.go   |  7 +++++--
 pkg/ingester/user_state.go    |  6 ++----
 4 files changed, 25 insertions(+), 23 deletions(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 2cbb5899d75..ba8f4597f46 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -293,7 +293,7 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 	if err != nil {
 		return nil, fmt.Errorf("no user id")
 	}
-	var lastPartialErr error
+	var lastPartialErr *validationError
 
 	for _, ts := range req.Timeseries {
 		for _, s := range ts.Samples {
@@ -303,12 +303,9 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 			}
 
 			i.metrics.ingestedSamplesFail.Inc()
-			if httpResp, ok := httpgrpc.HTTPResponseFromError(err); ok {
-				switch httpResp.Code {
-				case http.StatusBadRequest, http.StatusTooManyRequests:
-					lastPartialErr = err
-					continue
-				}
+			if ve, ok := err.(*validationError); ok {
+				lastPartialErr = ve
+				continue
 			}
 
 			return nil, err
@@ -316,7 +313,10 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
 	}
 
 	client.ReuseSlice(req.Timeseries)
-	return &client.WriteResponse{}, lastPartialErr
+	if lastPartialErr != nil {
+		return &client.WriteResponse{}, lastPartialErr.WrappedError()
+	}
+	return &client.WriteResponse{}, nil
 }
 
 func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs, timestamp model.Time, value model.SampleValue, source client.WriteRequest_SourceEnum) error {
@@ -338,6 +338,9 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
 	}
 	state, fp, series, err := i.userStates.getOrCreateSeries(ctx, userID, labels)
 	if err != nil {
+		if ve, ok := err.(*validationError); ok {
+			state.discardedSamples.WithLabelValues(ve.errorType).Inc()
+		}
 		state = nil // don't want to unlock the fp if there is an error
 		return err
 	}
@@ -357,13 +360,11 @@ func (i *Ingester) append(ctx context.Context, userID string, labels labelPairs,
 		Value:     value,
 		Timestamp: timestamp,
 	}); err != nil {
-		if mse, ok := err.(*memorySeriesError); ok {
-			state.discardedSamples.WithLabelValues(mse.errorType).Inc()
-			if mse.noReport {
+		if ve, ok := err.(*validationError); ok {
+			state.discardedSamples.WithLabelValues(ve.errorType).Inc()
+			if ve.noReport {
 				return nil
 			}
-			// Use a dumb string template to avoid the message being parsed as a template
-			err = httpgrpc.Errorf(http.StatusBadRequest, "%s", mse.message)
 		}
 		return err
 	}
diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index bdf2c0f99b9..f9574405bd8 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -317,16 +317,16 @@ func TestIngesterAppendOutOfOrderAndDuplicate(t *testing.T) {
 	// Earlier sample than previous one.
 	err = ing.append(ctx, userID, m, 0, 0, client.API)
 	require.Contains(t, err.Error(), "sample timestamp out of order")
-	errResp, ok := httpgrpc.HTTPResponseFromError(err)
+	errResp, ok := err.(*validationError)
 	require.True(t, ok)
-	require.Equal(t, errResp.Code, int32(400))
+	require.Equal(t, errResp.code, 400)
 
 	// Same timestamp as previous sample, but different value.
 	err = ing.append(ctx, userID, m, 1, 1, client.API)
 	require.Contains(t, err.Error(), "sample with repeated timestamp but different value")
-	errResp, ok = httpgrpc.HTTPResponseFromError(err)
+	errResp, ok = err.(*validationError)
 	require.True(t, ok)
-	require.Equal(t, errResp.Code, int32(400))
+	require.Equal(t, errResp.code, 400)
 }
 
 // Test that blank labels are removed by the ingester
diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go
index 5eebd7d0cca..b86dee3d13d 100644
--- a/pkg/ingester/ingester_v2.go
+++ b/pkg/ingester/ingester_v2.go
@@ -111,7 +111,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 			// 400 error to the client. The client (Prometheus) will not retry on 400, and
 			// we actually ingested all samples which haven't failed.
 			if err == tsdb.ErrOutOfBounds || err == tsdb.ErrOutOfOrderSample || err == tsdb.ErrAmendSample {
-				lastPartialErr = httpgrpc.Errorf(http.StatusBadRequest, err.Error())
+				lastPartialErr = err
 				continue
 			}
 
@@ -135,7 +135,10 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
 
 	client.ReuseSlice(req.Timeseries)
 
-	return &client.WriteResponse{}, lastPartialErr
+	if lastPartialErr != nil {
+		return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, lastPartialErr.Error())
+	}
+	return &client.WriteResponse{}, nil
 }
 
 func (i *Ingester) v2Query(ctx old_ctx.Context, req *client.QueryRequest) (*client.QueryResponse, error) {
diff --git a/pkg/ingester/user_state.go b/pkg/ingester/user_state.go
index e27d68411fd..0999564ac5a 100644
--- a/pkg/ingester/user_state.go
+++ b/pkg/ingester/user_state.go
@@ -195,8 +195,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
 	err := u.limiter.AssertMaxSeriesPerUser(u.userID, u.fpToSeries.length())
 	if err != nil {
 		u.fpLocker.Unlock(fp)
-		u.discardedSamples.WithLabelValues(perUserSeriesLimit).Inc()
-		return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, err.Error())
+		return fp, nil, makeLimitError(perUserSeriesLimit, err)
 	}
 
 	metricName, err := extract.MetricNameFromLabelAdapters(metric)
@@ -209,8 +208,7 @@ func (u *userState) getSeries(metric labelPairs) (model.Fingerprint, *memorySeri
 	err = u.canAddSeriesFor(string(metricName))
 	if err != nil {
 		u.fpLocker.Unlock(fp)
-		u.discardedSamples.WithLabelValues(perMetricSeriesLimit).Inc()
-		return fp, nil, httpgrpc.Errorf(http.StatusTooManyRequests, "%s for: %s", err.Error(), metric)
+		return fp, nil, makeMetricLimitError(perMetricSeriesLimit, client.FromLabelAdaptersToLabels(metric), err)
 	}
 
 	u.memSeriesCreatedTotal.Inc()
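
Note on the rationale in PATCH 3: the standalone sketch below (not part of the
series; the loop bound and error values are hypothetical) illustrates the
allocation pattern the commit message describes. Each rejected sample now costs
only a small *validationError allocation, while httpgrpc.ErrorFromHTTPResponse,
which builds an HTTPResponse and wraps it in a gRPC status, runs at most once
per Push request, on whichever partial error happens to be last.

package main

import (
	"fmt"
	"net/http"

	"github.com/weaveworks/common/httpgrpc"
)

// validationError mirrors a subset of the type added in pkg/ingester/errors.go.
type validationError struct {
	err       error
	errorType string
	code      int
}

func (e *validationError) Error() string {
	if e.err == nil {
		return e.errorType
	}
	return e.err.Error()
}

// WrappedError builds the httpgrpc error; with PATCH 3 applied, this is
// called once per request instead of once per failing sample.
func (e *validationError) WrappedError() error {
	return httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{
		Code: int32(e.code),
		Body: []byte(e.Error()),
	})
}

func main() {
	var lastPartialErr *validationError
	// Hypothetical stand-in for Push's per-sample loop: every rejected
	// sample allocates only this small struct, nothing gRPC-related.
	for i := 0; i < 500; i++ {
		lastPartialErr = &validationError{
			errorType: "sample-out-of-order",
			err:       fmt.Errorf("sample %d out of order", i),
			code:      http.StatusBadRequest,
		}
	}
	// The expensive wrapping happens exactly once, at the end.
	if lastPartialErr != nil {
		fmt.Println(lastPartialErr.WrappedError())
	}
}

The WrappedError call at the end of this sketch corresponds to the one added at
the end of Push in PATCH 3; v2Push achieves the same effect by calling
httpgrpc.Errorf once on the final lastPartialErr.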