Skip to content

Commit 0068e73

Browse files
committed
address comment; handle histogram partial append errors
Signed-off-by: Ben Ye <[email protected]>
1 parent ca0ba27 commit 0068e73

File tree

1 file changed

+61
-83
lines changed

1 file changed

+61
-83
lines changed

pkg/ingester/ingester.go

Lines changed: 61 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1082,6 +1082,65 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
10821082
firstPartialErr = errFn()
10831083
}
10841084
}
1085+
1086+
// handleAppendFailure classifies an error returned by an append call and
// reports whether the caller should roll back.
//
// For each recognised soft error it passes a lazily-built error to
// updateFirstPartial and, where a counter exists for that cause, increments
// it; in those cases rollback is left false. Any error that matches none of
// the cases sets rollback to true.
//
// err is the append error; its errors.Cause is what the cases match against.
// timestampMs and lbls are only used to build the wrapped TSDB ingest error.
// copiedLabels is only used to build the per-metric / per-labelset limit errors.
handleAppendFailure = func(err error, timestampMs int64, lbls []cortexpb.LabelAdapter, copiedLabels labels.Labels) (rollback bool) {
1087+
// Check if the error is a soft error we can proceed on. If so, we keep track
1088+
// of it, so that we can return it back to the distributor, which will return a
1089+
// 400 error to the client. The client (Prometheus) will not retry on 400, and
1090+
// we actually ingested all samples which haven't failed.
1091+
switch cause := errors.Cause(err); {
1092+
case errors.Is(cause, storage.ErrOutOfBounds):
1093+
sampleOutOfBoundsCount++
1094+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1095+
1096+
case errors.Is(cause, storage.ErrOutOfOrderSample):
1097+
sampleOutOfOrderCount++
1098+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1099+
1100+
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
1101+
newValueForTimestampCount++
1102+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1103+
1104+
case errors.Is(cause, storage.ErrTooOldSample):
1105+
sampleTooOldCount++
1106+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1107+
1108+
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
1109+
perUserSeriesLimitCount++
1110+
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
1111+
1112+
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
1113+
perMetricSeriesLimitCount++
1114+
updateFirstPartial(func() error {
1115+
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1116+
})
1117+
1118+
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
1119+
perLabelSetSeriesLimitCount++
1120+
updateFirstPartial(func() error {
1121+
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1122+
})
1123+
1124+
// The histogram validation errors below are reported as partial errors
// like the sample errors above, but no counter is incremented for them
// in this closure.
case errors.Is(cause, histogram.ErrHistogramSpanNegativeOffset):
1125+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1126+
1127+
case errors.Is(cause, histogram.ErrHistogramSpansBucketsMismatch):
1128+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1129+
1130+
case errors.Is(cause, histogram.ErrHistogramNegativeBucketCount):
1131+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1132+
1133+
case errors.Is(cause, histogram.ErrHistogramCountNotBigEnough):
1134+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1135+
1136+
case errors.Is(cause, histogram.ErrHistogramCountMismatch):
1137+
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(timestampMs), lbls) })
1138+
1139+
default:
1140+
// No soft-error case matched: ask the caller to roll back.
rollback = true
1141+
}
1142+
// Naked return of the named result: false unless the default case ran.
return
1143+
}
10851144
)
10861145

10871146
// Walk the samples, appending them to the users database
@@ -1121,50 +1180,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
11211180

11221181
failedSamplesCount++
11231182

1124-
// Check if the error is a soft error we can proceed on. If so, we keep track
1125-
// of it, so that we can return it back to the distributor, which will return a
1126-
// 400 error to the client. The client (Prometheus) will not retry on 400, and
1127-
// we actually ingested all samples which haven't failed.
1128-
switch cause := errors.Cause(err); {
1129-
case errors.Is(cause, storage.ErrOutOfBounds):
1130-
sampleOutOfBoundsCount++
1131-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1132-
continue
1133-
1134-
case errors.Is(cause, storage.ErrOutOfOrderSample):
1135-
sampleOutOfOrderCount++
1136-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1137-
continue
1138-
1139-
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
1140-
newValueForTimestampCount++
1141-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1142-
continue
1143-
1144-
case errors.Is(cause, storage.ErrTooOldSample):
1145-
sampleTooOldCount++
1146-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(s.TimestampMs), ts.Labels) })
1147-
continue
1148-
1149-
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
1150-
perUserSeriesLimitCount++
1151-
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
1152-
continue
1153-
1154-
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
1155-
perMetricSeriesLimitCount++
1156-
updateFirstPartial(func() error {
1157-
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1158-
})
1159-
continue
1160-
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
1161-
perLabelSetSeriesLimitCount++
1162-
updateFirstPartial(func() error {
1163-
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1164-
})
1183+
if rollback := handleAppendFailure(err, s.TimestampMs, ts.Labels, copiedLabels); !rollback {
11651184
continue
11661185
}
1167-
11681186
// The error looks like an issue on our side, so we should rollback
11691187
if rollbackErr := app.Rollback(); rollbackErr != nil {
11701188
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
@@ -1203,49 +1221,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
12031221

12041222
failedSamplesCount++
12051223

1206-
// Check if the error is a soft error we can proceed on. If so, we keep track
1207-
// of it, so that we can return it back to the distributor, which will return a
1208-
// 400 error to the client. The client (Prometheus) will not retry on 400, and
1209-
// we actually ingested all samples which haven't failed.
1210-
switch cause := errors.Cause(err); {
1211-
case errors.Is(cause, storage.ErrOutOfBounds):
1212-
sampleOutOfBoundsCount++
1213-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1214-
continue
1215-
1216-
case errors.Is(cause, storage.ErrOutOfOrderSample):
1217-
sampleOutOfOrderCount++
1218-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1219-
continue
1220-
1221-
case errors.Is(cause, storage.ErrDuplicateSampleForTimestamp):
1222-
newValueForTimestampCount++
1223-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1224-
continue
1225-
1226-
case errors.Is(cause, storage.ErrTooOldSample):
1227-
sampleTooOldCount++
1228-
updateFirstPartial(func() error { return wrappedTSDBIngestErr(err, model.Time(hp.TimestampMs), ts.Labels) })
1229-
continue
1230-
1231-
case errors.Is(cause, errMaxSeriesPerUserLimitExceeded):
1232-
perUserSeriesLimitCount++
1233-
updateFirstPartial(func() error { return makeLimitError(perUserSeriesLimit, i.limiter.FormatError(userID, cause)) })
1234-
continue
1235-
1236-
case errors.Is(cause, errMaxSeriesPerMetricLimitExceeded):
1237-
perMetricSeriesLimitCount++
1238-
updateFirstPartial(func() error {
1239-
return makeMetricLimitError(perMetricSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1240-
})
1241-
continue
1242-
case errors.As(cause, &errMaxSeriesPerLabelSetLimitExceeded{}):
1243-
updateFirstPartial(func() error {
1244-
return makeMetricLimitError(perLabelsetSeriesLimit, copiedLabels, i.limiter.FormatError(userID, cause))
1245-
})
1224+
if rollback := handleAppendFailure(err, hp.TimestampMs, ts.Labels, copiedLabels); !rollback {
12461225
continue
12471226
}
1248-
12491227
// The error looks like an issue on our side, so we should rollback
12501228
if rollbackErr := app.Rollback(); rollbackErr != nil {
12511229
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)

0 commit comments

Comments
 (0)