From ba3a0f1b1f3cb083e10b6a7459d2707ceef41e42 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Tue, 20 Feb 2024 21:19:46 -0800
Subject: [PATCH 1/8] add zone-results-quorum for metadata APIs

Signed-off-by: Ben Ye
---
 pkg/distributor/distributor.go | 26 +++++++++++++++----------
 pkg/distributor/query.go       |  6 +++---
 pkg/ring/replication_set.go    | 35 ++++++++++++++++++++++++++++------
 3 files changed, 48 insertions(+), 19 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 4b4eea0bf9..a8083fbfd4 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -145,6 +145,11 @@ type Config struct {
 	// This config is dynamically injected because defined in the querier config.
 	ShuffleShardingLookbackPeriod time.Duration `yaml:"-"`
 
+	// ZoneResultsQuorumMetadata enables zone results quorum when querying ingester replication set
+	// with metadata APIs (label names, values and series). When zone awareness is enabled, only results
+	// from a quorum number of zones will be included, to reduce the amount of data merged and improve performance.
+	ZoneResultsQuorumMetadata bool `yaml:"zone_results_quorum_metadata"`
+
 	// Limits for distributor
 	InstanceLimits InstanceLimits `yaml:"instance_limits"`
 }
@@ -167,6 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
 	f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
 	f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
+	f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "If both zone awareness and this flag are enabled, when querying metadata APIs (label names, values and series), only results from a quorum number of zones will be included.")
 	f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
 	f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")
 
@@ -924,8 +930,8 @@ func getErrorStatus(err error) string {
 }
 
 // ForReplicationSet runs f, in parallel, for all ingesters in the input replication set.
-func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) { - return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { +func (d *Distributor) ForReplicationSet(ctx context.Context, replicationSet ring.ReplicationSet, zoneResultsQuorum bool, f func(context.Context, ingester_client.IngesterClient) (interface{}, error)) ([]interface{}, error) { + return replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, zoneResultsQuorum, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err @@ -981,7 +987,7 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t // LabelValuesForLabelName returns all the label values that are associated with a given label name. func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) { return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { resp, err := client.LabelValues(ctx, req) if err != nil { return nil, err @@ -994,7 +1000,7 @@ func (d *Distributor) LabelValuesForLabelName(ctx context.Context, from, to mode // LabelValuesForLabelNameStream returns all the label values that are associated with a given label name. func (d *Distributor) LabelValuesForLabelNameStream(ctx context.Context, from, to model.Time, labelName model.LabelName, matchers ...*labels.Matcher) ([]string, error) { return d.LabelValuesForLabelNameCommon(ctx, from, to, labelName, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelValuesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { stream, err := client.LabelValuesStream(ctx, req) if err != nil { return nil, err @@ -1059,7 +1065,7 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time, func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) ([]string, error) { return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { stream, err := client.LabelNamesStream(ctx, req) if err != nil { return nil, err @@ -1085,7 +1091,7 @@ func (d *Distributor) LabelNamesStream(ctx context.Context, from, to model.Time) // LabelNames returns all the label names. 
func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]string, error) { return d.LabelNamesCommon(ctx, from, to, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.LabelNamesRequest) ([]interface{}, error) { - return d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + return d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { resp, err := client.LabelNames(ctx, req) if err != nil { return nil, err @@ -1098,7 +1104,7 @@ func (d *Distributor) LabelNames(ctx context.Context, from, to model.Time) ([]st // MetricsForLabelMatchers gets the metrics that match said matchers func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error { - _, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + _, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { resp, err := client.MetricsForLabelMatchers(ctx, req) if err != nil { return nil, err @@ -1127,7 +1133,7 @@ func (d *Distributor) MetricsForLabelMatchers(ctx context.Context, from, through func (d *Distributor) MetricsForLabelMatchersStream(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]metric.Metric, error) { return d.metricsForLabelMatchersCommon(ctx, from, through, func(ctx context.Context, rs ring.ReplicationSet, req *ingester_client.MetricsForLabelMatchersRequest, metrics *map[model.Fingerprint]model.Metric, mutex *sync.Mutex, queryLimiter *limiter.QueryLimiter) error { - _, err := d.ForReplicationSet(ctx, rs, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + _, err := d.ForReplicationSet(ctx, rs, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { stream, err := client.MetricsForLabelMatchersStream(ctx, req) if err != nil { return nil, err @@ -1205,7 +1211,7 @@ func (d *Distributor) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetad req := &ingester_client.MetricsMetadataRequest{} // TODO(gotjosh): We only need to look in all the ingesters if shardByAllLabels is enabled. 
- resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + resps, err := d.ForReplicationSet(ctx, replicationSet, d.cfg.ZoneResultsQuorumMetadata, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { return client.MetricsMetadata(ctx, req) }) if err != nil { @@ -1247,7 +1253,7 @@ func (d *Distributor) UserStats(ctx context.Context) (*UserStats, error) { replicationSet.MaxErrors = 0 req := &ingester_client.UserStatsRequest{} - resps, err := d.ForReplicationSet(ctx, replicationSet, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { + resps, err := d.ForReplicationSet(ctx, replicationSet, false, func(ctx context.Context, client ingester_client.IngesterClient) (interface{}, error) { return client.UserStats(ctx, req) }) if err != nil { diff --git a/pkg/distributor/query.go b/pkg/distributor/query.go index 8013e16a84..3048b1c471 100644 --- a/pkg/distributor/query.go +++ b/pkg/distributor/query.go @@ -161,7 +161,7 @@ func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.Replica func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (model.Matrix, error) { // Fetch samples from multiple ingesters in parallel, using the replicationSet // to deal with consistency. - results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err @@ -232,7 +232,7 @@ func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar { func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) { // Fetch exemplars from multiple ingesters in parallel, using the replicationSet // to deal with consistency. - results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err @@ -293,7 +293,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ri ) // Fetch samples from multiple ingesters - results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { + results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, false, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) { client, err := d.ingesterPool.GetClientFor(ing.Addr) if err != nil { return nil, err diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 67630bf53c..275b7e32fd 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -21,15 +21,16 @@ type ReplicationSet struct { } // Do function f in parallel for all replicas in the set, erroring is we exceed -// MaxErrors and returning early otherwise. 
-func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
+// MaxErrors and returning early otherwise. When zoneResultsQuorum is enabled, only
+// results from zones that have already reached quorum are included, to improve performance.
+func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResultsQuorum bool, f func(context.Context, *InstanceDesc) (interface{}, error)) ([]interface{}, error) {
 	type instanceResult struct {
 		res      interface{}
 		err      error
 		instance *InstanceDesc
 	}
 
-	// Initialise the result tracker, which is use to keep track of successes and failures.
+	// Initialise the result tracker, which is used to keep track of successes and failures.
 	var tracker replicationSetResultTracker
 	if r.MaxUnavailableZones > 0 {
 		tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones)
 	} else {
 		tracker = newDefaultResultTracker(r.Instances, r.MaxErrors)
 	}
@@ -67,8 +68,7 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
 		}(i, &r.Instances[i])
 	}
 
-	results := make([]interface{}, 0, len(r.Instances))
-
+	resultsPerZone := make(map[string][]interface{}, r.GetNumOfZones())
 	for !tracker.succeeded() {
 		select {
 		case res := <-ch:
@@ -83,7 +83,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
 				forceStart <- struct{}{}
 			}
 		} else {
-			results = append(results, res.res)
+			if _, ok := resultsPerZone[res.instance.Zone]; !ok {
+				resultsPerZone[res.instance.Zone] = make([]interface{}, 0, len(r.Instances))
+			}
+			resultsPerZone[res.instance.Zone] = append(resultsPerZone[res.instance.Zone], res.res)
 		}
 
 		case <-ctx.Done():
@@ -91,6 +94,26 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, f func(cont
 		}
 	}
 
+	results := make([]interface{}, 0, len(r.Instances))
+	// If zoneResultsQuorum and zone awareness are enabled, include
+	// only results from the zones that already reached quorum.
+	if zoneResultsQuorum && r.MaxUnavailableZones > 0 {
+		zoneAwareTracker, ok := tracker.(*zoneAwareResultTracker)
+		if ok {
+			for zone, waiting := range zoneAwareTracker.waitingByZone {
+				// No need to check failuresByZone since tracker
+				// should already succeed before reaching here.
+				if waiting == 0 {
+					results = append(results, resultsPerZone[zone])
+				}
+			}
+			return results, nil
+		}
+	}
+
+	for zone := range resultsPerZone {
+		results = append(results, resultsPerZone[zone])
+	}
 	return results, nil
 }

From fe30e2dba46424d1470ec06f5b5cc9d9d461227b Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Wed, 21 Feb 2024 05:20:36 +0000
Subject: [PATCH 2/8] update doc

Signed-off-by: Ben Ye
---
 docs/configuration/config-file-reference.md | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index f3d7e5f414..60e40d068d 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -2338,6 +2338,12 @@ ring:
   # CLI flag: -distributor.ring.instance-interface-names
   [instance_interface_names: | default = [eth0 en0]]
 
+# If both zone awareness and this flag are enabled, when querying metadata APIs
+# (label names, values and series), only results from a quorum number of zones
+# will be included.
+# CLI flag: -distributor.zone-results-quorum-metadata
+[zone_results_quorum_metadata: | default = false]
+
 instance_limits:
   # Max ingestion rate (samples/sec) that this distributor will accept. This
   # limit is per-distributor, not per-tenant.
Additional push requests will be From baa87d6552c7aa8d1d33d1c5dbb97680cc47db85 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 20 Feb 2024 21:51:11 -0800 Subject: [PATCH 3/8] integration test Signed-off-by: Ben Ye --- integration/zone_aware_test.go | 92 ++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/integration/zone_aware_test.go b/integration/zone_aware_test.go index c4d7937478..4dd5f1c8aa 100644 --- a/integration/zone_aware_test.go +++ b/integration/zone_aware_test.go @@ -151,3 +151,95 @@ func TestZoneAwareReplication(t *testing.T) { require.Equal(t, 500, res.StatusCode) } + +func TestZoneResultsQuorum(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + flags := BlocksStorageFlags() + flags["-distributor.shard-by-all-labels"] = "true" + flags["-distributor.replication-factor"] = "3" + flags["-distributor.zone-awareness-enabled"] = "true" + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components. + ingesterFlags := func(zone string) map[string]string { + return mergeFlags(flags, map[string]string{ + "-ingester.availability-zone": zone, + }) + } + + ingester1 := e2ecortex.NewIngesterWithConfigFile("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "") + ingester2 := e2ecortex.NewIngesterWithConfigFile("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-a"), "") + ingester3 := e2ecortex.NewIngesterWithConfigFile("ingester-3", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "") + ingester4 := e2ecortex.NewIngesterWithConfigFile("ingester-4", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-b"), "") + ingester5 := e2ecortex.NewIngesterWithConfigFile("ingester-5", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "") + ingester6 := e2ecortex.NewIngesterWithConfigFile("ingester-6", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), "", ingesterFlags("zone-c"), "") + require.NoError(t, s.StartAndWaitReady(ingester1, ingester2, ingester3, ingester4, ingester5, ingester6)) + + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + flagsZoneResultsQuorum := mergeFlags(flags, map[string]string{ + "-distributor.zone-results-quorum-metadata": "true", + }) + querierZoneResultsQuorum := e2ecortex.NewQuerier("querier-zrq", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsZoneResultsQuorum, "") + require.NoError(t, s.StartAndWaitReady(distributor, querier, querierZoneResultsQuorum)) + + // Wait until distributor and queriers have updated the ring. 
+ require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + require.NoError(t, querierZoneResultsQuorum.WaitSumMetricsWithOptions(e2e.Equals(6), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + clientZoneResultsQuorum, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querierZoneResultsQuorum.HTTPEndpoint(), "", "", userID) + require.NoError(t, err) + + // Push some series + now := time.Now() + numSeries := 100 + expectedVectors := map[string]model.Vector{} + + for i := 1; i <= numSeries; i++ { + metricName := fmt.Sprintf("series_%d", i) + series, expectedVector := generateSeries(metricName, now) + res, err := client.Push(series) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + expectedVectors[metricName] = expectedVector + } + + start := now.Add(-time.Hour) + end := now.Add(time.Hour) + res1, err := client.LabelNames(start, end) + require.NoError(t, err) + res2, err := clientZoneResultsQuorum.LabelNames(start, end) + require.NoError(t, err) + assert.Equal(t, res1, res2) + + res1, err := client.LabelValues(labels.MetricName, start, end) + require.NoError(t, err) + res2, err := clientZoneResultsQuorum.LabelValues(labels.MetricName, start, end) + require.NoError(t, err) + assert.Equal(t, res1, res2) + + res1, err := client.Series(`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`, start, end) + require.NoError(t, err) + res2, err := clientZoneResultsQuorum.Series(`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`, start, end) + require.NoError(t, err) + assert.Equal(t, res1, res2) +} From 92f4b2d4f0ddd029faaaa442aeb550d13cf01702 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 20 Feb 2024 22:37:52 -0800 Subject: [PATCH 4/8] fix tests Signed-off-by: Ben Ye --- integration/zone_aware_test.go | 12 ++++++------ pkg/ring/replication_set.go | 4 ++-- pkg/ring/replication_set_test.go | 23 ++++++++++++++++++++++- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/integration/zone_aware_test.go b/integration/zone_aware_test.go index 4dd5f1c8aa..938be900b0 100644 --- a/integration/zone_aware_test.go +++ b/integration/zone_aware_test.go @@ -231,15 +231,15 @@ func TestZoneResultsQuorum(t *testing.T) { require.NoError(t, err) assert.Equal(t, res1, res2) - res1, err := client.LabelValues(labels.MetricName, start, end) + values1, err := client.LabelValues(labels.MetricName, start, end, nil) require.NoError(t, err) - res2, err := clientZoneResultsQuorum.LabelValues(labels.MetricName, start, end) + values2, err := clientZoneResultsQuorum.LabelValues(labels.MetricName, start, end, nil) require.NoError(t, err) - assert.Equal(t, res1, res2) + assert.Equal(t, values1, values2) - res1, err := client.Series(`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`, start, end) + series1, err := 
client.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end) require.NoError(t, err) - res2, err := clientZoneResultsQuorum.Series(`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`, start, end) + series2, err := clientZoneResultsQuorum.Series([]string{`{__name__=~"series_1|series_2|series_3|series_4|series_5"}`}, start, end) require.NoError(t, err) - assert.Equal(t, res1, res2) + assert.Equal(t, series1, series2) } diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go index 275b7e32fd..4ca86a5a6e 100644 --- a/pkg/ring/replication_set.go +++ b/pkg/ring/replication_set.go @@ -104,7 +104,7 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults // No need to check failuresByZone since tracker // should already succeed before reaching here. if waiting == 0 { - results = append(results, resultsPerZone[zone]) + results = append(results, resultsPerZone[zone]...) } } return results, nil @@ -112,7 +112,7 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults } for zone := range resultsPerZone { - results = append(results, resultsPerZone[zone]) + results = append(results, resultsPerZone[zone]...) } return results, nil } diff --git a/pkg/ring/replication_set_test.go b/pkg/ring/replication_set_test.go index 0e63184170..f4fb7449c5 100644 --- a/pkg/ring/replication_set_test.go +++ b/pkg/ring/replication_set_test.go @@ -120,6 +120,7 @@ func TestReplicationSet_Do(t *testing.T) { cancelContextDelay time.Duration want []interface{} expectedError error + zoneResultsQuorum bool }{ { name: "max errors = 0, no errors no delay", @@ -211,6 +212,26 @@ func TestReplicationSet_Do(t *testing.T) { maxUnavailableZones: 2, want: []interface{}{1, 1, 1, 1, 1, 1}, }, + { + name: "max unavailable zones = 1, zoneResultsQuorum = false, should contain 5 results (2 from zone1, 2 from zone2 and 1 from zone3)", + instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: func(c context.Context, id *InstanceDesc) (interface{}, error) { + return 1, nil + }, + maxUnavailableZones: 1, + want: []interface{}{1, 1, 1, 1, 1}, + zoneResultsQuorum: false, + }, + { + name: "max unavailable zones = 1, zoneResultsQuorum = true, should contain 4 results (2 from zone1, 2 from zone2)", + instances: []InstanceDesc{{Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}, {Zone: "zone1"}, {Zone: "zone2"}, {Zone: "zone3"}}, + f: func(c context.Context, id *InstanceDesc) (interface{}, error) { + return 1, nil + }, + maxUnavailableZones: 1, + want: []interface{}{1, 1, 1, 1}, + zoneResultsQuorum: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -231,7 +252,7 @@ func TestReplicationSet_Do(t *testing.T) { cancel() }) } - got, err := r.Do(ctx, tt.delay, tt.f) + got, err := r.Do(ctx, tt.delay, tt.zoneResultsQuorum, tt.f) if tt.expectedError != nil { assert.Equal(t, tt.expectedError, err) } else { From 3f09c40c1c06d5b8f2e2c379bfee6e8e1c6cd6c4 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 21 Feb 2024 08:13:24 -0800 Subject: [PATCH 5/8] mark flag hidden Signed-off-by: Ben Ye --- pkg/distributor/distributor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index a8083fbfd4..2fce10cf31 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -148,7 +148,7 @@ type Config struct { // ZoneResultsQuorumMetadata enables zone results 
quorum when querying ingester replication set
 	// with metadata APIs (label names, values and series). When zone awareness is enabled, only results
 	// from a quorum number of zones will be included, to reduce the amount of data merged and improve performance.
-	ZoneResultsQuorumMetadata bool `yaml:"zone_results_quorum_metadata"`
+	ZoneResultsQuorumMetadata bool `yaml:"zone_results_quorum_metadata" doc:"hidden"`
 
 	// Limits for distributor
 	InstanceLimits InstanceLimits `yaml:"instance_limits"`
@@ -172,7 +172,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.BoolVar(&cfg.SignWriteRequestsEnabled, "distributor.sign-write-requests", false, "EXPERIMENTAL: If enabled, sign the write request between distributors and ingesters.")
 	f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
 	f.BoolVar(&cfg.ExtendWrites, "distributor.extend-writes", true, "Try writing to an additional ingester in the presence of an ingester not in the ACTIVE state. It is useful to disable this along with -ingester.unregister-on-shutdown=false in order to not spread samples to extra ingesters during rolling restarts with consistent naming.")
-	f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "If both zone awareness and this flag are enabled, when querying metadata APIs (label names, values and series), only results from a quorum number of zones will be included.")
+	f.BoolVar(&cfg.ZoneResultsQuorumMetadata, "distributor.zone-results-quorum-metadata", false, "Experimental, this flag may change in the future. If both zone awareness and this flag are enabled, when querying metadata APIs (label names, values and series), only results from a quorum number of zones will be included.")
 	f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.")
 	f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.")

From 106d5b33d0f84bd200ccdfc0167fecea6a0c6ec2 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Wed, 21 Feb 2024 16:23:37 +0000
Subject: [PATCH 6/8] changelog

Signed-off-by: Ben Ye
---
 CHANGELOG.md                                | 1 +
 docs/configuration/config-file-reference.md | 6 ------
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7fee8384c8..e8ae5a0dd7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 * [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
 * [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731
 * [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
+* [FEATURE] Distributor Queryable: Experimental: Add config `zone_results_quorum_metadata`.
When querying ingesters using metadata APIs such as label names, values and series, only results from a quorum number of zones will be included and merged. #5779
 * [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
 * [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
 * [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 60e40d068d..f3d7e5f414 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -2338,12 +2338,6 @@ ring:
   # CLI flag: -distributor.ring.instance-interface-names
   [instance_interface_names: | default = [eth0 en0]]
 
-# If both zone awareness and this flag are enabled, when querying metadata APIs
-# (label names, values and series), only results from a quorum number of zones
-# will be included.
-# CLI flag: -distributor.zone-results-quorum-metadata
-[zone_results_quorum_metadata: | default = false]
-
 instance_limits:
   # Max ingestion rate (samples/sec) that this distributor will accept. This
   # limit is per-distributor, not per-tenant. Additional push requests will be

From 8cf6ed19ea4490beb2b5a4100f0b808b6437b500 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Wed, 21 Feb 2024 14:11:12 -0800
Subject: [PATCH 7/8] refactor interfaces

Signed-off-by: Ben Ye
---
 pkg/ring/replication_set.go              |  32 +---
 pkg/ring/replication_set_tracker.go      |  56 ++++++-
 pkg/ring/replication_set_tracker_test.go | 203 +++++++++++++++++++----
 3 files changed, 225 insertions(+), 66 deletions(-)

diff --git a/pkg/ring/replication_set.go b/pkg/ring/replication_set.go
index 4ca86a5a6e..0182207fd7 100644
--- a/pkg/ring/replication_set.go
+++ b/pkg/ring/replication_set.go
@@ -33,7 +33,7 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
 	// Initialise the result tracker, which is used to keep track of successes and failures.
 	var tracker replicationSetResultTracker
 	if r.MaxUnavailableZones > 0 {
-		tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones)
+		tracker = newZoneAwareResultTracker(r.Instances, r.MaxUnavailableZones, zoneResultsQuorum)
 	} else {
 		tracker = newDefaultResultTracker(r.Instances, r.MaxErrors)
 	}
@@ -68,11 +68,10 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
 		}(i, &r.Instances[i])
 	}
 
-	resultsPerZone := make(map[string][]interface{}, r.GetNumOfZones())
 	for !tracker.succeeded() {
 		select {
 		case res := <-ch:
-			tracker.done(res.instance, res.err)
+			tracker.done(res.instance, res.res, res.err)
 			if res.err != nil {
 				if tracker.failed() {
 					return nil, res.err
@@ -82,11 +81,6 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
 			if delay > 0 && r.MaxUnavailableZones == 0 {
 				forceStart <- struct{}{}
 			}
-		} else {
-			if _, ok := resultsPerZone[res.instance.Zone]; !ok {
-				resultsPerZone[res.instance.Zone] = make([]interface{}, 0, len(r.Instances))
-			}
-			resultsPerZone[res.instance.Zone] = append(resultsPerZone[res.instance.Zone], res.res)
 		}
 
 		case <-ctx.Done():
@@ -94,27 +88,7 @@ func (r ReplicationSet) Do(ctx context.Context, delay time.Duration, zoneResults
 		}
 	}
 
-	results := make([]interface{}, 0, len(r.Instances))
-	// If zoneResultsQuorum and zone awareness are enabled, include
-	// only results from the zones that already reached quorum.
- if zoneResultsQuorum && r.MaxUnavailableZones > 0 { - zoneAwareTracker, ok := tracker.(*zoneAwareResultTracker) - if ok { - for zone, waiting := range zoneAwareTracker.waitingByZone { - // No need to check failuresByZone since tracker - // should already succeed before reaching here. - if waiting == 0 { - results = append(results, resultsPerZone[zone]...) - } - } - return results, nil - } - } - - for zone := range resultsPerZone { - results = append(results, resultsPerZone[zone]...) - } - return results, nil + return tracker.getResults(), nil } // Includes returns whether the replication set includes the replica with the provided addr. diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go index fcdf5441dd..c62895d7ba 100644 --- a/pkg/ring/replication_set_tracker.go +++ b/pkg/ring/replication_set_tracker.go @@ -2,14 +2,18 @@ package ring type replicationSetResultTracker interface { // Signals an instance has done the execution, either successful (no error) - // or failed (with error). - done(instance *InstanceDesc, err error) + // or failed (with error). If successful, result will be recorded and can + // be accessed via getResults. + done(instance *InstanceDesc, result interface{}, err error) // Returns true if the minimum number of successful results have been received. succeeded() bool // Returns true if the maximum number of failed executions have been reached. failed() bool + + // Returns recorded results. + getResults() []interface{} } type defaultResultTracker struct { @@ -17,6 +21,7 @@ type defaultResultTracker struct { numSucceeded int numErrors int maxErrors int + results []interface{} } func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultResultTracker { @@ -25,12 +30,14 @@ func newDefaultResultTracker(instances []InstanceDesc, maxErrors int) *defaultRe numSucceeded: 0, numErrors: 0, maxErrors: maxErrors, + results: make([]interface{}, 0, len(instances)), } } -func (t *defaultResultTracker) done(_ *InstanceDesc, err error) { +func (t *defaultResultTracker) done(_ *InstanceDesc, result interface{}, err error) { if err == nil { t.numSucceeded++ + t.results = append(t.results, result) } else { t.numErrors++ } @@ -44,6 +51,10 @@ func (t *defaultResultTracker) failed() bool { return t.numErrors > t.maxErrors } +func (t *defaultResultTracker) getResults() []interface{} { + return t.results +} + // zoneAwareResultTracker tracks the results per zone. // All instances in a zone must succeed in order for the zone to succeed. 
type zoneAwareResultTracker struct { @@ -51,29 +62,42 @@ type zoneAwareResultTracker struct { failuresByZone map[string]int minSuccessfulZones int maxUnavailableZones int + resultsPerZone map[string][]interface{} + numInstances int + zoneResultsQuorum bool } -func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int) *zoneAwareResultTracker { +func newZoneAwareResultTracker(instances []InstanceDesc, maxUnavailableZones int, zoneResultsQuorum bool) *zoneAwareResultTracker { t := &zoneAwareResultTracker{ waitingByZone: make(map[string]int), failuresByZone: make(map[string]int), maxUnavailableZones: maxUnavailableZones, + numInstances: len(instances), + zoneResultsQuorum: zoneResultsQuorum, } for _, instance := range instances { t.waitingByZone[instance.Zone]++ } t.minSuccessfulZones = len(t.waitingByZone) - maxUnavailableZones + t.resultsPerZone = make(map[string][]interface{}, len(t.waitingByZone)) return t } -func (t *zoneAwareResultTracker) done(instance *InstanceDesc, err error) { - t.waitingByZone[instance.Zone]-- - +func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}, err error) { if err != nil { t.failuresByZone[instance.Zone]++ + } else { + if _, ok := t.resultsPerZone[instance.Zone]; !ok { + // If it is the first result in the zone, then total number of instances + // in this zone should be current waiting required + 1, since this is called after done. + t.resultsPerZone[instance.Zone] = make([]interface{}, 0, t.waitingByZone[instance.Zone]) + } + t.resultsPerZone[instance.Zone] = append(t.resultsPerZone[instance.Zone], result) } + + t.waitingByZone[instance.Zone]-- } func (t *zoneAwareResultTracker) succeeded() bool { @@ -94,3 +118,21 @@ func (t *zoneAwareResultTracker) failed() bool { failedZones := len(t.failuresByZone) return failedZones > t.maxUnavailableZones } + +func (t *zoneAwareResultTracker) getResults() []interface{} { + results := make([]interface{}, 0, t.numInstances) + if t.zoneResultsQuorum { + for zone, waiting := range t.waitingByZone { + // No need to check failuresByZone since tracker + // should already succeed before reaching here. + if waiting == 0 { + results = append(results, t.resultsPerZone[zone]...) + } + } + } else { + for zone := range t.resultsPerZone { + results = append(results, t.resultsPerZone[zone]...) 
+ } + } + return results +} diff --git a/pkg/ring/replication_set_tracker_test.go b/pkg/ring/replication_set_tracker_test.go index f24d23c00a..a0d04f1279 100644 --- a/pkg/ring/replication_set_tracker_test.go +++ b/pkg/ring/replication_set_tracker_test.go @@ -33,19 +33,19 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, nil) + tracker.done(&instance2, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, nil) + tracker.done(&instance3, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance4, nil) + tracker.done(&instance4, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -57,11 +57,11 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, @@ -73,15 +73,15 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, errors.New("test")) + tracker.done(&instance3, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, @@ -93,23 +93,67 @@ func TestDefaultResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, errors.New("test")) + tracker.done(&instance3, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance4, errors.New("test")) + tracker.done(&instance4, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, }, + "record and getResults": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 2, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 3, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 2, 3}, tracker.getResults()) + }, + }, + "record and getResults2": { + instances: []InstanceDesc{instance1, 
instance2, instance3, instance4}, + maxErrors: 1, + run: func(t *testing.T, tracker *defaultResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, []int{1, 1, 1}, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, []int{2, 2, 2}, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, []int{3, 3, 3}, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{[]int{1, 1, 1}, []int{2, 2, 2}, []int{3, 3, 3}}, tracker.getResults()) + }, + }, } for testName, testCase := range tests { @@ -130,6 +174,7 @@ func TestZoneAwareResultTracker(t *testing.T) { tests := map[string]struct { instances []InstanceDesc maxUnavailableZones int + zoneResultsQuorum bool run func(t *testing.T, tracker *zoneAwareResultTracker) }{ "should succeed on no instances to track": { @@ -147,17 +192,115 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, 1, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, nil) + tracker.done(&instance2, 1, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance3, nil) + tracker.done(&instance3, 1, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1}, tracker.getResults()) + }, + }, + "should succeed once all 6 instances succeed on max unavailable zones = 0": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 0, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance4, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance5, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance6, 1, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1, 1, 1, 1}, tracker.getResults()) + }, + }, + "should succeed once all 5 instances succeed on max unavailable zones = 1, zone results quorum disabled": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + zoneResultsQuorum: false, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance5, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, 
tracker.failed()) + + tracker.done(&instance4, 1, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1, 1, 1}, tracker.getResults()) + }, + }, + "should succeed once all 5 instances succeed on max unavailable zones = 1, zone results quorum enabled": { + instances: []InstanceDesc{instance1, instance2, instance3, instance4, instance5, instance6}, + maxUnavailableZones: 1, + zoneResultsQuorum: true, + run: func(t *testing.T, tracker *zoneAwareResultTracker) { + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance1, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance2, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance3, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance5, 1, nil) + assert.False(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + tracker.done(&instance4, 1, nil) + assert.True(t, tracker.succeeded()) + assert.False(t, tracker.failed()) + + assert.Equal(t, []interface{}{1, 1, 1, 1}, tracker.getResults()) }, }, "should fail on 1st failing instance on max unavailable zones = 0": { @@ -167,11 +310,11 @@ func TestZoneAwareResultTracker(t *testing.T) { assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance1, nil) + tracker.done(&instance1, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) - tracker.done(&instance2, errors.New("test")) + tracker.done(&instance2, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.True(t, tracker.failed()) }, @@ -182,19 +325,19 @@ func TestZoneAwareResultTracker(t *testing.T) { run: func(t *testing.T, tracker *zoneAwareResultTracker) { // Track failing instances. for _, instance := range []InstanceDesc{instance1, instance2} { - tracker.done(&instance, errors.New("test")) + tracker.done(&instance, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } // Track successful instances. for _, instance := range []InstanceDesc{instance3, instance4, instance5} { - tracker.done(&instance, nil) + tracker.done(&instance, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } - tracker.done(&instance6, nil) + tracker.done(&instance6, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -205,12 +348,12 @@ func TestZoneAwareResultTracker(t *testing.T) { run: func(t *testing.T, tracker *zoneAwareResultTracker) { // Track successful instances. for _, instance := range []InstanceDesc{instance1, instance2, instance3} { - tracker.done(&instance, nil) + tracker.done(&instance, nil, nil) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } - tracker.done(&instance4, nil) + tracker.done(&instance4, nil, nil) assert.True(t, tracker.succeeded()) assert.False(t, tracker.failed()) }, @@ -221,17 +364,17 @@ func TestZoneAwareResultTracker(t *testing.T) { run: func(t *testing.T, tracker *zoneAwareResultTracker) { // Track failing instances. for _, instance := range []InstanceDesc{instance1, instance2, instance3, instance4} { - tracker.done(&instance, errors.New("test")) + tracker.done(&instance, nil, errors.New("test")) assert.False(t, tracker.succeeded()) assert.False(t, tracker.failed()) } // Track successful instances. 
-				tracker.done(&instance5, nil)
+				tracker.done(&instance5, nil, nil)
 				assert.False(t, tracker.succeeded())
 				assert.False(t, tracker.failed())
 
-				tracker.done(&instance6, nil)
+				tracker.done(&instance6, nil, nil)
 				assert.True(t, tracker.succeeded())
 				assert.False(t, tracker.failed())
 			},
 		},
@@ -241,17 +384,17 @@
 			maxUnavailableZones: 2,
 			run: func(t *testing.T, tracker *zoneAwareResultTracker) {
 				// Zone-a
-				tracker.done(&instance1, nil)
+				tracker.done(&instance1, nil, nil)
 				assert.False(t, tracker.succeeded())
 				assert.False(t, tracker.failed())
 
 				// Zone-b
-				tracker.done(&instance3, nil)
+				tracker.done(&instance3, nil, nil)
 				assert.False(t, tracker.succeeded())
 				assert.False(t, tracker.failed())
 
 				// Zone-a
-				tracker.done(&instance2, nil)
+				tracker.done(&instance2, nil, nil)
 				assert.True(t, tracker.succeeded())
 				assert.False(t, tracker.failed())
 			},
 		},
 	}
 
 	for testName, testCase := range tests {
 		t.Run(testName, func(t *testing.T) {
-			testCase.run(t, newZoneAwareResultTracker(testCase.instances, testCase.maxUnavailableZones))
+			testCase.run(t, newZoneAwareResultTracker(testCase.instances, testCase.maxUnavailableZones, testCase.zoneResultsQuorum))
 		})
 	}
 }

From ef6db9537888ed6fd6c9b49f211184b79db38b94 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Wed, 21 Feb 2024 14:35:54 -0800
Subject: [PATCH 8/8] update comment

Signed-off-by: Ben Ye
---
 pkg/ring/replication_set_tracker.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/ring/replication_set_tracker.go b/pkg/ring/replication_set_tracker.go
index c62895d7ba..dd22909747 100644
--- a/pkg/ring/replication_set_tracker.go
+++ b/pkg/ring/replication_set_tracker.go
@@ -91,7 +91,7 @@ func (t *zoneAwareResultTracker) done(instance *InstanceDesc, result interface{}
 	} else {
 		if _, ok := t.resultsPerZone[instance.Zone]; !ok {
 			// If it is the first result in the zone, then total number of instances
-			// in this zone should be current waiting required + 1, since this is called after done.
+			// in this zone equals the number of instances still waiting in the zone, including this one.
 			t.resultsPerZone[instance.Zone] = make([]interface{}, 0, t.waitingByZone[instance.Zone])
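
For illustration, a minimal, self-contained Go sketch of the zone-results-quorum selection that zoneAwareResultTracker.getResults performs when zoneResultsQuorum is enabled. This is not code from the patch: the function quorumResults and the zone layout in main are hypothetical, and only mirror the waitingByZone/resultsPerZone bookkeeping described above.

package main

import "fmt"

// quorumResults keeps only the results of zones whose instances have all
// responded (waiting == 0) and drops zones that are still partial, so the
// caller merges less duplicated data.
func quorumResults(waitingByZone map[string]int, resultsPerZone map[string][]interface{}) []interface{} {
	results := make([]interface{}, 0)
	for zone, waiting := range waitingByZone {
		if waiting == 0 {
			results = append(results, resultsPerZone[zone]...)
		}
	}
	return results
}

func main() {
	// Assumed example: three zones with two ingesters each. With
	// maxUnavailableZones = 1 the tracker succeeds once two zones have
	// fully responded, while zone-c still has one reply outstanding.
	waitingByZone := map[string]int{"zone-a": 0, "zone-b": 0, "zone-c": 1}
	resultsPerZone := map[string][]interface{}{
		"zone-a": {"a1", "a2"},
		"zone-b": {"b1", "b2"},
		"zone-c": {"c1"},
	}

	// Only zone-a's and zone-b's results are returned; zone-c's partial
	// result is excluded instead of being merged and deduplicated.
	fmt.Println(len(quorumResults(waitingByZone, resultsPerZone))) // 4
}

With zone-aware replication each zone holds a replica of each series, so a quorum of fully-responded zones still covers the full data set for these metadata queries while reducing the number of duplicate entries the querier has to merge.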