Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func TestPush_QuorumError(t *testing.T) {
_, err := d.Push(ctx, request)
status, ok := status.FromError(err)
require.True(t, ok)
require.True(t, status.Code() == 429 || status.Code() == 500)
require.Equal(t, codes.Code(429), status.Code())
}

// Simulating 1 error -> Should return 2xx
Expand Down
18 changes: 14 additions & 4 deletions pkg/ring/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,29 @@ type itemTracker struct {
failed4xx atomic.Int32
failed5xx atomic.Int32
remaining atomic.Int32
err atomic.Error
err4xx atomic.Error
err5xx atomic.Error
}

func (i *itemTracker) recordError(err error) int32 {
i.err.Store(err)

if status, ok := status.FromError(err); ok && status.Code()/100 == 4 {
i.err4xx.Store(err)
return i.failed4xx.Inc()
}

i.err5xx.Store(err)
return i.failed5xx.Inc()
}

func (i *itemTracker) getError() error {
if i.failed5xx.Load() > i.failed4xx.Load() {
return i.err5xx.Load()
}

return i.err4xx.Load()
}

// DoBatch request against a set of keys in the ring, handling replication and
// failures. For example if we want to write N items where they may all
// hit different instances, and we want them all replicated R ways with
Expand Down Expand Up @@ -147,7 +157,7 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
// Ex: 5xx, _, 5xx -> return 5xx
if errCount > int32(sampleTrackers[i].maxFailures) || sampleTrackers[i].remaining.Dec() == 0 {
if b.rpcsFailed.Inc() == 1 {
b.err <- err
b.err <- sampleTrackers[i].getError()
}
}
} else {
Expand All @@ -165,7 +175,7 @@ func (b *batchTracker) record(sampleTrackers []*itemTracker, err error) {
// Ex: 4xx, 5xx, 2xx
if sampleTrackers[i].remaining.Dec() == 0 {
if b.rpcsFailed.Inc() == 1 {
b.err <- sampleTrackers[i].err.Load()
b.err <- sampleTrackers[i].getError()
}
}
}
Expand Down