Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* [ENHANCEMENT] Distributor: Add native histograms max sample size bytes limit validation. #6834
* [ENHANCEMENT] Querier: Support caching parquet labels file in parquet queryable. #6835
* [ENHANCEMENT] Querier: Support query limits in parquet queryable. #6870
* [ENHANCEMENT] Ingester: Add new metric `cortex_ingester_push_errors_total` to track reasons for ingester request failures. #6901
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
Expand Down
11 changes: 6 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,11 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
span, ctx := opentracing.StartSpanFromContext(ctx, "Ingester.Push")
defer span.Finish()

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

// We will report *this* request in the error too.
inflight := i.inflightPushRequests.Inc()
i.maxInflightPushRequests.Track(inflight)
Expand All @@ -1175,6 +1180,7 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
gl := i.getInstanceLimits()
if gl != nil && gl.MaxInflightPushRequests > 0 {
if inflight > gl.MaxInflightPushRequests {
i.metrics.pushErrorsTotal.WithLabelValues(userID, pushErrTooManyInflightRequests).Inc()
return nil, errTooManyInflightPushRequests
}
}
Expand All @@ -1186,11 +1192,6 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
defer req.Free()
defer cortexpb.ReuseSlice(req.Timeseries)

userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

il := i.getInstanceLimits()
if il != nil && il.MaxIngestionRate > 0 {
if rate := i.ingestionRate.Rate(); rate >= il.MaxIngestionRate {
Expand Down
8 changes: 7 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6515,7 +6515,8 @@ func TestIngester_inflightPushRequests(t *testing.T) {
cfg.InstanceLimitsFn = func() *InstanceLimits { return &limits }
cfg.LifecyclerConfig.JoinAfter = 0

i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
reg := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -6553,6 +6554,11 @@ func TestIngester_inflightPushRequests(t *testing.T) {

_, err := i.Push(ctx, req)
require.Equal(t, errTooManyInflightPushRequests, err)
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ingester_push_errors_total The total number of push errors per user.
# TYPE cortex_ingester_push_errors_total counter
cortex_ingester_push_errors_total{reason="tooManyInflightRequests",user="test"} 1
`), "cortex_ingester_push_errors_total"))
return nil
})

Expand Down
2 changes: 2 additions & 0 deletions pkg/ingester/instance_limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ var (
errMaxSeriesLimitReached = errors.New("cannot add series: ingesters's max series limit reached")
errTooManyInflightPushRequests = errors.New("cannot push: too many inflight push requests in ingester")
errTooManyInflightQueryRequests = errors.New("cannot push: too many inflight query requests in ingester")

pushErrTooManyInflightRequests = "tooManyInflightRequests"
)

// InstanceLimits describes limits used by ingester. Reaching any of these will result in error response to the call.
Expand Down
6 changes: 6 additions & 0 deletions pkg/ingester/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type ingesterMetrics struct {
memMetadataCreatedTotal *prometheus.CounterVec
memSeriesRemovedTotal *prometheus.CounterVec
memMetadataRemovedTotal *prometheus.CounterVec
pushErrorsTotal *prometheus.CounterVec

activeSeriesPerUser *prometheus.GaugeVec
activeNHSeriesPerUser *prometheus.GaugeVec
Expand Down Expand Up @@ -165,6 +166,10 @@ func newIngesterMetrics(r prometheus.Registerer,
Name: "cortex_ingester_memory_metadata_removed_total",
Help: "The total number of metadata that were removed per user.",
}, []string{"user"}),
pushErrorsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ingester_push_errors_total",
Help: "The total number of push errors per user.",
}, []string{"user", "reason"}),

maxUsersGauge: promauto.With(r).NewGaugeFunc(prometheus.GaugeOpts{
Name: instanceLimits,
Expand Down Expand Up @@ -295,6 +300,7 @@ func (m *ingesterMetrics) deletePerUserMetrics(userID string) {
m.activeNHSeriesPerUser.DeleteLabelValues(userID)
m.usagePerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})
m.limitsPerLabelSet.DeletePartialMatch(prometheus.Labels{"user": userID})
m.pushErrorsTotal.DeletePartialMatch(prometheus.Labels{"user": userID})

if m.memSeriesCreatedTotal != nil {
m.memSeriesCreatedTotal.DeleteLabelValues(userID)
Expand Down
Loading