diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77444a304c5..45fad4f40cd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,11 +7,13 @@
 * `-experimental.distributor.user-subring-size` flag renamed to `-distributor.ingestion-tenant-shard-size`
 * `user_subring_size` limit YAML config option renamed to `ingestion_tenant_shard_size`
 * [CHANGE] Dropped "blank Alertmanager configuration; using fallback" message from Info to Debug level. #3205
+* [CHANGE] Zone-aware replication for time-series must now be explicitly enabled in the distributor via the `-distributor.zone-awareness-enabled` CLI flag (or its respective YAML config option). Previously, zone-aware replication was implicitly enabled if a zone was set on ingesters. #3200
 * [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-user` globally, or using per-user limit `max_queriers_per_user`), each user's requests will be handled by different set of queriers. #3113
 * [ENHANCEMENT] Added `cortex_query_frontend_connected_clients` metric to show the number of workers currently connected to the frontend. #3207
 * [ENHANCEMENT] Shuffle sharding: improved shuffle sharding in the write path. Shuffle sharding now should be explicitly enabled via `-distributor.sharding-strategy` CLI flag (or its respective YAML config option) and guarantees stability, consistency, shuffling and balanced zone-awareness properties. #3090
 * [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), and how often this metric is updated (`-ingester.active-series-update-period`) and max idle time for series to be considered inactive (`-ingester.active-series-idle-timeout`). #3153
 * [ENHANCEMENT] Blocksconvert – Builder: download plan file locally before processing it. #3209
+* [ENHANCEMENT] Store-gateway: added zone-aware replication support to blocks replication. #3200
 * [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
 * [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
 * [BUGFIX] Handle hash-collisions in the query path. #3192
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index c145c01776e..483fff51598 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -56,6 +56,16 @@ To protect from this, when an healthy store-gateway instance finds another insta
 This feature is called **auto-forget** and is built into the store-gateway.
+### Zone-awareness
+
+The store-gateway replication optionally supports [zone-awareness](../guides/zone-replication.md). When zone-aware replication is enabled and the blocks replication factor is > 1, each block is guaranteed to be replicated across store-gateway instances running in different availability zones.
+
+**To enable** the zone-aware replication for the store-gateways you should:
+
+1. Configure the availability zone for each store-gateway via the `-store-gateway.sharding-ring.instance-availability-zone` CLI flag (or its respective YAML config option)
+2. Enable blocks zone-aware replication via the `-store-gateway.sharding-ring.zone-awareness-enabled` CLI flag (or its respective YAML config option).
Please be aware this configuration option must be set on store-gateways, queriers and rulers.
+3. Roll out store-gateways, queriers and rulers to apply the new configuration
+
 ## Caching
 The store-gateway supports the following caches:
@@ -207,6 +217,16 @@ store_gateway:
     # CLI flag: -store-gateway.sharding-ring.tokens-file-path
     [tokens_file_path: <string> | default = ""]
+    # True to enable zone-awareness and replicate blocks across different
+    # availability zones.
+    # CLI flag: -store-gateway.sharding-ring.zone-awareness-enabled
+    [zone_awareness_enabled: <boolean> | default = false]
+
+    # The availability zone where this instance is running. Required if
+    # zone-awareness is enabled.
+    # CLI flag: -store-gateway.sharding-ring.instance-availability-zone
+    [instance_availability_zone: <string> | default = ""]
+
     # The sharding strategy to use. Supported values are: default,
     # shuffle-sharding.
     # CLI flag: -store-gateway.sharding-strategy
diff --git a/docs/blocks-storage/store-gateway.template b/docs/blocks-storage/store-gateway.template
index 8c95335df7a..3f0454d209c 100644
--- a/docs/blocks-storage/store-gateway.template
+++ b/docs/blocks-storage/store-gateway.template
@@ -56,6 +56,16 @@ To protect from this, when an healthy store-gateway instance finds another insta
 This feature is called **auto-forget** and is built into the store-gateway.
+### Zone-awareness
+
+The store-gateway replication optionally supports [zone-awareness](../guides/zone-replication.md). When zone-aware replication is enabled and the blocks replication factor is > 1, each block is guaranteed to be replicated across store-gateway instances running in different availability zones.
+
+**To enable** the zone-aware replication for the store-gateways you should:
+
+1. Configure the availability zone for each store-gateway via the `-store-gateway.sharding-ring.instance-availability-zone` CLI flag (or its respective YAML config option)
+2. Enable blocks zone-aware replication via the `-store-gateway.sharding-ring.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option must be set on store-gateways, queriers and rulers.
+3. Roll out store-gateways, queriers and rulers to apply the new configuration
+
 ## Caching
 The store-gateway supports the following caches:
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index d07ea185634..7f1d5fd7e25 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -523,6 +523,11 @@ lifecycler:
     # CLI flag: -distributor.replication-factor
     [replication_factor: <int> | default = 3]
+    # True to enable the zone-awareness and replicate ingested samples across
+    # different availability zones.
+    # CLI flag: -distributor.zone-awareness-enabled
+    [zone_awareness_enabled: <boolean> | default = false]
+
     # Number of tokens for each ingester.
     # CLI flag: -ingester.num-tokens
     [num_tokens: <int> | default = 128]
@@ -559,8 +564,7 @@ lifecycler:
   # CLI flag: -ingester.tokens-file-path
   [tokens_file_path: <string> | default = ""]
-  # The availability zone of the host, this instance is running on. Default is
-  # an empty string, which disables zone awareness for writes.
+  # The availability zone where this instance is running.
 # CLI flag: -ingester.availability-zone
 [availability_zone: <string> | default = ""]
@@ -3663,6 +3667,16 @@ sharding_ring:
   # CLI flag: -store-gateway.sharding-ring.tokens-file-path
   [tokens_file_path: <string> | default = ""]
+  # True to enable zone-awareness and replicate blocks across different
+  # availability zones.
+  # CLI flag: -store-gateway.sharding-ring.zone-awareness-enabled
+  [zone_awareness_enabled: <boolean> | default = false]
+
+  # The availability zone where this instance is running. Required if
+  # zone-awareness is enabled.
+  # CLI flag: -store-gateway.sharding-ring.instance-availability-zone
+  [instance_availability_zone: <string> | default = ""]
+
   # The sharding strategy to use. Supported values are: default, shuffle-sharding.
   # CLI flag: -store-gateway.sharding-strategy
   [sharding_strategy: <string> | default = "default"]
diff --git a/docs/guides/zone-replication.md b/docs/guides/zone-replication.md
index 3092dc21186..af58aca2ea2 100644
--- a/docs/guides/zone-replication.md
+++ b/docs/guides/zone-replication.md
@@ -5,26 +5,39 @@ weight: 5
 slug: zone-aware-replication
 ---
-In a default configuration, time-series written to ingesters are replicated based on the container/pod name of the ingester instances. It is completely possible that all the replicas for the given time-series are held with in the same availability zone, even if the cortex infrastructure spans multiple zones within the region. Storing multiple replicas for a given time-series poses a risk for data loss if there is an outage affecting various nodes within a zone or a total outage.
+Cortex supports data replication for different services. By default, data is transparently replicated across the whole pool of service instances, regardless of whether these instances are all running within the same availability zone (or data center, or rack) or in different ones.
-## Configuration
+It is entirely possible that all the replicas of a given piece of data are held within the same availability zone, even if the Cortex cluster spans multiple zones. Storing multiple replicas of a given piece of data within the same availability zone poses a risk of data loss if there is an outage affecting several nodes within a zone or a full zone outage.
-Cortex can be configured to consider an availability zone value in its replication system. Doing so mitigates risks associated with losing multiple nodes within the same availability zone. The availability zone for an ingester can be defined on the command line of the ingester using the `ingester.availability-zone` flag or using the yaml configuration:
+For this reason, Cortex optionally supports zone-aware replication. When zone-aware replication is **enabled**, the replicas of a given piece of data are guaranteed to span different availability zones. This requires the Cortex cluster to run in at least as many availability zones as the configured replication factor.
-```yaml
-ingester:
-  lifecycler:
-    availability_zone: "zone-3"
-```
+The Cortex services supporting **zone-aware replication** are:
-## Zone Replication Considerations
+- **[Distributors and Ingesters](#distributors-and-ingesters-time-series-replication)**
+- **[Store-gateways](#store-gateways-blocks-replication)** ([blocks storage](../blocks-storage/_index.md) only)
-Enabling availability zone awareness helps mitigate risks regarding data loss within a single zone, some items need consideration by an operator if they are thinking of enabling this feature.
+## Distributors / Ingesters: time-series replication
-### Minimum number of Zones
+The Cortex time-series replication is used to hold multiple (typically 3) replicas of each time series in the **ingesters**.
-For cortex to function correctly, there must be at least the same number of availability zones as there is replica count. So by default, a cortex cluster should be spread over 3 zones as the default replica count is 3. It is safe to have more zones than the replica count, but it cannot be less. Having fewer availability zones than replica count causes a replica write to be missed, and in some cases, the write fails if the availability zone count is too low.
+**To enable** the zone-aware replication for the ingesters you should:
-### Cost
+1. Configure the availability zone for each ingester via the `-ingester.availability-zone` CLI flag (or its respective YAML config option)
+2. Roll out ingesters to apply the configured zone
+3. Enable time-series zone-aware replication via the `-distributor.zone-awareness-enabled` CLI flag (or its respective YAML config option). Please be aware this configuration option must be set on distributors, queriers and rulers.
-Depending on the existing cortex infrastructure being used, this may cause an increase in running costs as most cloud providers charge for cross availability zone traffic. The most significant change would be for a cortex cluster currently running in a singular zone.
\ No newline at end of file
+## Store-gateways: blocks replication
+
+The Cortex [store-gateway](../blocks-storage/store-gateway.md) (used only when Cortex is running with the [blocks storage](../blocks-storage/_index.md)) supports blocks sharding, used to horizontally scale blocks in a large cluster without hitting any vertical scalability limit.
+
+To enable the zone-aware replication for the store-gateways, please refer to the [store-gateway](../blocks-storage/store-gateway.md#zone-awareness) documentation.
+
+## Minimum number of zones
+
+For Cortex to function correctly, there must be at least as many availability zones as the replication factor. For example, if the replication factor is configured to 3 (the default for time-series replication), the Cortex cluster should be spread over at least 3 availability zones.
+
+It is safe to have more zones than the replication factor, but not fewer. Running with fewer availability zones than the replication factor causes some replica writes to be missed and, if the number of zones is too low, writes to fail.
+
+## Impact on costs
+
+Depending on the underlying infrastructure being used, deploying Cortex across multiple availability zones may cause an increase in running costs, as most cloud providers charge for inter-availability-zone networking. The most significant change would be for a Cortex cluster currently running in a single zone.
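As a companion to the zone-replication guide updated above, here is a minimal configuration sketch for enabling zone-aware time-series replication. It is illustrative only: the zone name is a placeholder and the exact YAML nesting may vary between Cortex versions, so the CLI flags referenced in the guide remain the authoritative reference.

```yaml
# Sketch: zone-aware time-series replication (placeholder values).
ingester:
  lifecycler:
    # Availability zone of this ingester (-ingester.availability-zone).
    # Use a different value per zone, e.g. zone-a / zone-b / zone-c.
    availability_zone: "zone-a"
    ring:
      # -distributor.replication-factor: with zone-awareness enabled, the
      # cluster must span at least this many availability zones.
      replication_factor: 3
      # -distributor.zone-awareness-enabled: must also be enabled on
      # distributors, queriers and rulers.
      zone_awareness_enabled: true
```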
diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 4a4430f0a63..40629e65302 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -963,7 +963,7 @@ func prepare(t *testing.T, cfg prepConfig) ([]*Distributor, []mockIngester, *rin addr := fmt.Sprintf("%d", i) ingesterDescs[addr] = ring.IngesterDesc{ Addr: addr, - Zone: addr, + Zone: "", State: ring.ACTIVE, Timestamp: time.Now().Unix(), Tokens: []uint32{uint32((math.MaxUint32 / cfg.numIngesters) * i)}, diff --git a/pkg/ring/lifecycler.go b/pkg/ring/lifecycler.go index bc1f1b171c4..8ad7f8ae788 100644 --- a/pkg/ring/lifecycler.go +++ b/pkg/ring/lifecycler.go @@ -101,7 +101,7 @@ func (cfg *LifecyclerConfig) RegisterFlagsWithPrefix(prefix string, f *flag.Flag f.StringVar(&cfg.Addr, prefix+"lifecycler.addr", "", "IP address to advertise in consul.") f.IntVar(&cfg.Port, prefix+"lifecycler.port", 0, "port to advertise in consul (defaults to server.grpc-listen-port).") f.StringVar(&cfg.ID, prefix+"lifecycler.ID", hostname, "ID to register into consul.") - f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone of the host, this instance is running on. Default is an empty string, which disables zone awareness for writes.") + f.StringVar(&cfg.Zone, prefix+"availability-zone", "", "The availability zone where this instance is running.") } // Lifecycler is responsible for managing the lifecycle of entries in the ring. diff --git a/pkg/ring/replication_strategy.go b/pkg/ring/replication_strategy.go index 370c0040a52..67d96c30ed4 100644 --- a/pkg/ring/replication_strategy.go +++ b/pkg/ring/replication_strategy.go @@ -9,7 +9,7 @@ type ReplicationStrategy interface { // Filter out unhealthy instances and checks if there're enough instances // for an operation to succeed. Returns an error if there are not enough // instances. - Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration) (healthy []IngesterDesc, maxFailures int, err error) + Filter(instances []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []IngesterDesc, maxFailures int, err error) // ShouldExtendReplicaSet returns true if given an instance that's going to be // added to the replica set, the replica set size should be extended by 1 @@ -25,7 +25,7 @@ type DefaultReplicationStrategy struct{} // - Filters out dead ingesters so the one doesn't even try to write to them. // - Checks there is enough ingesters for an operation to succeed. // The ingesters argument may be overwritten. -func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration) ([]IngesterDesc, int, error) { +func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) ([]IngesterDesc, int, error) { // We need a response from a quorum of ingesters, which is n/2 + 1. In the // case of a node joining/leaving, the actual replica set might be bigger // than the replication factor, so use the bigger or the two. @@ -49,8 +49,14 @@ func (s *DefaultReplicationStrategy) Filter(ingesters []IngesterDesc, op Operati // This is just a shortcut - if there are not minSuccess available ingesters, // after filtering out dead ones, don't even bother trying. 
if len(ingesters) < minSuccess { - err := fmt.Errorf("at least %d live replicas required, could only find %d", - minSuccess, len(ingesters)) + var err error + + if zoneAwarenessEnabled { + err = fmt.Errorf("at least %d live replicas required across different availability zones, could only find %d", minSuccess, len(ingesters)) + } else { + err = fmt.Errorf("at least %d live replicas required, could only find %d", minSuccess, len(ingesters)) + } + return nil, 0, err } diff --git a/pkg/ring/replication_strategy_test.go b/pkg/ring/replication_strategy_test.go index 10a299ec35e..da5e95ac0a0 100644 --- a/pkg/ring/replication_strategy_test.go +++ b/pkg/ring/replication_strategy_test.go @@ -91,7 +91,7 @@ func TestRingReplicationStrategy(t *testing.T) { t.Run(fmt.Sprintf("[%d]", i), func(t *testing.T) { strategy := &DefaultReplicationStrategy{} - liveIngesters, maxFailure, err := strategy.Filter(ingesters, tc.op, tc.RF, 100*time.Second) + liveIngesters, maxFailure, err := strategy.Filter(ingesters, tc.op, tc.RF, 100*time.Second, false) if tc.ExpectedError == "" { assert.NoError(t, err) assert.Equal(t, tc.LiveIngesters, len(liveIngesters)) diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index e572b0500b3..6374add3fd3 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -85,9 +85,10 @@ var ( // Config for a Ring type Config struct { - KVStore kv.Config `yaml:"kvstore"` - HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` - ReplicationFactor int `yaml:"replication_factor"` + KVStore kv.Config `yaml:"kvstore"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + ReplicationFactor int `yaml:"replication_factor"` + ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` } // RegisterFlags adds the flags required to config this to the given FlagSet with a specified prefix @@ -101,6 +102,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.DurationVar(&cfg.HeartbeatTimeout, prefix+"ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which ingesters are skipped for reads/writes.") f.IntVar(&cfg.ReplicationFactor, prefix+"distributor.replication-factor", 3, "The number of ingesters to write to and read from.") + f.BoolVar(&cfg.ZoneAwarenessEnabled, prefix+"distributor.zone-awareness-enabled", false, "True to enable the zone-awareness and replicate ingested samples across different availability zones.") } // Ring holds the information about the members of the consistent hash ring. @@ -240,12 +242,15 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet if _, ok := distinctHosts[token.Ingester]; ok { continue } - if token.Zone != "" { // Ignore if the ingesters don't have a zone set. + + // Ignore if the ingesters don't have a zone set. 
+ if r.cfg.ZoneAwarenessEnabled && token.Zone != "" { if _, ok := distinctZones[token.Zone]; ok { continue } distinctZones[token.Zone] = struct{}{} } + distinctHosts[token.Ingester] = struct{}{} ingester := r.ringDesc.Ingesters[token.Ingester] @@ -258,7 +263,7 @@ func (r *Ring) Get(key uint32, op Operation, buf []IngesterDesc) (ReplicationSet ingesters = append(ingesters, ingester) } - liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout) + liveIngesters, maxFailure, err := r.strategy.Filter(ingesters, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) if err != nil { return ReplicationSet{}, err } @@ -402,72 +407,6 @@ func (r *Ring) Collect(ch chan<- prometheus.Metric) { ) } -// Subring returns a ring of n ingesters from the given ring. If the subring can't be built -// (ie. because there are not enough instances) then it returns the full ring. -func (r *Ring) Subring(key uint32, n int) ReadRing { - r.mtx.RLock() - defer r.mtx.RUnlock() - - if r.ringDesc == nil || len(r.ringTokens) == 0 || n <= 0 { - return r - } - - var ( - ingesters = make(map[string]IngesterDesc, n) - distinctHosts = map[string]struct{}{} - start = searchToken(r.ringTokens, key) - iterations = 0 - ) - - // Subring exceeds number of ingesters, set to total ring size - if n > len(r.ringDesc.Ingesters) { - n = len(r.ringDesc.Ingesters) - } - - for i := start; len(distinctHosts) < n && iterations < len(r.ringTokens); i++ { - iterations++ - // Wrap i around in the ring. - i %= len(r.ringTokens) - - // We want n *distinct* ingesters. - token := r.ringTokens[i] - if _, ok := distinctHosts[token.Ingester]; ok { - continue - } - distinctHosts[token.Ingester] = struct{}{} - ingester := r.ringDesc.Ingesters[token.Ingester] - - ingesters[token.Ingester] = ingester - } - - if n > len(ingesters) { - return r - } - - numTokens := 0 - for _, ing := range ingesters { - numTokens += len(ing.Tokens) - } - - sub := &Ring{ - cfg: r.cfg, - strategy: r.strategy, - ringDesc: &Desc{ - Ingesters: ingesters, - }, - ringTokens: make([]TokenDesc, 0, numTokens), - } - - // add tokens for the ingesters in the subring, they should already be sorted, so no need to re-sort - for _, t := range r.ringTokens { - if _, ok := ingesters[t.Ingester]; ok { - sub.ringTokens = append(sub.ringTokens, t) - } - } - - return sub -} - // ShuffleShard returns a subring for the provided identifier (eg. a tenant ID) // and size (number of instances). The size is expected to be a multiple of the // number of zones and the returned subring will contain the same number of @@ -507,15 +446,33 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { r.mtx.RLock() defer r.mtx.RUnlock() - // We expect the shard size to be divisible by the number of zones, in order to - // have nodes balanced across zones. If it's not, we do round up. - numInstancesPerZone := int(math.Ceil(float64(size) / float64(len(r.ringZones)))) + var numInstancesPerZone int + var actualZones []string + + if r.cfg.ZoneAwarenessEnabled { + // When zone-awareness is enabled, we expect the shard size to be divisible + // by the number of zones, in order to have nodes balanced across zones. + // If it's not, we do round up. 
+ numInstancesPerZone = int(math.Ceil(float64(size) / float64(len(r.ringZones)))) + actualZones = r.ringZones + } else { + numInstancesPerZone = size + actualZones = []string{""} + } shard := make(map[string]IngesterDesc, size) // We need to iterate zones always in the same order to guarantee stability. - for _, zone := range r.ringZones { - tokens := r.ringTokensByZone[zone] + for _, zone := range actualZones { + var tokens []TokenDesc + + if r.cfg.ZoneAwarenessEnabled { + tokens = r.ringTokensByZone[zone] + } else { + // When zone-awareness is disabled, we just iterate over 1 single fake zone + // and use all tokens in the ring. + tokens = r.ringTokens + } // To select one more instance while guaranteeing the "consistency" property, // we do pick a random value from the generator and resolve uniqueness collisions diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 1ef531fed4a..dc2bc2a4c6c 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "math/rand" - "sort" strconv "strconv" "testing" "time" @@ -124,16 +123,129 @@ func TestAddIngesterReplacesExistingTokens(t *testing.T) { require.Equal(t, newTokens, r.Ingesters[ing1Name].Tokens) } +func TestRing_Get_ZoneAwareness(t *testing.T) { + // Number of tests to run. + const testCount = 10000 + + tests := map[string]struct { + numIngesters int + numZones int + replicationFactor int + zoneAwarenessEnabled bool + expectedErr string + expectedIngesters int + }{ + "should succeed if there are enough ingesters per zone on RF = 3": { + numIngesters: 16, + numZones: 3, + replicationFactor: 3, + zoneAwarenessEnabled: true, + expectedIngesters: 3, + }, + "should fail if there are ingesters in 1 zone only on RF = 3": { + numIngesters: 16, + numZones: 1, + replicationFactor: 3, + zoneAwarenessEnabled: true, + expectedErr: "at least 2 live replicas required across different availability zones, could only find 1", + }, + "should succeed if there are ingesters in 2 zones on RF = 3": { + numIngesters: 16, + numZones: 2, + replicationFactor: 3, + zoneAwarenessEnabled: true, + expectedIngesters: 2, + }, + "should succeed if there are ingesters in 1 zone only on RF = 3 but zone-awareness is disabled": { + numIngesters: 16, + numZones: 1, + replicationFactor: 3, + zoneAwarenessEnabled: false, + expectedIngesters: 3, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + // Add ingesters to the ring. + r := NewDesc() + var prevTokens []uint32 + for i := 0; i < testData.numIngesters; i++ { + name := fmt.Sprintf("ing%v", i) + ingTokens := GenerateTokens(128, prevTokens) + + r.AddIngester(name, fmt.Sprintf("127.0.0.%d", i), fmt.Sprintf("zone-%v", i%testData.numZones), ingTokens, ACTIVE) + + prevTokens = append(prevTokens, ingTokens...) + } + + // Create a ring with the ingesters + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ReplicationFactor: testData.replicationFactor, + ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, + }, + ringDesc: r, + ringTokens: r.getTokens(), + ringTokensByZone: r.getTokensByZone(), + ringZones: getZones(r.getTokensByZone()), + strategy: &DefaultReplicationStrategy{}, + } + + ingesters := make([]IngesterDesc, 0, len(r.GetIngesters())) + for _, v := range r.GetIngesters() { + ingesters = append(ingesters, v) + } + + // Use the GenerateTokens to get an array of random uint32 values. 
+ testValues := GenerateTokens(testCount, nil) + + var set ReplicationSet + var err error + for i := 0; i < testCount; i++ { + set, err = ring.Get(testValues[i], Write, ingesters) + if testData.expectedErr != "" { + require.EqualError(t, err, testData.expectedErr) + } else { + require.NoError(t, err) + } + + // Skip the rest of the assertions if we were expecting an error. + if testData.expectedErr != "" { + continue + } + + // Check that we have the expected number of ingesters for replication. + assert.Equal(t, testData.expectedIngesters, len(set.Ingesters)) + + // Ensure all ingesters are in a different zone (only if zone-awareness is enabled). + if testData.zoneAwarenessEnabled { + zones := make(map[string]struct{}) + for i := 0; i < len(set.Ingesters); i++ { + if _, ok := zones[set.Ingesters[i].Zone]; ok { + t.Fatal("found multiple ingesters in the same zone") + } + zones[set.Ingesters[i].Zone] = struct{}{} + } + } + } + }) + } +} + func TestRing_ShuffleShard(t *testing.T) { tests := map[string]struct { ringInstances map[string]IngesterDesc shardSize int + zoneAwarenessEnabled bool expectedSize int expectedDistribution []int }{ "empty ring": { ringInstances: nil, shardSize: 2, + zoneAwarenessEnabled: true, expectedSize: 0, expectedDistribution: []int{}, }, @@ -143,6 +255,7 @@ func TestRing_ShuffleShard(t *testing.T) { "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, }, shardSize: 3, + zoneAwarenessEnabled: true, expectedSize: 2, expectedDistribution: []int{2}, }, @@ -153,6 +266,7 @@ func TestRing_ShuffleShard(t *testing.T) { "instance-3": {Addr: "127.0.0.3", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, }, shardSize: 2, + zoneAwarenessEnabled: true, expectedSize: 2, expectedDistribution: []int{2}, }, @@ -163,6 +277,7 @@ func TestRing_ShuffleShard(t *testing.T) { "instance-3": {Addr: "127.0.0.3", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, }, shardSize: 2, + zoneAwarenessEnabled: true, expectedSize: 3, expectedDistribution: []int{1, 1, 1}, }, @@ -176,6 +291,7 @@ func TestRing_ShuffleShard(t *testing.T) { "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, }, shardSize: 3, + zoneAwarenessEnabled: true, expectedSize: 3, expectedDistribution: []int{1, 1, 1}, }, @@ -189,9 +305,23 @@ func TestRing_ShuffleShard(t *testing.T) { "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, }, shardSize: 4, + zoneAwarenessEnabled: true, expectedSize: 6, expectedDistribution: []int{2, 2, 2}, }, + "multiple zones, shard size NOT divisible by num zones, but zone awareness is disabled": { + ringInstances: map[string]IngesterDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 4, + zoneAwarenessEnabled: false, + expectedSize: 4, + }, } for testName, testData := range tests { @@ -205,7 +335,10 @@ func TestRing_ShuffleShard(t *testing.T) { } ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour}, + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, + }, ringDesc: 
ringDesc, ringTokens: ringDesc.getTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -217,23 +350,25 @@ func TestRing_ShuffleShard(t *testing.T) { assert.Equal(t, testData.expectedSize, shardRing.IngesterCount()) // Compute the actual distribution of instances across zones. - var actualDistribution []int + if testData.zoneAwarenessEnabled { + var actualDistribution []int - if shardRing.IngesterCount() > 0 { - all, err := shardRing.GetAll(Read) - require.NoError(t, err) + if shardRing.IngesterCount() > 0 { + all, err := shardRing.GetAll(Read) + require.NoError(t, err) - countByZone := map[string]int{} - for _, instance := range all.Ingesters { - countByZone[instance.Zone]++ - } + countByZone := map[string]int{} + for _, instance := range all.Ingesters { + countByZone[instance.Zone]++ + } - for _, count := range countByZone { - actualDistribution = append(actualDistribution, count) + for _, count := range countByZone { + actualDistribution = append(actualDistribution, count) + } } - } - assert.ElementsMatch(t, testData.expectedDistribution, actualDistribution) + assert.ElementsMatch(t, testData.expectedDistribution, actualDistribution) + } }) } } @@ -251,7 +386,10 @@ func TestRing_ShuffleShard_Stability(t *testing.T) { // Initialise the ring. ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour}, + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, ringDesc: ringDesc, ringTokens: ringDesc.getTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -314,7 +452,10 @@ func TestRing_ShuffleShard_Shuffling(t *testing.T) { // Initialise the ring. ringDesc := &Desc{Ingesters: instances} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour}, + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, ringDesc: ringDesc, ringTokens: ringDesc.getTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -409,7 +550,10 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { // Initialise the ring. ringDesc := &Desc{Ingesters: generateRingInstances(s.numInstances, s.numZones)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour}, + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, ringDesc: ringDesc, ringTokens: ringDesc.getTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -465,7 +609,10 @@ func BenchmarkRing_ShuffleShard(b *testing.B) { // Initialise the ring. ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones)} ring := Ring{ - cfg: Config{HeartbeatTimeout: time.Hour}, + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, ringDesc: ringDesc, ringTokens: ringDesc.getTokens(), ringTokensByZone: ringDesc.getTokensByZone(), @@ -484,229 +631,6 @@ func BenchmarkRing_ShuffleShard(b *testing.B) { } } -func TestSubring(t *testing.T) { - r := NewDesc() - - n := 16 // number of ingesters in ring - var prevTokens []uint32 - for i := 0; i < n; i++ { - name := fmt.Sprintf("ing%v", i) - ingTokens := GenerateTokens(128, prevTokens) - - r.AddIngester(name, fmt.Sprintf("addr%v", i), strconv.Itoa(i), ingTokens, ACTIVE) - - prevTokens = append(prevTokens, ingTokens...) 
- } - - // Create a ring with the ingesters - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - }, - ringDesc: r, - ringTokens: r.getTokens(), - ringTokensByZone: r.getTokensByZone(), - ringZones: getZones(r.getTokensByZone()), - strategy: &DefaultReplicationStrategy{}, - } - - // Generate a sub ring for all possible valid ranges - for i := 1; i < n+2; i++ { - subr := ring.Subring(rand.Uint32(), i) - subringSize := i - if i > n { - subringSize = n - } - require.Equal(t, subringSize, len(subr.(*Ring).ringDesc.Ingesters)) - require.Equal(t, subringSize*128, len(subr.(*Ring).ringTokens)) - require.True(t, sort.SliceIsSorted(subr.(*Ring).ringTokens, func(i, j int) bool { - return subr.(*Ring).ringTokens[i].Token < subr.(*Ring).ringTokens[j].Token - })) - - // Obtain a replication slice - size := i - 1 - if size <= 0 { - size = 1 - } - subr.(*Ring).cfg.ReplicationFactor = size - set, err := subr.Get(rand.Uint32(), Write, nil) - require.NoError(t, err) - require.Equal(t, size, len(set.Ingesters)) - } -} - -func TestStableSubring(t *testing.T) { - r := NewDesc() - - n := 16 // number of ingesters in ring - var prevTokens []uint32 - for i := 0; i < n; i++ { - name := fmt.Sprintf("ing%v", i) - ingTokens := GenerateTokens(128, prevTokens) - - r.AddIngester(name, fmt.Sprintf("addr%v", i), strconv.Itoa(i), ingTokens, ACTIVE) - - prevTokens = append(prevTokens, ingTokens...) - } - - // Create a ring with the ingesters - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - }, - ringDesc: r, - ringTokens: r.getTokens(), - ringTokensByZone: r.getTokensByZone(), - ringZones: getZones(r.getTokensByZone()), - strategy: &DefaultReplicationStrategy{}, - } - - // Generate the same subring multiple times - var subrings [][]TokenDesc - key := rand.Uint32() - subringsize := 4 - for i := 1; i < 4; i++ { - subr := ring.Subring(key, subringsize) - require.Equal(t, subringsize, len(subr.(*Ring).ringDesc.Ingesters)) - require.Equal(t, subringsize*128, len(subr.(*Ring).ringTokens)) - require.True(t, sort.SliceIsSorted(subr.(*Ring).ringTokens, func(i, j int) bool { - return subr.(*Ring).ringTokens[i].Token < subr.(*Ring).ringTokens[j].Token - })) - - subrings = append(subrings, subr.(*Ring).ringTokens) - } - - // Validate that the same subring is produced each time from the same ring - for i := 0; i < len(subrings); i++ { - next := i + 1 - if next >= len(subrings) { - next = 0 - } - require.Equal(t, subrings[i], subrings[next]) - } -} - -func TestZoneAwareIngesterAssignmentSucccess(t *testing.T) { - - // runs a series of Get calls on the ring to ensure Ingesters' zone values are taken into - // consideration when assigning a set for a given token. - - r := NewDesc() - - n := 16 // number of ingesters in ring - z := 3 // number of availability zones. - - testCount := 1000000 // number of key tests to run. - - var prevTokens []uint32 - for i := 0; i < n; i++ { - name := fmt.Sprintf("ing%v", i) - ingTokens := GenerateTokens(128, prevTokens) - - r.AddIngester(name, fmt.Sprintf("addr%v", i), fmt.Sprintf("zone-%v", i%z), ingTokens, ACTIVE) - - prevTokens = append(prevTokens, ingTokens...) 
- } - - // Create a ring with the ingesters - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - ReplicationFactor: 3, - }, - ringDesc: r, - ringTokens: r.getTokens(), - ringTokensByZone: r.getTokensByZone(), - ringZones: getZones(r.getTokensByZone()), - strategy: &DefaultReplicationStrategy{}, - } - // use the GenerateTokens to get an array of random uint32 values - testValues := make([]uint32, testCount) - testValues = GenerateTokens(testCount, testValues) - ing := r.GetIngesters() - ingesters := make([]IngesterDesc, 0, len(ing)) - for _, v := range ing { - ingesters = append(ingesters, v) - } - var set ReplicationSet - var e error - for i := 0; i < testCount; i++ { - set, e = ring.Get(testValues[i], Write, ingesters) - if e != nil { - t.Fail() - return - } - - // check that we have the expected number of ingesters for replication. - require.Equal(t, 3, len(set.Ingesters)) - - // ensure all ingesters are in a different zone. - zones := make(map[string]struct{}) - for i := 0; i < len(set.Ingesters); i++ { - if _, ok := zones[set.Ingesters[i].Zone]; ok { - t.Fail() - } - zones[set.Ingesters[i].Zone] = struct{}{} - } - } - -} - -func TestZoneAwareIngesterAssignmentFailure(t *testing.T) { - - // This test ensures that when there are not ingesters in enough distinct zones - // an error will occur when attempting to get a replication set for a token. - - r := NewDesc() - - n := 16 // number of ingesters in ring - z := 1 // number of availability zones. - - testCount := 10 // number of key tests to run. - - var prevTokens []uint32 - for i := 0; i < n; i++ { - name := fmt.Sprintf("ing%v", i) - ingTokens := GenerateTokens(128, prevTokens) - - r.AddIngester(name, fmt.Sprintf("addr%v", i), fmt.Sprintf("zone-%v", i%z), ingTokens, ACTIVE) - - prevTokens = append(prevTokens, ingTokens...) - } - - // Create a ring with the ingesters - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - ReplicationFactor: 3, - }, - ringDesc: r, - ringTokens: r.getTokens(), - ringTokensByZone: r.getTokensByZone(), - ringZones: getZones(r.getTokensByZone()), - strategy: &DefaultReplicationStrategy{}, - } - // use the GenerateTokens to get an array of random uint32 values - testValues := make([]uint32, testCount) - testValues = GenerateTokens(testCount, testValues) - ing := r.GetIngesters() - ingesters := make([]IngesterDesc, 0, len(ing)) - for _, v := range ing { - ingesters = append(ingesters, v) - } - - for i := 0; i < testCount; i++ { - // Since there is only 1 zone assigned, we are expecting an error here. - _, e := ring.Get(testValues[i], Write, ingesters) - if e != nil { - require.Equal(t, "at least 2 live replicas required, could only find 1", e.Error()) - continue - } - t.Fail() - } - -} - // generateTokensLinear returns tokens with a linear distribution. func generateTokensLinear(instanceID, numInstances, numTokens int) []uint32 { tokens := make([]uint32, 0, numTokens) diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index 3c890f138f7..1b5c89e606a 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -36,17 +36,19 @@ const ( // is used to strip down the config to the minimum, and avoid confusion // to the user. type RingConfig struct { - KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances. 
This option needs be set both on the store-gateway and querier when running in microservices mode."` - HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` - HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` - ReplicationFactor int `yaml:"replication_factor"` - TokensFilePath string `yaml:"tokens_file_path"` + KVStore kv.Config `yaml:"kvstore" doc:"description=The key-value store used to share the hash ring across multiple instances. This option needs be set both on the store-gateway and querier when running in microservices mode."` + HeartbeatPeriod time.Duration `yaml:"heartbeat_period"` + HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"` + ReplicationFactor int `yaml:"replication_factor"` + TokensFilePath string `yaml:"tokens_file_path"` + ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` // Instance details InstanceID string `yaml:"instance_id" doc:"hidden"` InstanceInterfaceNames []string `yaml:"instance_interface_names" doc:"hidden"` InstancePort int `yaml:"instance_port" doc:"hidden"` InstanceAddr string `yaml:"instance_addr" doc:"hidden"` + InstanceZone string `yaml:"instance_availability_zone"` // Injected internally ListenPort int `yaml:"-"` @@ -69,6 +71,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.HeartbeatTimeout, ringFlagsPrefix+"heartbeat-timeout", time.Minute, "The heartbeat timeout after which store gateways are considered unhealthy within the ring."+sharedOptionWithQuerier) f.IntVar(&cfg.ReplicationFactor, ringFlagsPrefix+"replication-factor", 3, "The replication factor to use when sharding blocks."+sharedOptionWithQuerier) f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") + f.BoolVar(&cfg.ZoneAwarenessEnabled, ringFlagsPrefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.") // Instance flags cfg.InstanceInterfaceNames = []string{"eth0", "en0"} @@ -76,6 +79,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.InstanceAddr, ringFlagsPrefix+"instance-addr", "", "IP address to advertise in the ring.") f.IntVar(&cfg.InstancePort, ringFlagsPrefix+"instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") f.StringVar(&cfg.InstanceID, ringFlagsPrefix+"instance-id", hostname, "Instance ID to register in the ring.") + f.StringVar(&cfg.InstanceZone, ringFlagsPrefix+"instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.") // Defaults for internal settings. 
cfg.RingCheckPeriod = 5 * time.Second @@ -88,6 +92,7 @@ func (cfg *RingConfig) ToRingConfig() ring.Config { rc.KVStore = cfg.KVStore rc.HeartbeatTimeout = cfg.HeartbeatTimeout rc.ReplicationFactor = cfg.ReplicationFactor + rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled return rc } @@ -103,6 +108,7 @@ func (cfg *RingConfig) ToLifecyclerConfig() (ring.BasicLifecyclerConfig, error) return ring.BasicLifecyclerConfig{ ID: cfg.InstanceID, Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort), + Zone: cfg.InstanceZone, HeartbeatPeriod: cfg.HeartbeatPeriod, TokensObservePeriod: 0, NumTokens: RingNumTokens, diff --git a/pkg/storegateway/replication_strategy.go b/pkg/storegateway/replication_strategy.go index 25851081e7e..db978bdcd4f 100644 --- a/pkg/storegateway/replication_strategy.go +++ b/pkg/storegateway/replication_strategy.go @@ -9,7 +9,7 @@ import ( type BlocksReplicationStrategy struct{} -func (s *BlocksReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, replicationFactor int, heartbeatTimeout time.Duration) ([]ring.IngesterDesc, int, error) { +func (s *BlocksReplicationStrategy) Filter(instances []ring.IngesterDesc, op ring.Operation, _ int, heartbeatTimeout time.Duration, _ bool) ([]ring.IngesterDesc, int, error) { // Filter out unhealthy instances. for i := 0; i < len(instances); { if instances[i].IsHealthy(op, heartbeatTimeout) { diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index f17bfaaa499..effc7f5ebd1 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -35,9 +35,10 @@ func TestDefaultShardingStrategy(t *testing.T) { block4Hash := cortex_tsdb.HashBlockID(block4) tests := map[string]struct { - replicationFactor int - setupRing func(*ring.Desc) - expectedBlocks map[string][]ulid.ULID + replicationFactor int + zoneAwarenessEnabled bool + setupRing func(*ring.Desc) + expectedBlocks map[string][]ulid.ULID }{ "one ACTIVE instance in the ring with replication factor = 1": { replicationFactor: 1, @@ -94,6 +95,20 @@ func TestDefaultShardingStrategy(t *testing.T) { "127.0.0.3": {block4 /* replicated: */, block3}, }, }, + "multiple ACTIVE instances in the ring with replication factor = 2 and zone-awareness enabled": { + replicationFactor: 2, + zoneAwarenessEnabled: true, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "zone-a", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "zone-a", []uint32{block2Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "zone-b", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3, block4}, + "127.0.0.2": {block2}, + "127.0.0.3": {block1, block2, block3, block4}, + }, + }, "one unhealthy instance in the ring with replication factor = 1": { replicationFactor: 1, setupRing: func(r *ring.Desc) { @@ -235,8 +250,9 @@ func TestDefaultShardingStrategy(t *testing.T) { })) cfg := ring.Config{ - ReplicationFactor: testData.replicationFactor, - HeartbeatTimeout: time.Minute, + ReplicationFactor: testData.replicationFactor, + HeartbeatTimeout: time.Minute, + ZoneAwarenessEnabled: testData.zoneAwarenessEnabled, } r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, &BlocksReplicationStrategy{})
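Beyond the code changes above, the store-gateway side of the feature can be illustrated with a similar configuration sketch. Again, this is a sketch only: the zone name is a placeholder and the YAML nesting may vary between Cortex versions; the `-store-gateway.sharding-ring.zone-awareness-enabled` and `-store-gateway.sharding-ring.instance-availability-zone` flags documented earlier are the source of truth.

```yaml
# Sketch: zone-aware blocks replication in the store-gateway (placeholder values).
store_gateway:
  sharding_ring:
    # -store-gateway.sharding-ring.replication-factor
    replication_factor: 3
    # -store-gateway.sharding-ring.zone-awareness-enabled: must also be set
    # on queriers and rulers, which use the store-gateway ring to look up blocks.
    zone_awareness_enabled: true
    # -store-gateway.sharding-ring.instance-availability-zone: required when
    # zone-awareness is enabled; use a different value per zone.
    instance_availability_zone: "zone-a"
```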