
Add metric and enhanced logging for query partial data #6676

Merged · 5 commits · Apr 3, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -18,6 +18,7 @@
* [ENHANCEMENT] Query Frontend: Add a `-frontend.enabled-ruler-query-stats` flag to configure whether to report the query stats log for queries coming from the Ruler. #6504
* [ENHANCEMENT] OTLP: Support otlp metadata ingestion. #6617
* [ENHANCEMENT] AlertManager: Add `keep_instance_in_the_ring_on_shutdown` and `tokens_file_path` configs for alertmanager ring. #6628
* [ENHANCEMENT] Querier: Add metric and enhanced logging for query partial data. #6676
* [BUGFIX] Ingester: Avoid error or early throttling when READONLY ingesters are present in the ring. #6517
* [BUGFIX] Ingester: Fix labelset data race condition. #6573
* [BUGFIX] Compactor: Cleaner should not put deletion marker for blocks with no-compact marker. #6576
6 changes: 6 additions & 0 deletions pkg/distributor/distributor.go
@@ -123,6 +123,7 @@ type Distributor struct {
ingesterAppendFailures *prometheus.CounterVec
ingesterQueries *prometheus.CounterVec
ingesterQueryFailures *prometheus.CounterVec
ingesterPartialDataQueries prometheus.Counter
replicationFactor prometheus.Gauge
latestSeenSampleTimestampPerUser *prometheus.GaugeVec

@@ -375,6 +376,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "distributor_ingester_query_failures_total",
Help: "The total number of failed queries sent to ingesters.",
}, []string{"ingester"}),
ingesterPartialDataQueries: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_ingester_partial_data_queries_total",
Help: "The total number of queries sent to ingesters that may have returned partial data.",
}),
replicationFactor: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Namespace: "cortex",
Name: "distributor_replication_factor",
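The new counter follows the standard client_golang promauto pattern. As a minimal, self-contained sketch of how the series comes to life and gets incremented (the registry, program shell, and testutil read-back are illustrative, not part of this PR):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewRegistry()

	// Same options as the field added to Distributor above; the exported
	// series is cortex_distributor_ingester_partial_data_queries_total.
	partialDataQueries := promauto.With(reg).NewCounter(prometheus.CounterOpts{
		Namespace: "cortex",
		Name:      "distributor_ingester_partial_data_queries_total",
		Help:      "The total number of queries sent to ingesters that may have returned partial data.",
	})

	// What queryIngesterStream does when a partial-data response is returned.
	partialDataQueries.Inc()

	fmt.Println(testutil.ToFloat64(partialDataQueries)) // 1
}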
3 changes: 2 additions & 1 deletion pkg/distributor/query.go
@@ -330,7 +330,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri
reqStats.AddFetchedSamples(uint64(resp.SamplesCount()))

if partialdata.IsPartialDataError(err) {
level.Info(d.log).Log("msg", "returning partial data")
level.Warn(d.log).Log("msg", "returning partial data", "err", err.Error())
d.ingesterPartialDataQueries.Inc()
return resp, err
}

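This branch depends on partialdata.IsPartialDataError still recognizing the sentinel after the per-instance errors are wrapped around it (see the replication_set.go change below). The partialdata package itself is not part of this diff; assuming it is a thin errors.Is wrapper — an assumption, not shown in the PR — it would look roughly like:

package partialdata

import "errors"

// ErrPartialData marks a response assembled from an incomplete replica set.
var ErrPartialData = errors.New("partial data")

// IsPartialDataError reports whether err is, or wraps, ErrPartialData.
// Sketch of the assumed implementation; the real package is not shown here.
func IsPartialDataError(err error) bool {
	return errors.Is(err, ErrPartialData)
}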
26 changes: 10 additions & 16 deletions pkg/ring/replication_set.go
@@ -2,6 +2,7 @@ package ring

import (
"context"
"fmt"
"sort"
"time"

@@ -70,39 +71,32 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
}(i, &r.Instances[i])
}

trackerFailed := false
cnt := 0

track:
for !tracker.succeeded() {
for !tracker.succeeded() && !tracker.finished() {
select {
case res := <-ch:
tracker.done(res.instance, res.res, res.err)
if res.err != nil {
if tracker.failed() {
if !partialDataEnabled || tracker.failedInAllZones() {
return nil, res.err
}
trackerFailed = true
if tracker.failed() && (!partialDataEnabled || tracker.failedCompletely()) {
return nil, res.err
}

// force one of the delayed requests to start
if delay > 0 && r.MaxUnavailableZones == 0 {
forceStart <- struct{}{}
}
}
cnt++
if cnt == len(r.Instances) {
break track
}

case <-ctx.Done():
return nil, ctx.Err()
}
}

if partialDataEnabled && trackerFailed {
return tracker.getResults(), partialdata.ErrPartialData
if partialDataEnabled && tracker.failed() {
finalErr := partialdata.ErrPartialData
for _, partialErr := range tracker.getErrors() {
finalErr = fmt.Errorf("%w: %w", finalErr, partialErr)
}
return tracker.getResults(), finalErr
}

return tracker.getResults(), nil
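The finalErr loop above relies on fmt.Errorf accepting multiple %w verbs (Go 1.20+): each per-instance error is chained onto ErrPartialData while errors.Is still matches the sentinel. A standalone demonstration (sentinel and addresses invented for illustration):

package main

import (
	"errors"
	"fmt"
)

var errPartialData = errors.New("partial data")

func main() {
	finalErr := errPartialData
	for _, partialErr := range []error{
		errors.New("(10.0.0.1) connection refused"),
		errors.New("(10.0.0.2) context deadline exceeded"),
	} {
		// Go 1.20+: both %w operands stay in the error chain, so the
		// sentinel remains detectable after any number of wraps.
		finalErr = fmt.Errorf("%w: %w", finalErr, partialErr)
	}

	fmt.Println(errors.Is(finalErr, errPartialData))
	// true
	fmt.Println(finalErr)
	// partial data: (10.0.0.1) connection refused: (10.0.0.2) context deadline exceeded
}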
9 changes: 7 additions & 2 deletions pkg/ring/replication_set_test.go
@@ -124,6 +124,7 @@ func TestReplicationSet_Do(t *testing.T) {
expectedError error
zoneResultsQuorum bool
queryPartialData bool
errStrContains []string
}{
{
name: "max errors = 0, no errors no delay",
@@ -196,12 +197,13 @@
},
{
name: "with partial data enabled and max unavailable zones = 1, should succeed on instances failing in 2 out of 3 zones (3 instances)",
instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}},
instances: []InstanceDesc{{Addr: "10.0.0.1", Zone: "zone1"}, {Addr: "10.0.0.2", Zone: "zone2"}, {Addr: "10.0.0.3", Zone: "zone3"}},
f: failingFunctionOnZones("zone1", "zone2"),
maxUnavailableZones: 1,
queryPartialData: true,
want: []interface{}{1},
expectedError: partialdata.ErrPartialData,
errStrContains: []string{"10.0.0.1", "10.0.0.2", "zone failed"},
},
{
name: "with partial data enabled, should fail on instances failing in all zones",
@@ -264,7 +266,10 @@
}
got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.queryPartialData, tt.f)
if tt.expectedError != nil {
assert.Equal(t, tt.expectedError, err)
assert.ErrorIs(t, err, tt.expectedError)
for _, str := range tt.errStrContains {
assert.ErrorContains(t, err, str)
}
} else {
assert.NoError(t, err)
}
46 changes: 40 additions & 6 deletions pkg/ring/replication_set_tracker.go
@@ -1,22 +1,30 @@
package ring

import "fmt"

type replicationSetResultTracker interface {
// Signals an instance has done the execution, either successful (no error)
// or failed (with error). If successful, result will be recorded and can
// be accessed via getResults.
done(instance *InstanceDesc, result interface{}, err error)

// Returns true if all instances are done executing
finished() bool

// Returns true if the minimum number of successful results have been received.
succeeded() bool

// Returns true if the maximum number of failed executions have been reached.
failed() bool

// Returns true if executions failed in all zones. Only relevant for zoneAwareResultTracker.
failedInAllZones() bool
// Returns true if executions failed in all instances or all zones.
failedCompletely() bool

// Returns recorded results.
getResults() []interface{}

// Returns recorded errors.
getErrors() []error
}

type defaultResultTracker struct {
@@ -25,6 +33,8 @@ type defaultResultTracker struct {
numErrors int
maxErrors int
results []interface{}
numInstances int
errors []error
}

func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker {
@@ -33,19 +43,26 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe
numSucceeded: 0,
numErrors: 0,
maxErrors: maxErrors,
errors: make([]error, 0, len(instances)),
results: make([]interface{}, 0, len(instances)),
numInstances: len(instances),
}
}

func (t *defaultResultTracker) done(_ *InstanceDesc, result interface{}, err error) {
func (t *defaultResultTracker) done(instance *InstanceDesc, result interface{}, err error) {
if err == nil {
t.numSucceeded++
t.results = append(t.results, result)
} else {
t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err))
t.numErrors++
}
}

func (t *defaultResultTracker) finished() bool {
return t.numSucceeded+t.numErrors == t.numInstances
}

func (t *defaultResultTracker) succeeded() bool {
return t.numSucceeded >= t.minSucceeded
}
@@ -54,14 +71,18 @@ func (t *defaultResultTracker) failed() bool {
return t.numErrors > t.maxErrors
}

func (t *defaultResultTracker) failedInAllZones() bool {
return false
func (t *defaultResultTracker) failedCompletely() bool {
return t.numInstances == t.numErrors
}

func (t *defaultResultTracker) getResults() []interface{} {
return t.results
}

func (t *defaultResultTracker) getErrors() []error {
return t.errors
}

// zoneAwareResultTracker tracks the results per zone.
// All instances in a zone must succeed in order for the zone to succeed.
type zoneAwareResultTracker struct {
@@ -73,6 +94,8 @@ type zoneAwareResultTracker struct {
numInstances int
zoneResultsQuorum bool
zoneCount int
doneCount int
errors []error
}

func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int, zoneResultsQuorum bool) *zoneAwareResultTracker {
@@ -82,6 +105,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int
maxUnavailableZones: maxUnavailableZones,
numInstances: len(instances),
zoneResultsQuorum: zoneResultsQuorum,
errors: make([]error, 0, len(instances)),
}

for _, instance := range instances {
@@ -97,6 +121,7 @@ func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int
func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) {
if err != nil {
t.failuresByZone[instance.Zone]++
t.errors = append(t.errors, fmt.Errorf("(%s) %w", instance.GetAddr(), err))
} else {
if _, ok := t.resultsPerZone[instance.Zone]; !ok {
// If it is the first result in the zone, then total number of instances
@@ -107,6 +132,11 @@ func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}
}

t.waitingByZone[instance.Zone]--
t.doneCount++
}

func (t *zoneAwareResultTracker) finished() bool {
return t.doneCount == t.numInstances
}

func (t *zoneAwareResultTracker) succeeded() bool {
@@ -128,7 +158,7 @@ func (t *zoneAwareResultTracker) failed() bool {
return failedZones > t.maxUnavailableZones
}

func (t *zoneAwareResultTracker) failedInAllZones() bool {
func (t *zoneAwareResultTracker) failedCompletely() bool {
failedZones := len(t.failuresByZone)
return failedZones == t.zoneCount
}
@@ -150,3 +180,7 @@ func (t *zoneAwareResultTracker) getResults() []interface{} {
}
return results
}

func (t *zoneAwareResultTracker) getErrors() []error {
return t.errors
}
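To keep the three predicates straight: failed() means the error budget is exhausted (quorum lost), failedCompletely() means no instance or zone returned anything usable, and the new finished() means every instance has reported in — which is what lets Do exit its loop without the old cnt/break track bookkeeping. A toy sketch of the default tracker's arithmetic (illustration only, not the production type):

package main

import "fmt"

// toyTracker mirrors defaultResultTracker's counters for illustration.
type toyTracker struct {
	numInstances, numSucceeded, numErrors, maxErrors int
}

func (t *toyTracker) failed() bool           { return t.numErrors > t.maxErrors }
func (t *toyTracker) failedCompletely() bool { return t.numErrors == t.numInstances }
func (t *toyTracker) finished() bool         { return t.numSucceeded+t.numErrors == t.numInstances }

func main() {
	t := &toyTracker{numInstances: 3, maxErrors: 1}

	t.numErrors = 2    // two instances errored: quorum lost...
	t.numSucceeded = 1 // ...but one instance still answered

	fmt.Println(t.failed(), t.failedCompletely(), t.finished())
	// true false true: exactly the state in which partial data is returned.
}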
77 changes: 73 additions & 4 deletions pkg/ring/replication_set_tracker_test.go
@@ -2,6 +2,7 @@ package ring

import (
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
@@ -154,6 +155,50 @@ func TestDefaultResultTracker(t *testing.T) {
assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults())
},
},
"failedCompletely() should return true only if all instances have failed, regardless of max errors": {
instances: []InstanceDesc{instance1, instance2, instance3},
maxErrors: 1,
run: func(t *testing.T, tracker *defaultResultTracker) {
tracker.done(&instance1, nil, errors.New("test"))
assert.False(t, tracker.succeeded())
assert.False(t, tracker.failed())
assert.False(t, tracker.failedCompletely())

tracker.done(&instance2, nil, errors.New("test"))
assert.False(t, tracker.succeeded())
assert.True(t, tracker.failed())
assert.False(t, tracker.failedCompletely())

tracker.done(&instance3, nil, errors.New("test"))
assert.False(t, tracker.succeeded())
assert.True(t, tracker.failed())
assert.True(t, tracker.failedCompletely())
},
},
"finished() should return true only if all instances are done": {
instances: []InstanceDesc{instance1, instance2},
maxErrors: 1,
run: func(t *testing.T, tracker *defaultResultTracker) {
tracker.done(&instance1, nil, errors.New("test"))
assert.False(t, tracker.finished())

tracker.done(&instance2, nil, errors.New("test"))
assert.True(t, tracker.finished())
},
},
"getErrors() should return list of all errors": {
instances: []InstanceDesc{instance1, instance2},
maxErrors: 1,
run: func(t *testing.T, tracker *defaultResultTracker) {
tracker.done(&instance1, nil, errors.New("test1"))
err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1"))
assert.ElementsMatch(t, []error{err1}, tracker.getErrors())

tracker.done(&instance2, nil, errors.New("test2"))
err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2"))
assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors())
},
},
}

for testName, testCase := range tests {
@@ -399,27 +444,51 @@ func TestZoneAwareResultTracker(t *testing.T) {
assert.False(t, tracker.failed())
},
},
"failInAllZones should return true only if all zones have failed, regardless of max unavailable zones": {
"failedCompletely() should return true only if all zones have failed, regardless of max unavailable zones": {
instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6},
maxUnavailableZones: 1,
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
// Zone-a
tracker.done(&instance1, nil, errors.New("test"))
assert.False(t, tracker.succeeded())
assert.False(t, tracker.failed())
assert.False(t, tracker.failedInAllZones())
assert.False(t, tracker.failedCompletely())

// Zone-b
tracker.done(&instance3, nil, errors.New("test"))
assert.False(t, tracker.succeeded())
assert.True(t, tracker.failed())
assert.False(t, tracker.failedInAllZones())
assert.False(t, tracker.failedCompletely())

// Zone-c
tracker.done(&instance5, nil, errors.New("test"))
assert.False(t, tracker.succeeded())
assert.True(t, tracker.failed())
assert.True(t, tracker.failedInAllZones())
assert.True(t, tracker.failedCompletely())
},
},
"finished() should return true only if all instances are done": {
instances: []InstanceDesc{instance1, instance2},
maxUnavailableZones: 1,
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
tracker.done(&instance1, nil, errors.New("test"))
assert.False(t, tracker.finished())

tracker.done(&instance2, nil, errors.New("test"))
assert.True(t, tracker.finished())
},
},
"getErrors() should return list of all errors": {
instances: []InstanceDesc{instance1, instance2},
maxUnavailableZones: 1,
run: func(t *testing.T, tracker *zoneAwareResultTracker) {
tracker.done(&instance1, nil, errors.New("test1"))
err1 := fmt.Errorf("(%s) %w", instance1.GetAddr(), errors.New("test1"))
assert.ElementsMatch(t, []error{err1}, tracker.getErrors())

tracker.done(&instance2, nil, errors.New("test2"))
err2 := fmt.Errorf("(%s) %w", instance2.GetAddr(), errors.New("test2"))
assert.ElementsMatch(t, []error{err1, err2}, tracker.getErrors())
},
},
}