diff --git a/CHANGELOG.md b/CHANGELOG.md index 85512f3cf8e..0f19505d41f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * [CHANGE] Store Gateway: Rename `cortex_bucket_store_cached_postings_compression_time_seconds` to `cortex_bucket_store_cached_postings_compression_time_seconds_total`. #5431 * [CHANGE] Store Gateway: Rename `cortex_bucket_store_cached_series_fetch_duration_seconds` to `cortex_bucket_store_series_fetch_duration_seconds` and `cortex_bucket_store_cached_postings_fetch_duration_seconds` to `cortex_bucket_store_postings_fetch_duration_seconds`. Add new metric `cortex_bucket_store_chunks_fetch_duration_seconds`. #5448 * [CHANGE] Store Gateway: Remove `idle_timeout`, `max_conn_age`, `pool_size`, `min_idle_conns` fields for Redis index cache and caching bucket. #5448 +* [CHANGE] Store Gateway: Add flag `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` to enable the store gateway to use zone-stable shuffle sharding. #5489 * [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request. * [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and ` -alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure alert manager grpc client message size limits. #5338 * [FEATURE] Query Frontend: Add `cortex_rejected_queries_total` metric for throttled queries. #5356 diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 126089a1548..b28fabc08b0 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -54,6 +54,7 @@ The store-gateway supports two sharding strategies: - `default` - `shuffle-sharding` +- `zone-stable-shuffle-sharding` The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants. @@ -61,6 +62,12 @@ The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subs The shuffle sharding strategy can be enabled via `-store-gateway.sharding-strategy=shuffle-sharding` and requires the `-store-gateway.tenant-shard-size` flag (or their respective YAML config options) to be set to the default shard size, which is the default number of store-gateway instances each tenant should be sharded to. The shard size can then be overridden on a per-tenant basis setting the `store_gateway_tenant_shard_size` in the limits overrides. +The **`zone-stable-shuffle-sharding`** strategy achieves the same goal as the **`shuffle-sharding`** strategy, but uses a different sharding algorithm. With zone awareness enabled, the new algorithm guarantees that increasing or decreasing the shard size by one changes the replicas of any block by at most one instance. This matters when querying the store-gateway, because fetching a block can be retried at most 3 times. + +Zone-stable shuffle sharding can be enabled via the `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag. + +It will become the default shuffle sharding strategy for the store-gateway in the `v1.17.0` release, and the previous shuffle sharding algorithm will be removed in the `v1.18.0` release.
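To make the new distribution concrete: with zone awareness enabled, the zone-stable algorithm gives every zone `shard size / number of zones` instances and assigns one extra instance to the remainder zones, so changing the shard size by one moves exactly one instance. Below is a minimal Go sketch of that split; the `perZoneShardSizes` helper is illustrative only and not part of this patch.

```go
package main

import "fmt"

// perZoneShardSizes mirrors the per-zone split used by the zone-stable algorithm:
// size/zones instances in every zone, with the first size%zones zones getting one extra.
func perZoneShardSizes(shardSize, numZones int) []int {
	base := shardSize / numZones
	extra := shardSize % numZones
	sizes := make([]int, numZones)
	for i := range sizes {
		sizes[i] = base
		if i < extra {
			sizes[i]++
		}
	}
	return sizes
}

func main() {
	// Growing the shard size by one adds exactly one instance overall,
	// instead of padding the shard up to the next multiple of the zone count.
	for _, size := range []int{3, 4, 5, 6} {
		fmt.Println(size, perZoneShardSizes(size, 3))
	}
	// Output: 3 [1 1 1], 4 [2 1 1], 5 [2 2 1], 6 [2 2 2]
}
```

With 3 zones this yields `[1 1 1]`, `[2 1 1]`, `[2 2 1]` and `[2 2 2]` for shard sizes 3 to 6, matching the expected distributions in the new ring tests further down in this diff.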
+ _Please check out the [shuffle sharding documentation](../guides/shuffle-sharding.md) for more information about how it works._ ### Auto-forget diff --git a/docs/blocks-storage/store-gateway.template b/docs/blocks-storage/store-gateway.template index b4cb6a1fe03..7e16e9584f3 100644 --- a/docs/blocks-storage/store-gateway.template +++ b/docs/blocks-storage/store-gateway.template @@ -54,6 +54,7 @@ The store-gateway supports two sharding strategies: - `default` - `shuffle-sharding` +- `zone-stable-shuffle-sharding` The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants. @@ -61,6 +62,12 @@ The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subs The shuffle sharding strategy can be enabled via `-store-gateway.sharding-strategy=shuffle-sharding` and requires the `-store-gateway.tenant-shard-size` flag (or their respective YAML config options) to be set to the default shard size, which is the default number of store-gateway instances each tenant should be sharded to. The shard size can then be overridden on a per-tenant basis setting the `store_gateway_tenant_shard_size` in the limits overrides. +The **`zone-stable-shuffle-sharding`** strategy achieves the same goal as the **`shuffle-sharding`** strategy, but uses a different sharding algorithm. With zone awareness enabled, the new algorithm guarantees that increasing or decreasing the shard size by one changes the replicas of any block by at most one instance. This matters when querying the store-gateway, because fetching a block can be retried at most 3 times. + +Zone-stable shuffle sharding can be enabled via the `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag. + +It will become the default shuffle sharding strategy for the store-gateway in the `v1.17.0` release, and the previous shuffle sharding algorithm will be removed in the `v1.18.0` release.
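For comparison, here is a rough sketch of the effective shard size under the two algorithms when zone awareness is enabled. It assumes, as the ring doc comments in this patch state, that the previous algorithm rounds the per-zone count up (padding the shard to a multiple of the zone count); both helper functions are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// previousEffectiveSize assumes the old behaviour described in this patch:
// the per-zone instance count is rounded up, so the shard is padded to a
// multiple of the zone count.
func previousEffectiveSize(shardSize, numZones int) int {
	perZone := int(math.Ceil(float64(shardSize) / float64(numZones)))
	return perZone * numZones
}

// zoneStableEffectiveSize keeps the requested size exactly: size/zones per zone,
// with the remainder zones holding one extra instance.
func zoneStableEffectiveSize(shardSize, numZones int) int {
	return shardSize
}

func main() {
	for _, size := range []int{4, 5, 7} {
		fmt.Printf("shard size %d, 3 zones: previous=%d zone-stable=%d\n",
			size, previousEffectiveSize(size, 3), zoneStableEffectiveSize(size, 3))
	}
	// shard size 4, 3 zones: previous=6 zone-stable=4
	// shard size 5, 3 zones: previous=6 zone-stable=5
	// shard size 7, 3 zones: previous=9 zone-stable=7
}
```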
+ _Please check out the [shuffle sharding documentation](../guides/shuffle-sharding.md) for more information about how it works._ ### Auto-forget diff --git a/docs/configuration/v1-guarantees.md b/docs/configuration/v1-guarantees.md index 1384357c26a..1a125ac7832 100644 --- a/docs/configuration/v1-guarantees.md +++ b/docs/configuration/v1-guarantees.md @@ -105,3 +105,6 @@ Currently experimental features are: - `-blocks-storage.tsdb.out-of-order-cap-max` (int) CLI flag - `-ingester.out-of-order-time-window` (duration) CLI flag - `out_of_order_time_window` (duration) field in runtime config file +- Store Gateway Zone Stable Shuffle Sharding + - `-store-gateway.sharding-ring.zone-stable-shuffle-sharding` CLI flag + - `zone_stable_shuffle_sharding` (boolean) field in config file diff --git a/pkg/compactor/shuffle_sharding_grouper_test.go b/pkg/compactor/shuffle_sharding_grouper_test.go index c680e88a73f..e237a43a921 100644 --- a/pkg/compactor/shuffle_sharding_grouper_test.go +++ b/pkg/compactor/shuffle_sharding_grouper_test.go @@ -784,6 +784,11 @@ func (r *RingMock) ShuffleShard(identifier string, size int) ring.ReadRing { return args.Get(0).(ring.ReadRing) } +func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ring.ReadRing { + args := r.Called(identifier, size) + return args.Get(0).(ring.ReadRing) +} + func (r *RingMock) GetInstanceState(instanceID string) (ring.InstanceState, error) { args := r.Called(instanceID) return args.Get(0).(ring.InstanceState), args.Error(1) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index fc194932f2e..c78d8952362 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -228,7 +228,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa return nil, errors.Wrap(err, "failed to create store-gateway ring client") } - stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled) + stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, randomLoadBalancing, limits, querierCfg.StoreGatewayClient, logger, reg, storesRingCfg.ZoneAwarenessEnabled, gatewayCfg.ShardingRing.ZoneStableShuffleSharding) if err != nil { return nil, errors.Wrap(err, "failed to create store set") } diff --git a/pkg/querier/blocks_store_replicated_set.go b/pkg/querier/blocks_store_replicated_set.go index cb2bdd3863e..adda8ece6d1 100644 --- a/pkg/querier/blocks_store_replicated_set.go +++ b/pkg/querier/blocks_store_replicated_set.go @@ -37,7 +37,8 @@ type blocksStoreReplicationSet struct { balancingStrategy loadBalancingStrategy limits BlocksStoreLimits - zoneAwarenessEnabled bool + zoneAwarenessEnabled bool + zoneStableShuffleSharding bool // Subservices manager. 
subservices *services.Manager @@ -53,14 +54,17 @@ func newBlocksStoreReplicationSet( logger log.Logger, reg prometheus.Registerer, zoneAwarenessEnabled bool, + zoneStableShuffleSharding bool, ) (*blocksStoreReplicationSet, error) { s := &blocksStoreReplicationSet{ - storesRing: storesRing, - clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg), - shardingStrategy: shardingStrategy, - balancingStrategy: balancingStrategy, - limits: limits, - zoneAwarenessEnabled: zoneAwarenessEnabled, + storesRing: storesRing, + clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), clientConfig, logger, reg), + shardingStrategy: shardingStrategy, + balancingStrategy: balancingStrategy, + limits: limits, + + zoneAwarenessEnabled: zoneAwarenessEnabled, + zoneStableShuffleSharding: zoneStableShuffleSharding, } var err error @@ -106,7 +110,7 @@ func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid // otherwise we just use the full ring. var userRing ring.ReadRing if s.shardingStrategy == util.ShardingStrategyShuffle { - userRing = storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits) + userRing = storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits, s.zoneStableShuffleSharding) } else { userRing = s.storesRing } diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index 8d780694e35..40637a07d4d 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -585,7 +585,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { } reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, testData.zoneAwarenessEnabled) + s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, noLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, testData.zoneAwarenessEnabled, true) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -647,7 +647,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ShouldSupportRandomLoadBalancin limits := &blocksStoreLimitsMock{} reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, false) + s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, false, false) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -716,7 +716,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor_ZoneAwareness(t *testing.T) { limits := &blocksStoreLimitsMock{} reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, true) + s, err := newBlocksStoreReplicationSet(r, util.ShardingStrategyDefault, randomLoadBalancing, limits, ClientConfig{}, log.NewNopLogger(), reg, true, false) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index 
0978a9308b4..b231014ed78 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -68,6 +68,12 @@ type ReadRing interface { // and size (number of instances). ShuffleShard(identifier string, size int) ReadRing + // ShuffleShardWithZoneStability does the same as ShuffleShard but uses a different shuffle sharding algorithm. + // It doesn't round the shard size up to be divisible by the number of zones, and it guarantees that scaling the + // shard size up or down by one changes at most 1 instance. + // It is only used by the Store Gateway for now. + ShuffleShardWithZoneStability(identifier string, size int) ReadRing + // GetInstanceState returns the current state of an instance or an error if the // instance does not exist in the ring. GetInstanceState(instanceID string) (InstanceState, error) @@ -200,6 +206,8 @@ type Ring struct { type subringCacheKey struct { identifier string shardSize int + + zoneStableSharding bool } // New creates a new Ring. Being a service, Ring needs to be started to do anything. @@ -659,24 +667,16 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { // - Stability: given the same ring, two invocations returns the same result. // // - Consistency: adding/removing 1 instance from the ring generates a resulting -// subring with no more then 1 difference. +// subring with no more than 1 difference. // // - Shuffling: probabilistically, for a large enough cluster each identifier gets a different // set of instances, with a reduced number of overlapping instances between two identifiers. func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { - // Nothing to do if the shard size is not smaller then the actual ring. - if size <= 0 || r.InstancesCount() <= size { - return r - } - - if cached := r.getCachedShuffledSubring(identifier, size); cached != nil { - return cached - } - - result := r.shuffleShard(identifier, size, 0, time.Now()) + return r.shuffleShardWithCache(identifier, size, false) +} - r.setCachedShuffledSubring(identifier, size, result) - return result +func (r *Ring) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { + return r.shuffleShardWithCache(identifier, size, true) } // ShuffleShardWithLookback is like ShuffleShard() but the returned subring includes all instances @@ -687,25 +687,49 @@ func (r *Ring) ShuffleShard(identifier string, size int) ReadRing { // // This function doesn't support caching. func (r *Ring) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ReadRing { - // Nothing to do if the shard size is not smaller then the actual ring. + // Nothing to do if the shard size is not smaller than the actual ring. if size <= 0 || r.InstancesCount() <= size { return r } - return r.shuffleShard(identifier, size, lookbackPeriod, now) + return r.shuffleShard(identifier, size, lookbackPeriod, now, false) } -func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time) *Ring { +func (r *Ring) shuffleShardWithCache(identifier string, size int, zoneStableSharding bool) ReadRing { + // Nothing to do if the shard size is not smaller than the actual ring.
+ if size <= 0 || r.InstancesCount() <= size { + return r + } + + if cached := r.getCachedShuffledSubring(identifier, size, zoneStableSharding); cached != nil { + return cached + } + + result := r.shuffleShard(identifier, size, 0, time.Now(), zoneStableSharding) + + r.setCachedShuffledSubring(identifier, size, zoneStableSharding, result) + return result +} + +func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Duration, now time.Time, zoneStableSharding bool) *Ring { lookbackUntil := now.Add(-lookbackPeriod).Unix() r.mtx.RLock() defer r.mtx.RUnlock() - var numInstancesPerZone int - var actualZones []string + var ( + numInstancesPerZone int + actualZones []string + zonesWithExtraInstance int + ) if r.cfg.ZoneAwarenessEnabled { - numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones)) + if zoneStableSharding { + numInstancesPerZone = size / len(r.ringZones) + zonesWithExtraInstance = size % len(r.ringZones) + } else { + numInstancesPerZone = shardUtil.ShuffleShardExpectedInstancesPerZone(size, len(r.ringZones)) + } actualZones = r.ringZones } else { numInstancesPerZone = size @@ -735,7 +759,12 @@ func (r *Ring) shuffleShard(identifier string, size int, lookbackPeriod time.Dur // To select one more instance while guaranteeing the "consistency" property, // we do pick a random value from the generator and resolve uniqueness collisions // (if any) continuing walking the ring. - for i := 0; i < numInstancesPerZone; i++ { + finalInstancesPerZone := numInstancesPerZone + if zonesWithExtraInstance > 0 { + zonesWithExtraInstance-- + finalInstancesPerZone++ + } + for i := 0; i < finalInstancesPerZone; i++ { start := searchToken(tokens, random.Uint32()) iterations := 0 found := false @@ -828,7 +857,7 @@ func (r *Ring) HasInstance(instanceID string) bool { return ok } -func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring { +func (r *Ring) getCachedShuffledSubring(identifier string, size int, zoneStableSharding bool) *Ring { if r.cfg.SubringCacheDisabled { return nil } @@ -837,7 +866,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring { defer r.mtx.RUnlock() // if shuffledSubringCache map is nil, reading it returns default value (nil pointer). - cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}] + cached := r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] if cached == nil { return nil } @@ -856,7 +885,7 @@ func (r *Ring) getCachedShuffledSubring(identifier string, size int) *Ring { return cached } -func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ring) { +func (r *Ring) setCachedShuffledSubring(identifier string, size int, zoneStableSharding bool, subring *Ring) { if subring == nil || r.cfg.SubringCacheDisabled { return } @@ -868,7 +897,7 @@ func (r *Ring) setCachedShuffledSubring(identifier string, size int, subring *Ri // (which can happen between releasing the read lock and getting read-write lock). // Note that shuffledSubringCache can be only nil when set by test. 
if r.shuffledSubringCache != nil && r.lastTopologyChange.Equal(subring.lastTopologyChange) { - r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size}] = subring + r.shuffledSubringCache[subringCacheKey{identifier: identifier, shardSize: size, zoneStableSharding: zoneStableSharding}] = subring } } diff --git a/pkg/ring/ring_test.go b/pkg/ring/ring_test.go index 8626b19fcbc..e6ab16acb92 100644 --- a/pkg/ring/ring_test.go +++ b/pkg/ring/ring_test.go @@ -1424,6 +1424,7 @@ func TestRing_ShuffleShard(t *testing.T) { tests := map[string]struct { ringInstances map[string]InstanceDesc shardSize int + zoneStability bool zoneAwarenessEnabled bool expectedSize int expectedDistribution []int @@ -1508,6 +1509,51 @@ func TestRing_ShuffleShard(t *testing.T) { zoneAwarenessEnabled: false, expectedSize: 4, }, + "multiple zones, shard size NOT divisible by num zones with zone stability enabled, shard size = 4": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 4, + zoneAwarenessEnabled: true, + zoneStability: true, + expectedSize: 4, + expectedDistribution: []int{2, 1, 1}, + }, + "multiple zones, shard size NOT divisible by num zones with zone stability enabled, shard size = 5": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 5, + zoneAwarenessEnabled: true, + zoneStability: true, + expectedSize: 5, + expectedDistribution: []int{2, 2, 1}, + }, + "multiple zones, shard size divisible by num zones with zone stability enabled, equal distribution over zones": { + ringInstances: map[string]InstanceDesc{ + "instance-1": {Addr: "127.0.0.1", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-2": {Addr: "127.0.0.2", Zone: "zone-a", Tokens: GenerateTokens(128, nil)}, + "instance-3": {Addr: "127.0.0.3", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-4": {Addr: "127.0.0.4", Zone: "zone-b", Tokens: GenerateTokens(128, nil)}, + "instance-5": {Addr: "127.0.0.5", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + "instance-6": {Addr: "127.0.0.6", Zone: "zone-c", Tokens: GenerateTokens(128, nil)}, + }, + shardSize: 6, + zoneAwarenessEnabled: true, + zoneStability: true, + expectedSize: 6, + expectedDistribution: []int{2, 2, 2}, + }, } for testName, testData := range tests { @@ -1534,7 +1580,12 @@ func TestRing_ShuffleShard(t *testing.T) { KVClient: &MockClient{}, } - shardRing := ring.ShuffleShard("tenant-id", testData.shardSize) + var shardRing ReadRing + if testData.zoneStability { + shardRing = 
ring.ShuffleShardWithZoneStability("tenant-id", testData.shardSize) + } else { + shardRing = ring.ShuffleShard("tenant-id", testData.shardSize) + } assert.Equal(t, testData.expectedSize, shardRing.InstancesCount()) // Compute the actual distribution of instances across zones. @@ -1875,6 +1926,71 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { } } +// Make sure consistency when scaling shard size up and down with step 1 at a time. +// Previous shuffle sharding mechanism always changes shard size by number of zones +// so minimum step size will be > 1 if we have multiple zones. +func TestRing_ShuffleShardWithZoneStability_ConsistencyOnShardSizeChanged(t *testing.T) { + // Create 300 instances in 3 zones. + ringInstances := map[string]InstanceDesc{} + for i := 0; i < 300; i++ { + name, desc := generateRingInstance(i, i%3, 128) + ringInstances[name] = desc + } + + // Init the ring. + ringDesc := &Desc{Ingesters: ringInstances} + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(), + KVClient: &MockClient{}, + } + + tenant := "tenant-id" + rs := make([]ReplicationSet, 150-3+1) + var prevRs *ReplicationSet + // Scale up 1 replica a time. + for shardSize := 3; shardSize <= 150; shardSize++ { + r := ring.ShuffleShardWithZoneStability(tenant, shardSize) + assert.Equal(t, shardSize, r.InstancesCount()) + s, err := r.GetAllHealthy(Read) + require.NoError(t, err) + if prevRs != nil { + // Make sure all prev replication set instances are included. + for _, ins := range prevRs.Instances { + require.True(t, s.Includes(ins.Addr)) + } + } + rs[shardSize-3] = s + prevRs = &s + } + // Scale down 1 replica a time. + for shardSize := 149; shardSize >= 3; shardSize-- { + r := ring.ShuffleShardWithZoneStability(tenant, shardSize) + assert.Equal(t, shardSize, r.InstancesCount()) + s, err := r.GetAllHealthy(Read) + require.NoError(t, err) + // Make sure all instances of current replica set is included + // in the previous replica set. + for _, ins := range s.Instances { + require.True(t, prevRs.Includes(ins.Addr)) + } + // Make sure when scaling down, instances in the ring is always the same. + require.Equal(t, len(s.Instances), len(rs[shardSize-3].Instances)) + for _, ins := range s.Instances { + require.True(t, rs[shardSize-3].Includes(ins.Addr)) + } + prevRs = &s + } +} + func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { // Create 20 instances in 2 zones. ringInstances := map[string]InstanceDesc{} @@ -1952,6 +2068,90 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { } } +func TestRing_ShuffleShardWithZoneStability_ConsistencyOnZonesChanged(t *testing.T) { + // Create 20 instances in 2 zones. + ringInstances := map[string]InstanceDesc{} + for i := 0; i < 20; i++ { + name, desc := generateRingInstance(i, i%2, 128) + ringInstances[name] = desc + } + + // Init the ring. 
+ ringDesc := &Desc{Ingesters: ringInstances} + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(), + KVClient: &MockClient{}, + } + + // Get the replication set with shard size = 2. + firstShard := ring.ShuffleShardWithZoneStability("tenant-id", 2) + assert.Equal(t, 2, firstShard.InstancesCount()) + + firstSet, err := firstShard.GetAllHealthy(Read) + require.NoError(t, err) + + // Increase shard size to 3. + secondShard := ring.ShuffleShardWithZoneStability("tenant-id", 3) + assert.Equal(t, 3, secondShard.InstancesCount()) + + secondSet, err := secondShard.GetAllHealthy(Read) + require.NoError(t, err) + + for _, firstInstance := range firstSet.Instances { + assert.True(t, secondSet.Includes(firstInstance.Addr), "new replication set is expected to include previous instance %s", firstInstance.Addr) + } + + // Increase shard size to 5. + thirdShard := ring.ShuffleShardWithZoneStability("tenant-id", 5) + assert.Equal(t, 5, thirdShard.InstancesCount()) + + thirdSet, err := thirdShard.GetAllHealthy(Read) + require.NoError(t, err) + + // Scale up cluster, adding 10 instances in 1 new zone. + for i := 20; i < 30; i++ { + name, desc := generateRingInstance(i, 2, 128) + ringInstances[name] = desc + } + + ring.ringDesc.Ingesters = ringInstances + ring.ringTokens = ringDesc.GetTokens() + ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() + ring.ringZones = getZones(ringDesc.getTokensByZone()) + + // Increase shard size to 7. + fourthShard := ring.ShuffleShardWithZoneStability("tenant-id", 7) + assert.Equal(t, 7, fourthShard.InstancesCount()) + + fourthSet, err := fourthShard.GetAllHealthy(Read) + require.NoError(t, err) + + for _, thirdInstance := range thirdSet.Instances { + assert.True(t, fourthSet.Includes(thirdInstance.Addr), "new replication set is expected to include previous instance %s", thirdInstance.Addr) + } + + // Increase shard size to 10. + fifthShard := ring.ShuffleShardWithZoneStability("tenant-id", 10) + assert.Equal(t, 10, fifthShard.InstancesCount()) + + fifthSet, err := fifthShard.GetAllHealthy(Read) + require.NoError(t, err) + + for _, fourthInstance := range fourthSet.Instances { + assert.True(t, fifthSet.Includes(fourthInstance.Addr), "new replication set is expected to include previous instance %s", fourthInstance.Addr) + } +} + func TestRing_ShuffleShardWithLookback(t *testing.T) { type eventType int @@ -2200,130 +2400,132 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { for _, numInstances := range numInitialInstances { for _, numZones := range numInitialZones { - testName := fmt.Sprintf("num instances = %d, num zones = %d", numInstances, numZones) - - t.Run(testName, func(t *testing.T) { - // Randomise the seed but log it in case we need to reproduce the test on failure. - seed := time.Now().UnixNano() - rand.Seed(seed) - t.Log("random generator seed:", seed) + for _, enableStableSharding := range []bool{false, true} { + testName := fmt.Sprintf("num instances = %d, num zones = %d, stable sharding = %s", numInstances, numZones, strconv.FormatBool(enableStableSharding)) + + t.Run(testName, func(t *testing.T) { + // Randomise the seed but log it in case we need to reproduce the test on failure. 
+ seed := time.Now().UnixNano() + rand.Seed(seed) + t.Log("random generator seed:", seed) + + // Initialise the ring. + ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)} + ring := Ring{ + cfg: Config{ + HeartbeatTimeout: time.Hour, + ZoneAwarenessEnabled: true, + ReplicationFactor: 3, + }, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + strategy: NewDefaultReplicationStrategy(), + KVClient: &MockClient{}, + } - // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)} - ring := Ring{ - cfg: Config{ - HeartbeatTimeout: time.Hour, - ZoneAwarenessEnabled: true, - ReplicationFactor: 3, - }, - ringDesc: ringDesc, - ringTokens: ringDesc.GetTokens(), - ringTokensByZone: ringDesc.getTokensByZone(), - ringInstanceByToken: ringDesc.getTokensInfo(), - ringZones: getZones(ringDesc.getTokensByZone()), - strategy: NewDefaultReplicationStrategy(), - KVClient: &MockClient{}, - } + // The simulation starts with the minimum shard size. Random events can later increase it. + shardSize := numZones - // The simulation starts with the minimum shard size. Random events can later increase it. - shardSize := numZones + // The simulation assumes the initial ring contains instances registered + // since more than the lookback period. + currTime := time.Now().Add(lookbackPeriod).Add(time.Minute) - // The simulation assumes the initial ring contains instances registered - // since more than the lookback period. - currTime := time.Now().Add(lookbackPeriod).Add(time.Minute) + // Add the initial shard to the history. + rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read) + require.NoError(t, err) - // Add the initial shard to the history. - rs, err := ring.shuffleShard(userID, shardSize, 0, time.Now()).GetReplicationSetForOperation(Read) - require.NoError(t, err) + history := map[time.Time]ReplicationSet{ + currTime: rs, + } - history := map[time.Time]ReplicationSet{ - currTime: rs, - } + // Simulate a progression of random events over the time and, at each iteration of the simuation, + // make sure the subring includes all non-removed instances picked from previous versions of the + // ring up until the lookback period. + nextInstanceID := len(ringDesc.Ingesters) + 1 + + for i := 1; i <= numEvents; i++ { + currTime = currTime.Add(delayBetweenEvents) + + switch r := rand.Intn(100); { + case r < 80: + // Scale up instances by 1. + instanceID := fmt.Sprintf("instance-%d", nextInstanceID) + zoneID := fmt.Sprintf("zone-%d", nextInstanceID%numZones) + nextInstanceID++ + + ringDesc.Ingesters[instanceID] = generateRingInstanceWithInfo(instanceID, zoneID, GenerateTokens(128, nil), currTime) + + ring.ringTokens = ringDesc.GetTokens() + ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() + ring.ringZones = getZones(ringDesc.getTokensByZone()) + case r < 90: + // Scale down instances by 1. To make tests reproducible we get the instance IDs, sort them + // and then get a random index (using the random generator initialized with a constant seed). 
+ instanceIDs := make([]string, 0, len(ringDesc.Ingesters)) + for id := range ringDesc.Ingesters { + instanceIDs = append(instanceIDs, id) + } - // Simulate a progression of random events over the time and, at each iteration of the simuation, - // make sure the subring includes all non-removed instances picked from previous versions of the - // ring up until the lookback period. - nextInstanceID := len(ringDesc.Ingesters) + 1 - - for i := 1; i <= numEvents; i++ { - currTime = currTime.Add(delayBetweenEvents) - - switch r := rand.Intn(100); { - case r < 80: - // Scale up instances by 1. - instanceID := fmt.Sprintf("instance-%d", nextInstanceID) - zoneID := fmt.Sprintf("zone-%d", nextInstanceID%numZones) - nextInstanceID++ - - ringDesc.Ingesters[instanceID] = generateRingInstanceWithInfo(instanceID, zoneID, GenerateTokens(128, nil), currTime) - - ring.ringTokens = ringDesc.GetTokens() - ring.ringTokensByZone = ringDesc.getTokensByZone() - ring.ringInstanceByToken = ringDesc.getTokensInfo() - ring.ringZones = getZones(ringDesc.getTokensByZone()) - case r < 90: - // Scale down instances by 1. To make tests reproducible we get the instance IDs, sort them - // and then get a random index (using the random generator initialized with a constant seed). - instanceIDs := make([]string, 0, len(ringDesc.Ingesters)) - for id := range ringDesc.Ingesters { - instanceIDs = append(instanceIDs, id) - } + sort.Strings(instanceIDs) - sort.Strings(instanceIDs) + idxToRemove := rand.Intn(len(instanceIDs)) + idToRemove := instanceIDs[idxToRemove] + delete(ringDesc.Ingesters, idToRemove) - idxToRemove := rand.Intn(len(instanceIDs)) - idToRemove := instanceIDs[idxToRemove] - delete(ringDesc.Ingesters, idToRemove) + ring.ringTokens = ringDesc.GetTokens() + ring.ringTokensByZone = ringDesc.getTokensByZone() + ring.ringInstanceByToken = ringDesc.getTokensInfo() + ring.ringZones = getZones(ringDesc.getTokensByZone()) - ring.ringTokens = ringDesc.GetTokens() - ring.ringTokensByZone = ringDesc.getTokensByZone() - ring.ringInstanceByToken = ringDesc.getTokensInfo() - ring.ringZones = getZones(ringDesc.getTokensByZone()) + // Remove the terminated instance from the history. + for ringTime, ringState := range history { + for idx, desc := range ringState.Instances { + // In this simulation instance ID == instance address. + if desc.Addr != idToRemove { + continue + } - // Remove the terminated instance from the history. - for ringTime, ringState := range history { - for idx, desc := range ringState.Instances { - // In this simulation instance ID == instance address. - if desc.Addr != idToRemove { - continue + ringState.Instances = append(ringState.Instances[:idx], ringState.Instances[idx+1:]...) + history[ringTime] = ringState + break } - - ringState.Instances = append(ringState.Instances[:idx], ringState.Instances[idx+1:]...) - history[ringTime] = ringState - break } + default: + // Scale up shard size (keeping the per-zone balance). + shardSize += numZones } - default: - // Scale up shard size (keeping the per-zone balance). - shardSize += numZones - } - // Add the current shard to the history. - rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now()).GetReplicationSetForOperation(Read) - require.NoError(t, err) - history[currTime] = rs + // Add the current shard to the history. 
+ rs, err = ring.shuffleShard(userID, shardSize, 0, time.Now(), enableStableSharding).GetReplicationSetForOperation(Read) + require.NoError(t, err) + history[currTime] = rs - // Ensure the shard with lookback includes all instances from previous states of the ring. - rsWithLookback, err := ring.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, currTime).GetReplicationSetForOperation(Read) - require.NoError(t, err) + // Ensure the shard with lookback includes all instances from previous states of the ring. + rsWithLookback, err := ring.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, currTime).GetReplicationSetForOperation(Read) + require.NoError(t, err) - for ringTime, ringState := range history { - if ringTime.Before(currTime.Add(-lookbackPeriod)) { - // This entry from the history is obsolete, we can remove it. - delete(history, ringTime) - continue - } + for ringTime, ringState := range history { + if ringTime.Before(currTime.Add(-lookbackPeriod)) { + // This entry from the history is obsolete, we can remove it. + delete(history, ringTime) + continue + } - for _, expectedAddr := range ringState.GetAddresses() { - if !rsWithLookback.Includes(expectedAddr) { - t.Fatalf( - "subring generated after event %d is expected to include instance %s from ring state at time %s but it's missing (actual instances are: %s)", - i, expectedAddr, ringTime.String(), strings.Join(rsWithLookback.GetAddresses(), ", ")) + for _, expectedAddr := range ringState.GetAddresses() { + if !rsWithLookback.Includes(expectedAddr) { + t.Fatalf( + "subring generated after event %d is expected to include instance %s from ring state at time %s but it's missing (actual instances are: %s)", + i, expectedAddr, ringTime.String(), strings.Join(rsWithLookback.GetAddresses(), ", ")) + } } } } - } - }) + }) + } } } } diff --git a/pkg/ring/util_test.go b/pkg/ring/util_test.go index 1bc8b573f42..4e35f9f7fe0 100644 --- a/pkg/ring/util_test.go +++ b/pkg/ring/util_test.go @@ -54,6 +54,11 @@ func (r *RingMock) ShuffleShard(identifier string, size int) ReadRing { return args.Get(0).(ReadRing) } +func (r *RingMock) ShuffleShardWithZoneStability(identifier string, size int) ReadRing { + args := r.Called(identifier, size) + return args.Get(0).(ReadRing) +} + func (r *RingMock) GetInstanceState(instanceID string) (InstanceState, error) { args := r.Called(instanceID) return args.Get(0).(InstanceState), args.Error(1) diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 106535ff2f9..cdf4930d8f9 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -174,7 +174,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf case util.ShardingStrategyDefault: shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger) case util.ShardingStrategyShuffle: - shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger) + shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, g.gatewayCfg.ShardingRing.ZoneStableShuffleSharding) default: return nil, errInvalidShardingStrategy } diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index 8d685067253..987be838638 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -67,6 +67,7 @@ type RingConfig struct { TokensFilePath string `yaml:"tokens_file_path"` ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"` 
KeepInstanceInTheRingOnShutdown bool `yaml:"keep_instance_in_the_ring_on_shutdown"` + ZoneStableShuffleSharding bool `yaml:"zone_stable_shuffle_sharding" doc:"hidden"` // Wait ring stability. WaitStabilityMinDuration time.Duration `yaml:"wait_stability_min_duration"` @@ -102,6 +103,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.TokensFilePath, ringFlagsPrefix+"tokens-file-path", "", "File path where tokens are stored. If empty, tokens are not stored at shutdown and restored at startup.") f.BoolVar(&cfg.ZoneAwarenessEnabled, ringFlagsPrefix+"zone-awareness-enabled", false, "True to enable zone-awareness and replicate blocks across different availability zones.") f.BoolVar(&cfg.KeepInstanceInTheRingOnShutdown, ringFlagsPrefix+"keep-instance-in-the-ring-on-shutdown", false, "True to keep the store gateway instance in the ring when it shuts down. The instance will then be auto-forgotten from the ring after 10*heartbeat_timeout.") + f.BoolVar(&cfg.ZoneStableShuffleSharding, ringFlagsPrefix+"zone-stable-shuffle-sharding", false, "If true, use zone stable shuffle sharding algorithm. Otherwise, use the default shuffle sharding algorithm.") // Wait stability flags. f.DurationVar(&cfg.WaitStabilityMinDuration, ringFlagsPrefix+"wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.") diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go index b7a2852ea71..6c6493cfb14 100644 --- a/pkg/storegateway/sharding_strategy.go +++ b/pkg/storegateway/sharding_strategy.go @@ -87,16 +87,20 @@ type ShuffleShardingStrategy struct { instanceAddr string limits ShardingLimits logger log.Logger + + zoneStableShuffleSharding bool } // NewShuffleShardingStrategy makes a new ShuffleShardingStrategy. -func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy { +func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, zoneStableShuffleSharding bool) *ShuffleShardingStrategy { return &ShuffleShardingStrategy{ r: r, instanceID: instanceID, instanceAddr: instanceAddr, limits: limits, logger: logger, + + zoneStableShuffleSharding: zoneStableShuffleSharding, } } @@ -105,7 +109,7 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin var filteredIDs []string for _, userID := range userIDs { - subRing := GetShuffleShardingSubring(s.r, userID, s.limits) + subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding) // Include the user only if it belongs to this store-gateway shard. if subRing.HasInstance(s.instanceID) { @@ -118,7 +122,7 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin // FilterBlocks implements ShardingStrategy. func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, loaded map[ulid.ULID]struct{}, synced block.GaugeVec) error { - subRing := GetShuffleShardingSubring(s.r, userID, s.limits) + subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding) filterBlocksByRingSharding(subRing, s.instanceAddr, metas, loaded, synced, s.logger) return nil } @@ -173,7 +177,7 @@ func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ // GetShuffleShardingSubring returns the subring to be used for a given user. 
This function // should be used both by store-gateway and querier in order to guarantee the same logic is used. -func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing { +func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits, zoneStableShuffleSharding bool) ring.ReadRing { shardSize := util.DynamicShardSize(limits.StoreGatewayTenantShardSize(userID), ring.InstancesCount()) // A shard size of 0 means shuffle sharding is disabled for this specific user, @@ -182,6 +186,11 @@ func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLi return ring } + if zoneStableShuffleSharding { + // Zone stability is required for store gateway when shuffle shard, see + // https://github.com/cortexproject/cortex/issues/5467 for more details. + return ring.ShuffleShardWithZoneStability(userID, shardSize) + } return ring.ShuffleShard(userID, shardSize) } diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go index 869f15350cc..1dfd54d41ec 100644 --- a/pkg/storegateway/sharding_strategy_test.go +++ b/pkg/storegateway/sharding_strategy_test.go @@ -2,6 +2,8 @@ package storegateway import ( "context" + "fmt" + "strconv" "testing" "time" @@ -595,71 +597,73 @@ func TestShuffleShardingStrategy(t *testing.T) { } for testName, testData := range tests { - testName := testName - testData := testData - - t.Run(testName, func(t *testing.T) { - t.Parallel() - - ctx := context.Background() - store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) - t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + for _, zoneStableShuffleSharding := range []bool{false, true} { + testName := testName + testData := testData + + t.Run(fmt.Sprintf("%s %s", testName, strconv.FormatBool(zoneStableShuffleSharding)), func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + store, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + t.Cleanup(func() { assert.NoError(t, closer.Close()) }) + + // Initialize the ring state. + require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { + d := ring.NewDesc() + testData.setupRing(d) + return d, true, nil + })) + + cfg := ring.Config{ + ReplicationFactor: testData.replicationFactor, + HeartbeatTimeout: time.Minute, + SubringCacheDisabled: true, + } - // Initialize the ring state. - require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { - d := ring.NewDesc() - testData.setupRing(d) - return d, true, nil - })) + r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, r)) + defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck - cfg := ring.Config{ - ReplicationFactor: testData.replicationFactor, - HeartbeatTimeout: time.Minute, - SubringCacheDisabled: true, - } + // Wait until the ring client has synced. + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) - r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, ring.NewIgnoreUnhealthyInstancesReplicationStrategy(), nil, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(ctx, r)) - defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck + // Assert on filter users. 
+ for _, expected := range testData.expectedUsers { + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), zoneStableShuffleSharding) //nolint:govet + assert.Equal(t, expected.users, filter.FilterUsers(ctx, []string{userID})) + } - // Wait until the ring client has synced. - require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + // Assert on filter blocks. + for _, expected := range testData.expectedBlocks { + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger(), zoneStableShuffleSharding) //nolint:govet + synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) + synced.WithLabelValues(shardExcludedMeta).Set(0) - // Assert on filter users. - for _, expected := range testData.expectedUsers { - filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger()) - assert.Equal(t, expected.users, filter.FilterUsers(ctx, []string{userID})) - } + metas := map[ulid.ULID]*metadata.Meta{ + block1: {}, + block2: {}, + block3: {}, + block4: {}, + } - // Assert on filter blocks. - for _, expected := range testData.expectedBlocks { - filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger()) - synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) - synced.WithLabelValues(shardExcludedMeta).Set(0) + err = filter.FilterBlocks(ctx, userID, metas, map[ulid.ULID]struct{}{}, synced) + require.NoError(t, err) - metas := map[ulid.ULID]*metadata.Meta{ - block1: {}, - block2: {}, - block3: {}, - block4: {}, - } + var actualBlocks []ulid.ULID + for id := range metas { + actualBlocks = append(actualBlocks, id) + } - err = filter.FilterBlocks(ctx, userID, metas, map[ulid.ULID]struct{}{}, synced) - require.NoError(t, err) + assert.ElementsMatch(t, expected.blocks, actualBlocks) - var actualBlocks []ulid.ULID - for id := range metas { - actualBlocks = append(actualBlocks, id) + // Assert on the metric used to keep track of the blocks filtered out. + synced.Submit() + assert.Equal(t, float64(numAllBlocks-len(expected.blocks)), testutil.ToFloat64(synced)) } - - assert.ElementsMatch(t, expected.blocks, actualBlocks) - - // Assert on the metric used to keep track of the blocks filtered out. - synced.Submit() - assert.Equal(t, float64(numAllBlocks-len(expected.blocks)), testutil.ToFloat64(synced)) - } - }) + }) + } } }
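Putting the querier and store-gateway changes together, subring selection now boils down to the logic below. This is a condensed, illustrative sketch of `GetShuffleShardingSubring` from this patch; the `DynamicShardSize`/limits lookup is elided and the `tenantSubring` name is hypothetical.

```go
package example

import "github.com/cortexproject/cortex/pkg/ring"

// tenantSubring returns the ring subset to query for a tenant. A shard size of 0
// disables shuffle sharding for the tenant, so the full ring is used.
func tenantSubring(r ring.ReadRing, userID string, shardSize int, zoneStable bool) ring.ReadRing {
	if shardSize <= 0 {
		return r
	}
	if zoneStable {
		// Gated by the new -store-gateway.sharding-ring.zone-stable-shuffle-sharding flag.
		return r.ShuffleShardWithZoneStability(userID, shardSize)
	}
	return r.ShuffleShard(userID, shardSize)
}
```

Because the subring cache key now also carries the zone-stable flag, cached subrings computed with one algorithm are never served for the other.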