diff --git a/CHANGELOG.md b/CHANGELOG.md index 351594df429..f71771b602f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,9 @@ * [CHANGE] TLS server validation is now enabled by default, a new parameter `tls_insecure_skip_verify` can be set to true to skip validation optionally. #3030 * [CHANGE] `cortex_ruler_config_update_failures_total` has been removed in favor of `cortex_ruler_config_last_reload_successful`. #3056 * [FEATURE] Logging of the source IP passed along by a reverse proxy is now supported by setting the `-server.log-source-ips-enabled`. For non standard headers the settings `-server.log-source-ips-header` and `-server.log-source-ips-regex` can be used. #2985 +* [FEATURE] Experimental blocks storage: added shuffle sharding support to store-gateway blocks sharding. Added the following additional metrics to store-gateway: #3069 + * `cortex_bucket_stores_tenants_discovered` + * `cortex_bucket_stores_tenants_synced` * [ENHANCEMENT] Add support for azure storage in China, German and US Government environments. #2988 * [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994 * [ENHANCEMENT] Query-tee: add support for doing a passthrough of requests to preferred backend for unregistered routes #3018 diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index fc3288b14cf..3a59c49e4d1 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -35,6 +35,19 @@ Blocks can be replicated across multiple store-gateway instances based on a repl This feature can be enabled via `-experimental.store-gateway.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-experimental.store-gateway.sharding-ring.*` flags (or their respective YAML config options). +### Sharding strategies + +The store-gateway supports two sharding strategies: + +- `default` +- `shuffle-sharding` + +The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants. + +The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subset of store-gateway instances. This way, the number of store-gateway instances loading blocks of a single tenant is limited, and the blast radius of any issue that could be introduced by the tenant's workload is contained within its shard instances. + +The shuffle sharding strategy can be enabled via `-experimental.store-gateway.sharding-strategy=shuffle-sharding` and requires the `-experimental.store-gateway.tenant-shard-size` flag (or its respective YAML config option) to be set to the default shard size, which is the default number of store-gateway instances each tenant should be sharded to. The shard size can then be overridden on a per-tenant basis by setting `store_gateway_tenant_shard_size` in the limits overrides. + ### Auto-forget When a store-gateway instance cleanly shutdowns, it automatically unregisters itself from the ring. However, in the event of a crash or node failure, the instance will not be unregistered from the ring, potentially leaving a spurious entry in the ring forever. @@ -193,6 +206,11 @@ store_gateway: # shutdown and restored at startup. # CLI flag: -experimental.store-gateway.tokens-file-path [tokens_file_path: | default = ""] + + # The sharding strategy to use. Supported values are: default, + # shuffle-sharding. 
+ # CLI flag: -experimental.store-gateway.sharding-strategy + [sharding_strategy: | default = "default"] ``` ### `blocks_storage_config` diff --git a/docs/blocks-storage/store-gateway.template b/docs/blocks-storage/store-gateway.template index c053c01e0ed..29d02c6d063 100644 --- a/docs/blocks-storage/store-gateway.template +++ b/docs/blocks-storage/store-gateway.template @@ -35,6 +35,19 @@ Blocks can be replicated across multiple store-gateway instances based on a repl This feature can be enabled via `-experimental.store-gateway.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-experimental.store-gateway.sharding-ring.*` flags (or their respective YAML config options). +### Sharding strategies + +The store-gateway supports two sharding strategies: + +- `default` +- `shuffle-sharding` + +The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants. + +The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subset of store-gateway instances. This way, the number of store-gateway instances loading blocks of a single tenant is limited, and the blast radius of any issue that could be introduced by the tenant's workload is contained within its shard instances. + +The shuffle sharding strategy can be enabled via `-experimental.store-gateway.sharding-strategy=shuffle-sharding` and requires the `-experimental.store-gateway.tenant-shard-size` flag (or its respective YAML config option) to be set to the default shard size, which is the default number of store-gateway instances each tenant should be sharded to. The shard size can then be overridden on a per-tenant basis by setting `store_gateway_tenant_shard_size` in the limits overrides. + ### Auto-forget When a store-gateway instance cleanly shutdowns, it automatically unregisters itself from the ring. However, in the event of a crash or node failure, the instance will not be unregistered from the ring, potentially leaving a spurious entry in the ring forever. diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 412554aa263..796c15b2087 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2802,6 +2802,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s # CLI flag: -frontend.max-cache-freshness [max_cache_freshness: | default = 1m] +# The default tenant's shard size when the shuffle-sharding strategy is used. +# Must be set when the store-gateway sharding is enabled with the +# shuffle-sharding strategy. When this setting is specified in the per-tenant +# overrides, a value of 0 disables shuffle sharding for the tenant. +# CLI flag: -experimental.store-gateway.tenant-shard-size +[store_gateway_tenant_shard_size: | default = 0] + # File name of per-user overrides. [deprecated, use -runtime-config.file # instead] # CLI flag: -limits.per-user-override-config @@ -3604,6 +3611,10 @@ sharding_ring: # shutdown and restored at startup. # CLI flag: -experimental.store-gateway.tokens-file-path [tokens_file_path: | default = ""] + +# The sharding strategy to use. Supported values are: default, shuffle-sharding. 
+# CLI flag: -experimental.store-gateway.sharding-strategy +[sharding_strategy: | default = "default"] ``` ### `purger_config` diff --git a/integration/querier_test.go b/integration/querier_test.go index 03830f33ae1..426a4f644b9 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -25,30 +25,37 @@ import ( func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { tests := map[string]struct { - blocksShardingEnabled bool + blocksShardingStrategy string // Empty means sharding is disabled. + tenantShardSize int ingesterStreamingEnabled bool indexCacheBackend string }{ - "blocks sharding enabled, ingester gRPC streaming disabled, inmemory index cache": { - blocksShardingEnabled: true, - ingesterStreamingEnabled: false, - indexCacheBackend: tsdb.IndexCacheBackendInMemory, - }, - "blocks sharding enabled, ingester gRPC streaming enabled, inmemory index cache": { - blocksShardingEnabled: true, - ingesterStreamingEnabled: true, - indexCacheBackend: tsdb.IndexCacheBackendInMemory, - }, "blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": { - blocksShardingEnabled: false, + blocksShardingStrategy: "", ingesterStreamingEnabled: false, // Memcached index cache is required to avoid flaky tests when the blocks sharding is disabled // because two different requests may hit two different store-gateways, so if the cache is not // shared there's no guarantee we'll have a cache hit. indexCacheBackend: tsdb.IndexCacheBackendMemcached, }, - "blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache": { - blocksShardingEnabled: true, + "blocks default sharding, ingester gRPC streaming disabled, inmemory index cache": { + blocksShardingStrategy: "default", + ingesterStreamingEnabled: false, + indexCacheBackend: tsdb.IndexCacheBackendInMemory, + }, + "blocks default sharding, ingester gRPC streaming enabled, inmemory index cache": { + blocksShardingStrategy: "default", + ingesterStreamingEnabled: true, + indexCacheBackend: tsdb.IndexCacheBackendInMemory, + }, + "blocks default sharding, ingester gRPC streaming enabled, memcached index cache": { + blocksShardingStrategy: "default", + ingesterStreamingEnabled: true, + indexCacheBackend: tsdb.IndexCacheBackendMemcached, + }, + "blocks shuffle sharding, ingester gRPC streaming enabled, memcached index cache": { + blocksShardingStrategy: "shuffle-sharding", + tenantShardSize: 1, ingesterStreamingEnabled: true, indexCacheBackend: tsdb.IndexCacheBackendMemcached, }, @@ -70,7 +77,9 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { "-experimental.blocks-storage.bucket-store.sync-interval": "1s", "-experimental.blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), "-experimental.blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend, - "-experimental.store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingEnabled), + "-experimental.store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingStrategy != ""), + "-experimental.store-gateway.sharding-strategy": testCfg.blocksShardingStrategy, + "-experimental.store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize), "-querier.ingester-streaming": strconv.FormatBool(testCfg.ingesterStreamingEnabled), }) @@ -92,7 +101,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway1, storeGateway2)) // Start the querier 
with configuring store-gateway addresses if sharding is disabled. - if !testCfg.blocksShardingEnabled { + if testCfg.blocksShardingStrategy == "" { flags = mergeFlags(flags, map[string]string{ "-experimental.querier.store-gateway-addresses": strings.Join([]string{storeGateway1.NetworkGRPCEndpoint(), storeGateway2.NetworkGRPCEndpoint()}, ","), }) @@ -103,7 +112,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { // Wait until both the distributor and querier have updated the ring. The querier will also watch // the store-gateway ring if blocks sharding is enabled. require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) - if testCfg.blocksShardingEnabled { + if testCfg.blocksShardingStrategy != "" { require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(512+(512*storeGateways.NumInstances()))), "cortex_ring_tokens_total")) } else { require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) @@ -153,12 +162,20 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { // Wait until the store-gateway has synched the new uploaded blocks. When sharding is enabled // we don't known which store-gateway instance will synch the blocks, so we need to wait on // metrics extracted from all instances. - if testCfg.blocksShardingEnabled { + if testCfg.blocksShardingStrategy != "" { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "cortex_bucket_store_blocks_loaded")) } else { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(2*storeGateways.NumInstances())), "cortex_bucket_store_blocks_loaded")) } + // Check how many tenants have been discovered and synced by store-gateways. + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_discovered")) + if testCfg.blocksShardingStrategy == "shuffle-sharding" { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_bucket_stores_tenants_synced")) + } else { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_synced")) + } + // Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both). 
result, err := c.Query("series_1", series1Timestamp) require.NoError(t, err) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index fd851815c02..0b61f4feaa3 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -188,7 +188,10 @@ func (c *Config) Validate(log log.Logger) error { return errors.Wrap(err, "invalid query_range config") } if err := c.TableManager.Validate(); err != nil { - return errors.Wrap(err, "invalid table_manager config") + return errors.Wrap(err, "invalid table-manager config") + } + if err := c.StoreGateway.Validate(c.LimitsConfig); err != nil { + return errors.Wrap(err, "invalid store-gateway config") } if c.Storage.Engine == storage.StorageEngineBlocks && c.Querier.SecondStoreEngine != storage.StorageEngineChunks && len(c.Schema.Configs) > 0 { diff --git a/pkg/querier/blocks_store_balanced_set.go b/pkg/querier/blocks_store_balanced_set.go index e81266ca1b9..4221c3a7044 100644 --- a/pkg/querier/blocks_store_balanced_set.go +++ b/pkg/querier/blocks_store_balanced_set.go @@ -58,7 +58,7 @@ func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error { return nil } -func (s *blocksStoreBalancedSet) GetClientsFor(blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { +func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { addresses := s.dnsProvider.Addresses() if len(addresses) == 0 { return nil, fmt.Errorf("no address resolved for the store-gateway service addresses %s", strings.Join(s.serviceAddresses, ",")) diff --git a/pkg/querier/blocks_store_balanced_set_test.go b/pkg/querier/blocks_store_balanced_set_test.go index c7fa8175cd9..5a9d8c0605d 100644 --- a/pkg/querier/blocks_store_balanced_set_test.go +++ b/pkg/querier/blocks_store_balanced_set_test.go @@ -33,7 +33,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor(t *testing.T) { clientsCount := map[string]int{} for i := 0; i < numGets; i++ { - clients, err := s.GetClientsFor([]ulid.ULID{block1}, map[ulid.ULID][]string{}) + clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{}) require.NoError(t, err) require.Len(t, clients, 1) @@ -138,7 +138,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor_Exclude(t *testing.T) { require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck - clients, err := s.GetClientsFor(testData.queryBlocks, testData.exclude) + clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude) assert.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { @@ -146,5 +146,4 @@ func TestBlocksStoreBalancedSet_GetClientsFor_Exclude(t *testing.T) { } }) } - } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c3999d6cd7e..ee009e3b11b 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -57,7 +57,7 @@ type BlocksStoreSet interface { // GetClientsFor returns the store gateway clients that should be used to // query the set of blocks in input. The exclude parameter is the map of // blocks -> store-gateway addresses that should be excluded. 
- GetClientsFor(blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) + GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) } // BlocksFinder is the interface used to find blocks for a given user and time range. @@ -82,6 +82,7 @@ type BlocksStoreClient interface { // BlocksStoreLimits is the interface that should be implemented by the limits provider. type BlocksStoreLimits interface { MaxChunksPerQuery(userID string) int + StoreGatewayTenantShardSize(userID string) int } type blocksStoreQueryableMetrics struct { @@ -193,7 +194,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa reg.MustRegister(storesRing) } - stores, err = newBlocksStoreReplicationSet(storesRing, querierCfg.StoreGatewayClient, logger, reg) + stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, limits, querierCfg.StoreGatewayClient, logger, reg) if err != nil { return nil, errors.Wrap(err, "failed to create store set") } @@ -368,7 +369,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...* for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ { // Find the set of store-gateway instances having the blocks. The exclude parameter is the // map of blocks queried so far, with the list of store-gateway addresses for each block. - clients, err := q.stores.GetClientsFor(remainingBlocks, attemptedBlocks) + clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks) if err != nil { // If it's a retry and we get an error, it means there are no more store-gateways left // from which running another attempt, so we're just stopping retrying. diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index e7f32a4a7db..7fb52366750 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -750,7 +750,7 @@ type blocksStoreSetMock struct { nextResult int } -func (m *blocksStoreSetMock) GetClientsFor(_ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { +func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { if m.nextResult >= len(m.mockedResponses) { panic("not enough mocked results") } @@ -815,13 +815,18 @@ func (m *storeGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) { } type blocksStoreLimitsMock struct { - maxChunksPerQuery int + maxChunksPerQuery int + storeGatewayTenantShardSize int } func (m *blocksStoreLimitsMock) MaxChunksPerQuery(_ string) int { return m.maxChunksPerQuery } +func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(userID string) int { + return m.storeGatewayTenantShardSize +} + func mockSeriesResponse(lbls labels.Labels, timeMillis int64, value float64) *storepb.SeriesResponse { // Generate a chunk containing a single value (for simplicity). 
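The `BlocksStoreLimits` interface above gains a `StoreGatewayTenantShardSize(userID string) int` method, but the production implementation backing it is not part of this excerpt. The sketch below shows how such a getter could plausibly be exposed on `validation.Overrides`, mirroring the `store_gateway_tenant_shard_size` limit added to `limits_config`; the `getOverridesForUser` helper and the exact `Limits` field name are assumptions for illustration, not code from this PR.

```go
// Hypothetical sketch (assumed to live in pkg/util/validation): a per-tenant
// getter matching the new BlocksStoreLimits method. Field and helper names
// are assumptions based on the store_gateway_tenant_shard_size option above.
func (o *Overrides) StoreGatewayTenantShardSize(userID string) int {
	// Resolve the per-tenant limits (falling back to the defaults) and
	// return the configured store-gateway shard size.
	return o.getOverridesForUser(userID).StoreGatewayTenantShardSize
}
```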
chunk := chunkenc.NewXORChunk() diff --git a/pkg/querier/blocks_store_replicated_set.go b/pkg/querier/blocks_store_replicated_set.go index 8c9beb607a2..89183efb56b 100644 --- a/pkg/querier/blocks_store_replicated_set.go +++ b/pkg/querier/blocks_store_replicated_set.go @@ -12,6 +12,7 @@ import ( "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/client" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storegateway" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/tls" @@ -22,18 +23,29 @@ import ( type blocksStoreReplicationSet struct { services.Service - storesRing *ring.Ring - clientsPool *client.Pool + storesRing *ring.Ring + clientsPool *client.Pool + shardingStrategy string + limits BlocksStoreLimits // Subservices manager. subservices *services.Manager subservicesWatcher *services.FailureWatcher } -func newBlocksStoreReplicationSet(storesRing *ring.Ring, tlsCfg tls.ClientConfig, logger log.Logger, reg prometheus.Registerer) (*blocksStoreReplicationSet, error) { +func newBlocksStoreReplicationSet( + storesRing *ring.Ring, + shardingStrategy string, + limits BlocksStoreLimits, + tlsCfg tls.ClientConfig, + logger log.Logger, + reg prometheus.Registerer, +) (*blocksStoreReplicationSet, error) { s := &blocksStoreReplicationSet{ - storesRing: storesRing, - clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), tlsCfg, logger, reg), + storesRing: storesRing, + clientsPool: newStoreGatewayClientPool(client.NewRingServiceDiscovery(storesRing), tlsCfg, logger, reg), + shardingStrategy: shardingStrategy, + limits: limits, } var err error @@ -72,17 +84,26 @@ func (s *blocksStoreReplicationSet) stopping(_ error) error { return services.StopManagerAndAwaitStopped(context.Background(), s.subservices) } -func (s *blocksStoreReplicationSet) GetClientsFor(blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { +func (s *blocksStoreReplicationSet) GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) { shards := map[string][]ulid.ULID{} + // If shuffle sharding is enabled, we should build a subring for the user, + // otherwise we just use the full ring. + var userRing ring.ReadRing + if s.shardingStrategy == storegateway.ShardingStrategyShuffle { + userRing = storegateway.GetShuffleShardingSubring(s.storesRing, userID, s.limits) + } else { + userRing = s.storesRing + } + // Find the replication set of each block we need to query. for _, blockID := range blockIDs { // Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance). // Do not reuse the same buffer across multiple Get() calls because we do retain the // returned replication set. 
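`GetClientsFor` above builds a per-tenant subring via `storegateway.GetShuffleShardingSubring`, whose implementation is not included in this excerpt. A minimal sketch consistent with the pieces that are visible — the new `HashTenantID()` helper, the `ReadRing.Subring(key, n)` method, and the documented rule that a shard size of 0 disables shuffle sharding for a tenant — could look like the following; the `ShardSizeLimits` interface name is an assumption.

```go
// Illustrative sketch only; the real GetShuffleShardingSubring lives in
// pkg/storegateway and is not shown in this diff.

// ShardSizeLimits is an assumed minimal interface: anything exposing the
// per-tenant store-gateway shard size (e.g. the querier's BlocksStoreLimits).
type ShardSizeLimits interface {
	StoreGatewayTenantShardSize(userID string) int
}

// GetShuffleShardingSubring returns the subset of store-gateway instances
// eligible to hold the given user's blocks under shuffle sharding.
func GetShuffleShardingSubring(storesRing ring.ReadRing, userID string, limits ShardSizeLimits) ring.ReadRing {
	shardSize := limits.StoreGatewayTenantShardSize(userID)

	// A shard size <= 0 disables shuffle sharding for the tenant, so the
	// full ring is used.
	if shardSize <= 0 {
		return storesRing
	}

	// Select a stable, tenant-specific subring of shardSize instances,
	// keyed on the hash of the tenant ID.
	return storesRing.Subring(cortex_tsdb.HashTenantID(userID), shardSize)
}
```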
- buf := make([]ring.IngesterDesc, 0, s.storesRing.ReplicationFactor()+2) + buf := make([]ring.IngesterDesc, 0, userRing.ReplicationFactor()+2) - set, err := s.storesRing.Get(cortex_tsdb.HashBlockID(blockID), ring.BlocksRead, buf) + set, err := userRing.Get(cortex_tsdb.HashBlockID(blockID), ring.BlocksRead, buf) if err != nil { return nil, errors.Wrapf(err, "failed to get store-gateway replication set owning the block %s", blockID.String()) } diff --git a/pkg/querier/blocks_store_replicated_set_test.go b/pkg/querier/blocks_store_replicated_set_test.go index 40037eecdc8..25380046cdc 100644 --- a/pkg/querier/blocks_store_replicated_set_test.go +++ b/pkg/querier/blocks_store_replicated_set_test.go @@ -37,7 +37,14 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { block3Hash := cortex_tsdb.HashBlockID(block3) block4Hash := cortex_tsdb.HashBlockID(block4) + // Ensure the user ID we use belongs to the instances holding the token for the block 1 + // (it's expected by the assertions below). + userID := "user-A" + require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash) + tests := map[string]struct { + shardingStrategy string + tenantShardSize int replicationFactor int setup func(*ring.Desc) queryBlocks []ulid.ULID @@ -45,7 +52,11 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { expectedClients map[string][]ulid.ULID expectedErr error }{ - "single instance in the ring with replication factor = 1": { + // + // Sharding strategy: default + // + "default sharding, single instance in the ring with RF = 1": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 1, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -55,7 +66,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { "127.0.0.1": {block1, block2}, }, }, - "single instance in the ring with replication factor = 1 but excluded": { + "default sharding, single instance in the ring with RF = 1 but excluded": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 1, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -66,7 +78,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { }, expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), }, - "single instance in the ring with replication factor = 1 but excluded for non queried block": { + "default sharding, single instance in the ring with RF = 1 but excluded for non queried block": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 1, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -79,7 +92,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { "127.0.0.1": {block1, block2}, }, }, - "single instance in the ring with replication factor = 2": { + "default sharding, single instance in the ring with RF = 2": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 2, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -89,7 +103,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { "127.0.0.1": {block1, block2}, }, }, - "multiple instances in the ring with each requested block belonging to a different store-gateway and replication factor = 1": { + "default sharding, multiple instances in 
the ring with each requested block belonging to a different store-gateway and RF = 1": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 1, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -104,7 +119,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { "127.0.0.4": {block4}, }, }, - "multiple instances in the ring with each requested block belonging to a different store-gateway and replication factor = 1 but excluded": { + "default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 1 but excluded": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 1, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -118,7 +134,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { }, expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block3.String()), }, - "multiple instances in the ring with each requested block belonging to a different store-gateway and replication factor = 2": { + "default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 2, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -133,7 +150,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { "127.0.0.4": {block4}, }, }, - "multiple instances in the ring with multiple requested blocks belonging to the same store-gateway and replication factor = 2": { + "default sharding, multiple instances in the ring with multiple requested blocks belonging to the same store-gateway and RF = 2": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 2, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -145,7 +163,8 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { "127.0.0.2": {block2, block3}, }, }, - "multiple instances in the ring with each requested block belonging to a different store-gateway and replication factor = 2 and some blocks excluded but with replacement available": { + "default sharding, multiple instances in the ring with each requested block belonging to a different store-gateway and RF = 2 and some blocks excluded but with replacement available": { + shardingStrategy: storegateway.ShardingStrategyDefault, replicationFactor: 2, setup: func(d *ring.Desc) { d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) @@ -163,6 +182,130 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { "127.0.0.4": {block3, block4}, }, }, + // + // Sharding strategy: shuffle sharding + // + "shuffle sharding, single instance in the ring with RF = 1, SS = 1": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + tenantShardSize: 1, + replicationFactor: 1, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2}, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block2}, + }, + }, + "shuffle sharding, single instance in the ring with RF = 1, SS = 1 but excluded": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + 
tenantShardSize: 1, + replicationFactor: 1, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2}, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1"}, + }, + expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), + }, + "shuffle sharding, single instance in the ring with RF = 2, SS = 2": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + tenantShardSize: 2, + replicationFactor: 2, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2}, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block2}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 1, SS = 1": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + tenantShardSize: 1, + replicationFactor: 1, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2, block4}, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block2, block4}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 1, SS = 2": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + tenantShardSize: 2, + replicationFactor: 1, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2, block4}, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block4}, + "127.0.0.2": {block2}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + tenantShardSize: 4, + replicationFactor: 1, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2, block4}, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.1": {block1}, + "127.0.0.2": {block2}, + "127.0.0.4": {block4}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks but some replacement available": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + tenantShardSize: 2, + replicationFactor: 2, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-4", "127.0.0.4", "", 
[]uint32{block4Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2}, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1"}, + block2: {"127.0.0.1"}, + }, + expectedClients: map[string][]ulid.ULID{ + "127.0.0.2": {block1, block2}, + }, + }, + "shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": { + shardingStrategy: storegateway.ShardingStrategyShuffle, + tenantShardSize: 2, + replicationFactor: 2, + setup: func(d *ring.Desc) { + d.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE) + d.AddIngester("instance-4", "127.0.0.4", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + queryBlocks: []ulid.ULID{block1, block2}, + exclude: map[ulid.ULID][]string{ + block1: {"127.0.0.1", "127.0.0.2"}, + block2: {"127.0.0.1"}, + }, + expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()), + }, } for testName, testData := range tests { @@ -188,8 +331,12 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { r, err := ring.NewWithStoreClientAndStrategy(ringCfg, "test", "test", ringStore, &storegateway.BlocksReplicationStrategy{}) require.NoError(t, err) + limits := &blocksStoreLimitsMock{ + storeGatewayTenantShardSize: testData.tenantShardSize, + } + reg := prometheus.NewPedanticRegistry() - s, err := newBlocksStoreReplicationSet(r, tls.ClientConfig{}, log.NewNopLogger(), reg) + s, err := newBlocksStoreReplicationSet(r, testData.shardingStrategy, limits, tls.ClientConfig{}, log.NewNopLogger(), reg) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, s)) defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck @@ -200,7 +347,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) { return err == nil && len(all.Ingesters) > 0 }) - clients, err := s.GetClientsFor(testData.queryBlocks, testData.exclude) + clients, err := s.GetClientsFor(userID, testData.queryBlocks, testData.exclude) assert.Equal(t, testData.expectedErr, err) if testData.expectedErr == nil { diff --git a/pkg/ring/ring.go b/pkg/ring/ring.go index ca8583401e2..95acfd11878 100644 --- a/pkg/ring/ring.go +++ b/pkg/ring/ring.go @@ -48,6 +48,9 @@ type ReadRing interface { ReplicationFactor() int IngesterCount() int Subring(key uint32, n int) ReadRing + + // HasInstance returns whether the ring contains an instance matching the provided instanceID. + HasInstance(instanceID string) bool } // Operation can be Read or Write @@ -475,3 +478,13 @@ func (r *Ring) GetInstanceState(instanceID string) (IngesterState, error) { return instance.GetState(), nil } + +// HasInstance returns whether the ring contains an instance matching the provided instanceID. +func (r *Ring) HasInstance(instanceID string) bool { + r.mtx.RLock() + defer r.mtx.RUnlock() + + instances := r.ringDesc.GetIngesters() + _, ok := instances[instanceID] + return ok +} diff --git a/pkg/storage/tsdb/util.go b/pkg/storage/tsdb/util.go index c13f5a2b7c4..7687e0dbc9c 100644 --- a/pkg/storage/tsdb/util.go +++ b/pkg/storage/tsdb/util.go @@ -15,3 +15,9 @@ func HashBlockID(id ulid.ULID) uint32 { } return h } + +// HashTenantID returns a 32-bit hash of the tenant ID useful for +// ring-based sharding. 
+func HashTenantID(id string) uint32 { + return client.HashAdd32a(client.HashNew32a(), id) +} diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 8203130d92f..a9e20493b4b 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -41,7 +41,7 @@ type BucketStores struct { logLevel logging.Level bucketStoreMetrics *BucketStoreMetrics metaFetcherMetrics *MetadataFetcherMetrics - filters []block.MetadataFilter + shardingStrategy ShardingStrategy // Index cache shared across all tenants. indexCache storecache.IndexCache @@ -54,12 +54,14 @@ type BucketStores struct { stores map[string]*store.BucketStore // Metrics. - syncTimes prometheus.Histogram - syncLastSuccess prometheus.Gauge + syncTimes prometheus.Histogram + syncLastSuccess prometheus.Gauge + tenantsDiscovered prometheus.Gauge + tenantsSynced prometheus.Gauge } // NewBucketStores makes a new BucketStores. -func NewBucketStores(cfg tsdb.BlocksStorageConfig, filters []block.MetadataFilter, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { +func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg) if err != nil { return nil, errors.Wrapf(err, "create caching bucket") @@ -78,7 +80,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, filters []block.MetadataFilte cfg: cfg, limits: limits, bucket: cachingBucket, - filters: filters, + shardingStrategy: shardingStrategy, stores: map[string]*store.BucketStore{}, logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), @@ -93,6 +95,14 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, filters []block.MetadataFilte Name: "cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds", Help: "Unix timestamp of the last successful blocks sync.", }), + tenantsDiscovered: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_tenants_discovered", + Help: "Number of tenants discovered in the bucket.", + }), + tenantsSynced: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_tenants_synced", + Help: "Number of tenants synced.", + }), } // Init the index cache. @@ -147,6 +157,22 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte errs := tsdb_errors.MultiError{} errsMx := sync.Mutex{} + // Scan users in the bucket. In case of error, it may return a subset of users. If we sync a subset of users + // during a periodic sync, we may end up unloading blocks for users that still belong to this store-gateway + // so we do prefer to not run the sync at all. + userIDs, err := u.scanUsers(ctx) + if err != nil { + return err + } + + includeUserIDs := make(map[string]struct{}) + for _, userID := range u.shardingStrategy.FilterUsers(ctx, userIDs) { + includeUserIDs[userID] = struct{}{} + } + + u.tenantsDiscovered.Set(float64(len(userIDs))) + u.tenantsSynced.Set(float64(len(includeUserIDs))) + // Create a pool of workers which will synchronize blocks. The pool size // is limited in order to avoid to concurrently sync a lot of tenants in // a large cluster. 
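`syncUsersBlocks` above now asks `u.shardingStrategy.FilterUsers(ctx, userIDs)` which tenants this instance should keep. The `ShardingStrategy` interface definition itself is not included in this excerpt; inferred from that call site and from `mockShardingStrategy` in `gateway_test.go` further down, it looks roughly like this:

```go
// Reconstructed from usage in this diff; the actual definition is not shown.
type ShardingStrategy interface {
	// FilterUsers returns the subset of userIDs whose blocks should be
	// loaded by this store-gateway instance.
	FilterUsers(ctx context.Context, userIDs []string) []string

	// FilterBlocks removes from metas the blocks that do not belong to this
	// instance's shard, updating the synced gauge for excluded blocks.
	FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error
}
```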
@@ -165,28 +191,32 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte }() } - // Iterate the bucket, lazily create a bucket store for each new user found + // Lazily create a bucket store for each new user found // and submit a sync job for each user. - err := u.bucket.Iter(ctx, "", func(s string) error { - user := strings.TrimSuffix(s, "/") + for _, userID := range userIDs { + // If we don't have a store for the tenant yet, then we should skip it if it's not + // included in the store-gateway shard. If we already have it, we need to sync it + // anyway to make sure all its blocks are unloaded and metrics updated correctly + // (but bucket API calls are skipped thanks to the objstore client adapter). + if _, included := includeUserIDs[userID]; !included && u.getStore(userID) == nil { + continue + } - bs, err := u.getOrCreateStore(user) + bs, err := u.getOrCreateStore(userID) if err != nil { - return err + errsMx.Lock() + errs.Add(err) + errsMx.Unlock() + + continue } select { - case jobs <- job{userID: user, store: bs}: - return nil + case jobs <- job{userID: userID, store: bs}: + // Nothing to do. Will loop to push more jobs. case <-ctx.Done(): return ctx.Err() } - }) - - if err != nil { - errsMx.Lock() - errs.Add(err) - errsMx.Unlock() } // Wait until all workers completed. @@ -217,6 +247,22 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri }) } +// scanUsers in the bucket and return the list of found users. If an error occurs while +// iterating the bucket, it may return both an error and a subset of the users in the bucket. +func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { + var users []string + + // Iterate the bucket to find all users in the bucket. Due to how the bucket listing + // caching works, it's more likely to have a cache hit if there's no delay while + // iterating the bucket, so we do load all users in memory and later process them. + err := u.bucket.Iter(ctx, "", func(s string) error { + users = append(users, strings.TrimSuffix(s, "/")) + return nil + }) + + return users, err +} + func (u *BucketStores) getStore(userID string) *store.BucketStore { u.storesMu.RLock() store := u.stores[userID] @@ -247,15 +293,22 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro userBkt := tsdb.NewUserBucketClient(userID, u.bucket) + // Wrap the bucket reader to skip iterating the bucket at all if the user doesn't + // belong to the store-gateway shard. We need to run the BucketStore synching anyway + // in order to unload previous tenants in case of a resharding leading to tenants + // moving out from the store-gateway shard and also make sure both MetaFetcher and + // BucketStore metrics are correctly updated. + fetcherBkt := NewShardingBucketReaderAdapter(userID, u.shardingStrategy, userBkt) + fetcherReg := prometheus.NewRegistry() fetcher, err := block.NewMetaFetcher( userLogger, u.cfg.BucketStore.MetaSyncConcurrency, - userBkt, + fetcherBkt, filepath.Join(u.cfg.BucketStore.SyncDir, userID), // The fetcher stores cached metas in the "meta-syncer/" sub directory fetcherReg, - // The input filters MUST be before the ones we create here (order matters). - append(u.filters, []block.MetadataFilter{ + // The sharding strategy filter MUST be before the ones we create here (order matters). 
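`NewShardingMetadataFilterAdapter` (used just below, alongside `NewShardingBucketReaderAdapter`) is not shown in this excerpt. Given that Thanos' `block.MetadataFilter` matches the `Filter()` signature visible in the deleted `metadata_filters.go` at the end of this diff, the adapter is presumably little more than a thin bridge binding a tenant ID to the strategy's `FilterBlocks`. A hedged sketch:

```go
// Sketch of the adapter, assumed from its usage in this diff; not PR code.
type shardingMetadataFilterAdapter struct {
	userID   string
	strategy ShardingStrategy
}

func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter {
	return &shardingMetadataFilterAdapter{userID: userID, strategy: strategy}
}

// Filter delegates to the tenant-aware sharding strategy.
func (a *shardingMetadataFilterAdapter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error {
	return a.strategy.FilterBlocks(ctx, a.userID, metas, synced)
}
```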
+ append([]block.MetadataFilter{NewShardingMetadataFilterAdapter(userID, u.shardingStrategy)}, []block.MetadataFilter{ block.NewConsistencyDelayMetaFilter(userLogger, u.cfg.BucketStore.ConsistencyDelay, fetcherReg), block.NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksDelay), // The duplicate filter has been intentionally omitted because it could cause troubles with diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 373071a5c1f..5930dc02171 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -48,7 +49,7 @@ func TestBucketStores_InitialSync(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, nil, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Query series before the initial sync. @@ -66,7 +67,7 @@ func TestBucketStores_InitialSync(t *testing.T) { seriesSet, warnings, err := querySeries(stores, userID, metricName, 20, 40) require.NoError(t, err) assert.Empty(t, warnings) - assert.Len(t, seriesSet, 1) + require.Len(t, seriesSet, 1) assert.Equal(t, []storepb.Label{{Name: labels.MetricName, Value: metricName}}, seriesSet[0].Labels) } @@ -124,7 +125,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) { require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - stores, err := NewBucketStores(cfg, nil, bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) // Run an initial sync to discover 1 block. @@ -179,26 +180,50 @@ func TestBucketStores_SyncBlocks(t *testing.T) { } func TestBucketStores_syncUsersBlocks(t *testing.T) { - cfg, cleanup := prepareStorageConfig(t) - cfg.BucketStore.TenantSyncConcurrency = 2 - defer cleanup() - - bucketClient := &cortex_tsdb.BucketClientMock{} - bucketClient.MockIter("", []string{"user-1", "user-2", "user-3"}, nil) - - stores, err := NewBucketStores(cfg, nil, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) - require.NoError(t, err) - - // Sync user stores and count the number of times the callback is called. 
- var storesCount atomic.Int32 - err = stores.syncUsersBlocks(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { - storesCount.Inc() - return nil - }) + allUsers := []string{"user-1", "user-2", "user-3"} + + tests := map[string]struct { + shardingStrategy ShardingStrategy + expectedStores int32 + }{ + "when sharding is disabled all users should be synced": { + shardingStrategy: NewNoShardingStrategy(), + expectedStores: 3, + }, + "when sharding is enabled only stores for filtered users should be created": { + shardingStrategy: func() ShardingStrategy { + s := &mockShardingStrategy{} + s.On("FilterUsers", mock.Anything, allUsers).Return([]string{"user-1", "user-2"}) + return s + }(), + expectedStores: 2, + }, + } - assert.NoError(t, err) - bucketClient.AssertNumberOfCalls(t, "Iter", 1) - assert.Equal(t, storesCount.Load(), int32(3)) + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + cfg, cleanup := prepareStorageConfig(t) + cfg.BucketStore.TenantSyncConcurrency = 2 + defer cleanup() + + bucketClient := &cortex_tsdb.BucketClientMock{} + bucketClient.MockIter("", allUsers, nil) + + stores, err := NewBucketStores(cfg, testData.shardingStrategy, bucketClient, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil) + require.NoError(t, err) + + // Sync user stores and count the number of times the callback is called. + var storesCount atomic.Int32 + err = stores.syncUsersBlocks(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { + storesCount.Inc() + return nil + }) + + assert.NoError(t, err) + bucketClient.AssertNumberOfCalls(t, "Iter", 1) + assert.Equal(t, storesCount.Load(), testData.expectedStores) + }) + } } func prepareStorageConfig(t *testing.T) (cortex_tsdb.BlocksStorageConfig, func()) { diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 58a00add354..507f23654a9 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -3,6 +3,8 @@ package storegateway import ( "context" "flag" + "fmt" + "strings" "time" "github.com/go-kit/kit/log" @@ -10,7 +12,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/objstore" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -37,12 +38,25 @@ const ( // ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance // in the ring will be automatically removed. ringAutoForgetUnhealthyPeriods = 10 + + // Supported sharding strategies. + ShardingStrategyDefault = "default" + ShardingStrategyShuffle = "shuffle-sharding" +) + +var ( + supportedShardingStrategies = []string{ShardingStrategyDefault, ShardingStrategyShuffle} + + // Validation errors. + errInvalidShardingStrategy = errors.New("invalid sharding strategy") + errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0") ) // Config holds the store gateway config. type Config struct { - ShardingEnabled bool `yaml:"sharding_enabled"` - ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. This option is required only if blocks sharding is enabled."` + ShardingEnabled bool `yaml:"sharding_enabled"` + ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. 
This option is required only if blocks sharding is enabled."` + ShardingStrategy string `yaml:"sharding_strategy"` } // RegisterFlags registers the Config flags. @@ -50,6 +64,22 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.ShardingRing.RegisterFlags(f) f.BoolVar(&cfg.ShardingEnabled, "experimental.store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier) + f.StringVar(&cfg.ShardingStrategy, "experimental.store-gateway.sharding-strategy", ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", "))) +} + +// Validate the Config. +func (cfg *Config) Validate(limits validation.Limits) error { + if cfg.ShardingEnabled { + if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) { + return errInvalidShardingStrategy + } + + if cfg.ShardingStrategy == ShardingStrategyShuffle && limits.StoreGatewayTenantShardSize <= 0 { + return errInvalidTenantShardSize + } + } + + return nil } // StoreGateway is the Cortex service responsible to expose an API over the bucket @@ -98,7 +128,6 @@ func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, bucketClient objstore.Bucket, ringStore kv.Client, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) { var err error - var filters []block.MetadataFilter g := &StoreGateway{ gatewayCfg: gatewayCfg, @@ -115,6 +144,9 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf g.bucketSync.WithLabelValues(syncReasonPeriodic) g.bucketSync.WithLabelValues(syncReasonRingChange) + // Init sharding strategy. + var shardingStrategy ShardingStrategy + if gatewayCfg.ShardingEnabled { lifecyclerCfg, err := gatewayCfg.ShardingRing.ToLifecyclerConfig() if err != nil { @@ -143,12 +175,20 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf reg.MustRegister(g.ring) } - // Filter blocks by the shard of this store-gateway instance if the - // sharding is enabled. - filters = append(filters, NewShardingMetadataFilter(g.ring, lifecyclerCfg.Addr, logger)) + // Instance the right strategy. 
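When sharding is disabled the gateway falls back to `NewNoShardingStrategy()` (constructed in the `else` branch below). Its implementation is not in this excerpt, but the `bucket_stores_test.go` case "when sharding is disabled all users should be synced" pins down its behaviour: keep every tenant and every block. A minimal sketch under that assumption:

```go
// Assumed implementation of the no-op strategy; not shown in this diff.
type NoShardingStrategy struct{}

func NewNoShardingStrategy() *NoShardingStrategy {
	return &NoShardingStrategy{}
}

// FilterUsers returns the input unchanged: every tenant is owned.
func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
	return userIDs
}

// FilterBlocks keeps all blocks: nothing is excluded from the shard.
func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error {
	return nil
}
```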
+ switch gatewayCfg.ShardingStrategy { + case ShardingStrategyDefault: + shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger) + case ShardingStrategyShuffle: + shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger) + default: + return nil, errInvalidShardingStrategy + } + } else { + shardingStrategy = NewNoShardingStrategy() } - g.stores, err = NewBucketStores(storageCfg, filters, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) + g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg)) if err != nil { return nil, errors.Wrap(err, "create bucket stores") } diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 49032dfae40..2bec1ba57a3 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -14,14 +14,17 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/cortexproject/cortex/pkg/ring" @@ -35,7 +38,52 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -func TestStoreGateway_InitialSyncWithShardingEnabled(t *testing.T) { +func TestConfig_Validate(t *testing.T) { + tests := map[string]struct { + setup func(cfg *Config, limits *validation.Limits) + expected error + }{ + "should pass by default": { + setup: func(cfg *Config, limits *validation.Limits) {}, + expected: nil, + }, + "should fail if the sharding strategy is invalid": { + setup: func(cfg *Config, limits *validation.Limits) { + cfg.ShardingEnabled = true + cfg.ShardingStrategy = "xxx" + }, + expected: errInvalidShardingStrategy, + }, + "should fail if the sharding strategy is shuffle-sharding and shard size has not been set": { + setup: func(cfg *Config, limits *validation.Limits) { + cfg.ShardingEnabled = true + cfg.ShardingStrategy = ShardingStrategyShuffle + }, + expected: errInvalidTenantShardSize, + }, + "should pass if the sharding strategy is shuffle-sharding and shard size has been set": { + setup: func(cfg *Config, limits *validation.Limits) { + cfg.ShardingEnabled = true + cfg.ShardingStrategy = ShardingStrategyShuffle + limits.StoreGatewayTenantShardSize = 3 + }, + expected: nil, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + cfg := &Config{} + limits := &validation.Limits{} + flagext.DefaultValues(cfg, limits) + testData.setup(cfg, limits) + + assert.Equal(t, testData.expected, cfg.Validate(*limits)) + }) + } +} + +func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { tests := map[string]struct { initialExists bool initialState ring.IngesterState @@ -168,7 +216,8 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { // This tests uses real TSDB blocks. 24h time range, 2h block range period, // 2 users = total (24 / 12) * 2 = 24 blocks. 
- numBlocks := 24 + numUsers := 2 + numBlocks := numUsers * 12 now := time.Now() require.NoError(t, mockTSDB(path.Join(storageDir, "user-1"), 24, now.Add(-24*time.Hour).Unix()*1000, now.Unix()*1000)) require.NoError(t, mockTSDB(path.Join(storageDir, "user-2"), 24, now.Add(-24*time.Hour).Unix()*1000, now.Unix()*1000)) @@ -177,43 +226,65 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { require.NoError(t, err) tests := map[string]struct { - shardingEnabled bool + shardingStrategy string // Empty string means disabled. + tenantShardSize int // Used only when the sharding strategy is shuffle-sharding. replicationFactor int numGateways int expectedBlocksLoaded int }{ - "1 gateway, sharding disabled": { - shardingEnabled: false, + "sharding disabled, 1 gateway": { + shardingStrategy: "", numGateways: 1, expectedBlocksLoaded: numBlocks, }, - "2 gateways, sharding disabled": { - shardingEnabled: false, + "sharding disabled, 2 gateways": { + shardingStrategy: "", numGateways: 2, expectedBlocksLoaded: 2 * numBlocks, // each gateway loads all the blocks }, - "1 gateway, sharding enabled, replication factor = 1": { - shardingEnabled: true, + "default sharding strategy, 1 gateway, RF = 1": { + shardingStrategy: ShardingStrategyDefault, replicationFactor: 1, numGateways: 1, expectedBlocksLoaded: numBlocks, }, - "2 gateways, sharding enabled, replication factor = 1": { - shardingEnabled: true, + "default sharding strategy, 2 gateways, RF = 1": { + shardingStrategy: ShardingStrategyDefault, replicationFactor: 1, numGateways: 2, expectedBlocksLoaded: numBlocks, // blocks are sharded across gateways }, - "3 gateways, sharding enabled, replication factor = 2": { - shardingEnabled: true, + "default sharding strategy, 3 gateways, RF = 2": { + shardingStrategy: ShardingStrategyDefault, replicationFactor: 2, numGateways: 3, expectedBlocksLoaded: 2 * numBlocks, // blocks are replicated 2 times }, - "3 gateways, sharding enabled, replication factor = 3": { - shardingEnabled: true, + "default sharding strategy, 5 gateways, RF = 3": { + shardingStrategy: ShardingStrategyDefault, replicationFactor: 3, - numGateways: 3, + numGateways: 5, + expectedBlocksLoaded: 3 * numBlocks, // blocks are replicated 3 times + }, + "shuffle sharding strategy, 1 gateway, RF = 1, SS = 1": { + shardingStrategy: ShardingStrategyShuffle, + tenantShardSize: 1, + replicationFactor: 1, + numGateways: 1, + expectedBlocksLoaded: numBlocks, + }, + "shuffle sharding strategy, 5 gateways, RF = 2, SS = 3": { + shardingStrategy: ShardingStrategyShuffle, + tenantShardSize: 3, + replicationFactor: 2, + numGateways: 5, + expectedBlocksLoaded: 2 * numBlocks, // blocks are replicated 2 times + }, + "shuffle sharding strategy, 20 gateways, RF = 3, SS = 3": { + shardingStrategy: ShardingStrategyShuffle, + tenantShardSize: 3, + replicationFactor: 3, + numGateways: 20, expectedBlocksLoaded: 3 * numBlocks, // blocks are replicated 3 times }, } @@ -222,6 +293,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { t.Run(testName, func(t *testing.T) { ctx := context.Background() storageCfg, cleanup := mockStorageConfig(t) + storageCfg.BucketStore.SyncInterval = time.Hour // Do not trigger the periodic sync in this test (we explicitly sync stores). 
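For readers checking the `expectedBlocksLoaded` values in the test table above: when sharding is enabled, each block ends up on at most `replicationFactor` instances, but never on more instances than exist in the tenant's (sub)ring. The helper below captures that rule of thumb; it is my inference from the listed cases, not code from the PR, and it intentionally excludes the sharding-disabled rows, where every gateway loads every block (`numGateways * numBlocks`).

```go
// Not part of the PR: a rough rule of thumb for the expected values above
// when sharding is enabled.
func expectedBlocksLoaded(numBlocks, numGateways, replicationFactor, tenantShardSize int) int {
	copies := replicationFactor
	if tenantShardSize > 0 && tenantShardSize < copies {
		// Shuffle sharding caps the tenant's subring at tenantShardSize instances.
		copies = tenantShardSize
	}
	if numGateways < copies {
		// A block cannot be replicated across more gateways than exist.
		copies = numGateways
	}
	return copies * numBlocks
}
```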
defer cleanup() ringStore := consul.NewInMemoryClient(ring.GetCodec()) @@ -233,14 +305,26 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { for i := 1; i <= testData.numGateways; i++ { instanceID := fmt.Sprintf("gateway-%d", i) + limits := defaultLimitsConfig() gatewayCfg := mockGatewayConfig() - gatewayCfg.ShardingEnabled = testData.shardingEnabled gatewayCfg.ShardingRing.ReplicationFactor = testData.replicationFactor gatewayCfg.ShardingRing.InstanceID = instanceID gatewayCfg.ShardingRing.InstanceAddr = fmt.Sprintf("127.0.0.%d", i) + gatewayCfg.ShardingRing.RingCheckPeriod = time.Hour // Do not check the ring topology changes in this test (we explicitly sync stores). + + if testData.shardingStrategy == "" { + gatewayCfg.ShardingEnabled = false + } else { + gatewayCfg.ShardingEnabled = true + gatewayCfg.ShardingStrategy = testData.shardingStrategy + limits.StoreGatewayTenantShardSize = testData.tenantShardSize + } + + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) reg := prometheus.NewPedanticRegistry() - g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, ringStore, overrides, mockLoggingLevel(), log.NewNopLogger(), reg) require.NoError(t, err) defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck @@ -252,7 +336,7 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { } // Wait until the ring client of each gateway has synced (to avoid flaky tests on subsequent assertions). - if testData.shardingEnabled { + if testData.shardingStrategy != "" { ctx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() @@ -273,10 +357,15 @@ func TestStoreGateway_BlocksSharding(t *testing.T) { // Assert on the number of blocks loaded extracting this information from metrics. metrics := util.BuildMetricFamiliesPerUserFromUserRegistries(registries) assert.Equal(t, float64(testData.expectedBlocksLoaded), metrics.GetSumOfGauges("cortex_bucket_store_blocks_loaded")) + assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered")) - // The total number of blocks synced (before filtering) is always equal to the total - // number of blocks for each instance. 
- assert.Equal(t, float64(testData.numGateways*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced")) + if testData.shardingStrategy == ShardingStrategyShuffle { + assert.Equal(t, float64(testData.tenantShardSize*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced")) + assert.Equal(t, float64(testData.tenantShardSize*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced")) + } else { + assert.Equal(t, float64(testData.numGateways*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced")) + assert.Equal(t, float64(testData.numGateways*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced")) + } }) } } @@ -830,3 +919,17 @@ func defaultLimitsOverrides(t *testing.T) *validation.Overrides { return overrides } + +type mockShardingStrategy struct { + mock.Mock +} + +func (m *mockShardingStrategy) FilterUsers(ctx context.Context, userIDs []string) []string { + args := m.Called(ctx, userIDs) + return args.Get(0).([]string) +} + +func (m *mockShardingStrategy) FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { + args := m.Called(ctx, userID, metas, synced) + return args.Error(0) +} diff --git a/pkg/storegateway/metadata_filters.go b/pkg/storegateway/metadata_filters.go deleted file mode 100644 index 75d79094cbf..00000000000 --- a/pkg/storegateway/metadata_filters.go +++ /dev/null @@ -1,60 +0,0 @@ -package storegateway - -import ( - "context" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/oklog/ulid" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/extprom" - - "github.com/cortexproject/cortex/pkg/ring" - cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" -) - -const ( - shardExcludedMeta = "shard-excluded" -) - -// ShardingMetadataFilter represents struct that allows sharding using the ring. -// Not go-routine safe. -type ShardingMetadataFilter struct { - r *ring.Ring - instanceAddr string - logger log.Logger -} - -// NewShardingMetadataFilter creates ShardingMetadataFilter. -func NewShardingMetadataFilter(r *ring.Ring, instanceAddr string, logger log.Logger) *ShardingMetadataFilter { - return &ShardingMetadataFilter{ - r: r, - instanceAddr: instanceAddr, - logger: logger, - } -} - -// Filter filters out blocks not included within the current shard. -func (f *ShardingMetadataFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { - // Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance). - buf := make([]ring.IngesterDesc, 0, f.r.ReplicationFactor()+2) - - for blockID := range metas { - key := cortex_tsdb.HashBlockID(blockID) - set, err := f.r.Get(key, ring.BlocksSync, buf) - - // If there are no healthy instances in the replication set or - // the replication set for this block doesn't include this instance - // then we filter it out. 
- if err != nil || !set.Includes(f.instanceAddr) { - if err != nil { - level.Warn(f.logger).Log("msg", "failed to get replication set for block", "block", blockID.String(), "err", err) - } - - synced.WithLabelValues(shardExcludedMeta).Inc() - delete(metas, blockID) - } - } - - return nil -} diff --git a/pkg/storegateway/metadata_filters_test.go b/pkg/storegateway/metadata_filters_test.go deleted file mode 100644 index 97f97bfc20f..00000000000 --- a/pkg/storegateway/metadata_filters_test.go +++ /dev/null @@ -1,278 +0,0 @@ -package storegateway - -import ( - "context" - "testing" - "time" - - "github.com/go-kit/kit/log" - "github.com/oklog/ulid" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/extprom" - - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/ring/kv/consul" - cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" - "github.com/cortexproject/cortex/pkg/util/services" -) - -func TestShardingMetadataFilter(t *testing.T) { - // The following block IDs have been picked to have increasing hash values - // in order to simplify the tests. - block1 := ulid.MustNew(1, nil) // hash: 283204220 - block2 := ulid.MustNew(2, nil) // hash: 444110359 - block3 := ulid.MustNew(5, nil) // hash: 2931974232 - block4 := ulid.MustNew(6, nil) // hash: 3092880371 - numAllBlocks := 4 - - block1Hash := cortex_tsdb.HashBlockID(block1) - block2Hash := cortex_tsdb.HashBlockID(block2) - block3Hash := cortex_tsdb.HashBlockID(block3) - block4Hash := cortex_tsdb.HashBlockID(block4) - - tests := map[string]struct { - replicationFactor int - setupRing func(*ring.Desc) - expectedBlocks map[string][]ulid.ULID - }{ - "one ACTIVE instance in the ring with replication factor = 1": { - replicationFactor: 1, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2, block3, block4}, - "127.0.0.2": {}, - }, - }, - "two ACTIVE instances in the ring with replication factor = 1": { - replicationFactor: 1, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block3}, - "127.0.0.2": {block2, block4}, - }, - }, - "one ACTIVE instance in the ring with replication factor = 2": { - replicationFactor: 2, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2, block3, block4}, - "127.0.0.2": {}, - }, - }, - "two ACTIVE instances in the ring with replication factor = 2": { - replicationFactor: 2, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block2, block3, block4}, - "127.0.0.2": {block1, block2, block3, block4}, - }, - }, - "multiple ACTIVE instances in the ring with replication factor = 2": { - 
replicationFactor: 2, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block3 /* replicated: */, block2, block4}, - "127.0.0.2": {block2 /* replicated: */, block1}, - "127.0.0.3": {block4 /* replicated: */, block3}, - }, - }, - "one unhealthy instance in the ring with replication factor = 1": { - replicationFactor: 1, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) - - r.Ingesters["instance-3"] = ring.IngesterDesc{ - Addr: "127.0.0.3", - Timestamp: time.Now().Add(-time.Hour).Unix(), - State: ring.ACTIVE, - Tokens: []uint32{block4Hash + 1}, - } - }, - expectedBlocks: map[string][]ulid.ULID{ - // No shard has the blocks of the unhealthy instance. - "127.0.0.1": {block1, block3}, - "127.0.0.2": {block2}, - "127.0.0.3": {}, - }, - }, - "one unhealthy instance in the ring with replication factor = 2": { - replicationFactor: 2, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) - - r.Ingesters["instance-3"] = ring.IngesterDesc{ - Addr: "127.0.0.3", - Timestamp: time.Now().Add(-time.Hour).Unix(), - State: ring.ACTIVE, - Tokens: []uint32{block4Hash + 1}, - } - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block3 /* replicated: */, block2, block4}, - "127.0.0.2": {block2 /* replicated: */, block1}, - "127.0.0.3": {}, - }, - }, - "two unhealthy instances in the ring with replication factor = 2": { - replicationFactor: 2, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) - - r.Ingesters["instance-2"] = ring.IngesterDesc{ - Addr: "127.0.0.2", - Timestamp: time.Now().Add(-time.Hour).Unix(), - State: ring.ACTIVE, - Tokens: []uint32{block2Hash + 1, block3Hash + 1}, - } - - r.Ingesters["instance-3"] = ring.IngesterDesc{ - Addr: "127.0.0.3", - Timestamp: time.Now().Add(-time.Hour).Unix(), - State: ring.ACTIVE, - Tokens: []uint32{block4Hash + 1}, - } - }, - expectedBlocks: map[string][]ulid.ULID{ - // There may be some blocks missing depending if there are shared blocks - // between the two unhealthy nodes. 
- "127.0.0.1": {block1 /* replicated: */, block4}, - "127.0.0.2": {}, - "127.0.0.3": {}, - }, - }, - "two unhealthy instances in the ring with replication factor = 3": { - replicationFactor: 3, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) - - r.Ingesters["instance-3"] = ring.IngesterDesc{ - Addr: "127.0.0.3", - Timestamp: time.Now().Add(-time.Hour).Unix(), - State: ring.ACTIVE, - Tokens: []uint32{block3Hash + 1}, - } - - r.Ingesters["instance-4"] = ring.IngesterDesc{ - Addr: "127.0.0.4", - Timestamp: time.Now().Add(-time.Hour).Unix(), - State: ring.ACTIVE, - Tokens: []uint32{block4Hash + 1}, - } - }, - expectedBlocks: map[string][]ulid.ULID{ - // There may be some blocks missing depending if there are shared blocks - // between the two unhealthy nodes. - "127.0.0.1": {block1 /* replicated: */, block3, block4}, - "127.0.0.2": {block2 /* replicated: */, block1, block4}, - "127.0.0.3": {}, - "127.0.0.4": {}, - }, - }, - "LEAVING instance in the ring should continue to keep its shard blocks but they should also be replicated to another instance": { - replicationFactor: 1, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.LEAVING) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block3 /* replicated: */, block4}, - "127.0.0.2": {block2}, - "127.0.0.3": {block4}, - }, - }, - "JOINING instance in the ring should get its shard blocks but they should also be replicated to another instance": { - replicationFactor: 1, - setupRing: func(r *ring.Desc) { - r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) - r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.JOINING) - }, - expectedBlocks: map[string][]ulid.ULID{ - "127.0.0.1": {block1, block3 /* replicated: */, block4}, - "127.0.0.2": {block2}, - "127.0.0.3": {block4}, - }, - }, - } - - for testName, testData := range tests { - testName := testName - testData := testData - - t.Run(testName, func(t *testing.T) { - t.Parallel() - - ctx := context.Background() - store := consul.NewInMemoryClient(ring.GetCodec()) - - // Initialize the ring state. - require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { - d := ring.NewDesc() - testData.setupRing(d) - return d, true, nil - })) - - cfg := ring.Config{ - ReplicationFactor: testData.replicationFactor, - HeartbeatTimeout: time.Minute, - } - - r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, &BlocksReplicationStrategy{}) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(ctx, r)) - defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck - - // Wait until the ring client has synced. 
- require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) - - for instanceAddr, expectedBlocks := range testData.expectedBlocks { - filter := NewShardingMetadataFilter(r, instanceAddr, log.NewNopLogger()) - synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) - synced.WithLabelValues(shardExcludedMeta).Set(0) - - metas := map[ulid.ULID]*metadata.Meta{ - block1: {}, - block2: {}, - block3: {}, - block4: {}, - } - - err = filter.Filter(ctx, metas, synced) - require.NoError(t, err) - - var actualBlocks []ulid.ULID - for id := range metas { - actualBlocks = append(actualBlocks, id) - } - - assert.ElementsMatch(t, expectedBlocks, actualBlocks) - - // Assert on the metric used to keep track of the blocks filtered out. - synced.Submit() - assert.Equal(t, float64(numAllBlocks-len(testData.expectedBlocks[instanceAddr])), testutil.ToFloat64(synced)) - } - }) - } -} diff --git a/pkg/storegateway/sharding_strategy.go b/pkg/storegateway/sharding_strategy.go new file mode 100644 index 00000000000..068c914a8dc --- /dev/null +++ b/pkg/storegateway/sharding_strategy.go @@ -0,0 +1,201 @@ +package storegateway + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" + + "github.com/cortexproject/cortex/pkg/ring" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +const ( + shardExcludedMeta = "shard-excluded" +) + +type ShardingStrategy interface { + // FilterUsers whose blocks should be loaded by the store-gateway. Returns the list of user IDs + // that should be synced by the store-gateway. + FilterUsers(ctx context.Context, userIDs []string) []string + + // FilterBlocks that should be loaded by the store-gateway. + FilterBlocks(ctx context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error +} + +// ShardingLimits is the interface that should be implemented by the limits provider, +// limiting the scope of the limits to the ones required by sharding strategies. +type ShardingLimits interface { + StoreGatewayTenantShardSize(userID string) int +} + +// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out. +type NoShardingStrategy struct{} + +func NewNoShardingStrategy() *NoShardingStrategy { + return &NoShardingStrategy{} +} + +func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { + return userIDs +} + +func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error { + return nil +} + +// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways. +// Not go-routine safe. +type DefaultShardingStrategy struct { + r *ring.Ring + instanceAddr string + logger log.Logger +} + +// NewDefaultShardingStrategy creates DefaultShardingStrategy. +func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy { + return &DefaultShardingStrategy{ + r: r, + instanceAddr: instanceAddr, + logger: logger, + } +} + +// FilterUsers implements ShardingStrategy. +func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { + return userIDs +} + +// FilterBlocks implements ShardingStrategy. 
+func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { + filterBlocksByRingSharding(s.r, s.instanceAddr, metas, synced, s.logger) + return nil +} + +// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways, +// where each tenant blocks are sharded across a subset of store-gateway instances. +type ShuffleShardingStrategy struct { + r *ring.Ring + instanceID string + instanceAddr string + limits ShardingLimits + logger log.Logger +} + +// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy. +func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger) *ShuffleShardingStrategy { + return &ShuffleShardingStrategy{ + r: r, + instanceID: instanceID, + instanceAddr: instanceAddr, + limits: limits, + logger: logger, + } +} + +// FilterUsers implements ShardingStrategy. +func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string { + var filteredIDs []string + + for _, userID := range userIDs { + subRing := GetShuffleShardingSubring(s.r, userID, s.limits) + + // Include the user only if it belongs to this store-gateway shard. + if subRing.HasInstance(s.instanceID) { + filteredIDs = append(filteredIDs, userID) + } + } + + return filteredIDs +} + +// FilterBlocks implements ShardingStrategy. +func (s *ShuffleShardingStrategy) FilterBlocks(_ context.Context, userID string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { + subRing := GetShuffleShardingSubring(s.r, userID, s.limits) + filterBlocksByRingSharding(subRing, s.instanceAddr, metas, synced, s.logger) + return nil +} + +func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec, logger log.Logger) { + // Buffer internally used by the ring (give extra room for a JOINING + LEAVING instance). + buf := make([]ring.IngesterDesc, 0, r.ReplicationFactor()+2) + + for blockID := range metas { + key := cortex_tsdb.HashBlockID(blockID) + set, err := r.Get(key, ring.BlocksSync, buf) + + // If there are no healthy instances in the replication set or + // the replication set for this block doesn't include this instance + // then we filter it out. + if err != nil || !set.Includes(instanceAddr) { + if err != nil { + level.Warn(logger).Log("msg", "excluded block because failed to get replication set", "block", blockID.String(), "err", err) + } + + synced.WithLabelValues(shardExcludedMeta).Inc() + delete(metas, blockID) + } + } +} + +// GetShuffleShardingSubring returns the subring to be used for a given user. This function +// should be used both by store-gateway and querier in order to guarantee the same logic is used. +func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing { + shardSize := limits.StoreGatewayTenantShardSize(userID) + + // A shard size of 0 means shuffle sharding is disabled for this specific user, + // so we just return the full ring so that blocks will be sharded across all store-gateways. 
+ if shardSize <= 0 { + return ring + } + + return ring.Subring(cortex_tsdb.HashTenantID(userID), shardSize) +} + +type shardingMetadataFilterAdapter struct { + userID string + strategy ShardingStrategy +} + +func NewShardingMetadataFilterAdapter(userID string, strategy ShardingStrategy) block.MetadataFilter { + return &shardingMetadataFilterAdapter{ + userID: userID, + strategy: strategy, + } +} + +// Filter implements block.MetadataFilter. +func (a *shardingMetadataFilterAdapter) Filter(ctx context.Context, metas map[ulid.ULID]*metadata.Meta, synced *extprom.TxGaugeVec) error { + return a.strategy.FilterBlocks(ctx, a.userID, metas, synced) +} + +type shardingBucketReaderAdapter struct { + objstore.InstrumentedBucketReader + + userID string + strategy ShardingStrategy +} + +func NewShardingBucketReaderAdapter(userID string, strategy ShardingStrategy, wrapped objstore.InstrumentedBucketReader) objstore.InstrumentedBucketReader { + return &shardingBucketReaderAdapter{ + InstrumentedBucketReader: wrapped, + userID: userID, + strategy: strategy, + } +} + +// Iter implements objstore.BucketReader. +func (a *shardingBucketReaderAdapter) Iter(ctx context.Context, dir string, f func(string) error) error { + // Skip iterating the bucket if the tenant doesn't belong to the shard. From the caller + // perspective, this will look like the tenant has no blocks in the storage. + if len(a.strategy.FilterUsers(ctx, []string{a.userID})) == 0 { + return nil + } + + return a.InstrumentedBucketReader.Iter(ctx, dir, f) +} diff --git a/pkg/storegateway/sharding_strategy_test.go b/pkg/storegateway/sharding_strategy_test.go new file mode 100644 index 00000000000..9ab039e8249 --- /dev/null +++ b/pkg/storegateway/sharding_strategy_test.go @@ -0,0 +1,651 @@ +package storegateway + +import ( + "context" + "testing" + "time" + + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/services" +) + +func TestDefaultShardingStrategy(t *testing.T) { + // The following block IDs have been picked to have increasing hash values + // in order to simplify the tests. 
+ block1 := ulid.MustNew(1, nil) // hash: 283204220 + block2 := ulid.MustNew(2, nil) // hash: 444110359 + block3 := ulid.MustNew(5, nil) // hash: 2931974232 + block4 := ulid.MustNew(6, nil) // hash: 3092880371 + numAllBlocks := 4 + + block1Hash := cortex_tsdb.HashBlockID(block1) + block2Hash := cortex_tsdb.HashBlockID(block2) + block3Hash := cortex_tsdb.HashBlockID(block3) + block4Hash := cortex_tsdb.HashBlockID(block4) + + tests := map[string]struct { + replicationFactor int + setupRing func(*ring.Desc) + expectedBlocks map[string][]ulid.ULID + }{ + "one ACTIVE instance in the ring with replication factor = 1": { + replicationFactor: 1, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block2, block3, block4}, + "127.0.0.2": {}, + }, + }, + "two ACTIVE instances in the ring with replication factor = 1": { + replicationFactor: 1, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3}, + "127.0.0.2": {block2, block4}, + }, + }, + "one ACTIVE instance in the ring with replication factor = 2": { + replicationFactor: 2, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block2, block3, block4}, + "127.0.0.2": {}, + }, + }, + "two ACTIVE instances in the ring with replication factor = 2": { + replicationFactor: 2, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block2, block3, block4}, + "127.0.0.2": {block1, block2, block3, block4}, + }, + }, + "multiple ACTIVE instances in the ring with replication factor = 2": { + replicationFactor: 2, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3 /* replicated: */, block2, block4}, + "127.0.0.2": {block2 /* replicated: */, block1}, + "127.0.0.3": {block4 /* replicated: */, block3}, + }, + }, + "one unhealthy instance in the ring with replication factor = 1": { + replicationFactor: 1, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + + r.Ingesters["instance-3"] = ring.IngesterDesc{ + Addr: "127.0.0.3", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block4Hash + 1}, + } + }, + expectedBlocks: map[string][]ulid.ULID{ + // No shard has the blocks of the unhealthy instance. 
+ "127.0.0.1": {block1, block3}, + "127.0.0.2": {block2}, + "127.0.0.3": {}, + }, + }, + "one unhealthy instance in the ring with replication factor = 2": { + replicationFactor: 2, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + + r.Ingesters["instance-3"] = ring.IngesterDesc{ + Addr: "127.0.0.3", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block4Hash + 1}, + } + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3 /* replicated: */, block2, block4}, + "127.0.0.2": {block2 /* replicated: */, block1}, + "127.0.0.3": {}, + }, + }, + "two unhealthy instances in the ring with replication factor = 2": { + replicationFactor: 2, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + + r.Ingesters["instance-2"] = ring.IngesterDesc{ + Addr: "127.0.0.2", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block2Hash + 1, block3Hash + 1}, + } + + r.Ingesters["instance-3"] = ring.IngesterDesc{ + Addr: "127.0.0.3", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block4Hash + 1}, + } + }, + expectedBlocks: map[string][]ulid.ULID{ + // There may be some blocks missing depending if there are shared blocks + // between the two unhealthy nodes. + "127.0.0.1": {block1 /* replicated: */, block4}, + "127.0.0.2": {}, + "127.0.0.3": {}, + }, + }, + "two unhealthy instances in the ring with replication factor = 3": { + replicationFactor: 3, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + + r.Ingesters["instance-3"] = ring.IngesterDesc{ + Addr: "127.0.0.3", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block3Hash + 1}, + } + + r.Ingesters["instance-4"] = ring.IngesterDesc{ + Addr: "127.0.0.4", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block4Hash + 1}, + } + }, + expectedBlocks: map[string][]ulid.ULID{ + // There may be some blocks missing depending if there are shared blocks + // between the two unhealthy nodes. 
+ "127.0.0.1": {block1 /* replicated: */, block3, block4}, + "127.0.0.2": {block2 /* replicated: */, block1, block4}, + "127.0.0.3": {}, + "127.0.0.4": {}, + }, + }, + "LEAVING instance in the ring should continue to keep its shard blocks but they should also be replicated to another instance": { + replicationFactor: 1, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.LEAVING) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3 /* replicated: */, block4}, + "127.0.0.2": {block2}, + "127.0.0.3": {block4}, + }, + }, + "JOINING instance in the ring should get its shard blocks but they should also be replicated to another instance": { + replicationFactor: 1, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.JOINING) + }, + expectedBlocks: map[string][]ulid.ULID{ + "127.0.0.1": {block1, block3 /* replicated: */, block4}, + "127.0.0.2": {block2}, + "127.0.0.3": {block4}, + }, + }, + } + + for testName, testData := range tests { + testName := testName + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + store := consul.NewInMemoryClient(ring.GetCodec()) + + // Initialize the ring state. + require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { + d := ring.NewDesc() + testData.setupRing(d) + return d, true, nil + })) + + cfg := ring.Config{ + ReplicationFactor: testData.replicationFactor, + HeartbeatTimeout: time.Minute, + } + + r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, &BlocksReplicationStrategy{}) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, r)) + defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck + + // Wait until the ring client has synced. + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + + for instanceAddr, expectedBlocks := range testData.expectedBlocks { + filter := NewDefaultShardingStrategy(r, instanceAddr, log.NewNopLogger()) + synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) + synced.WithLabelValues(shardExcludedMeta).Set(0) + + metas := map[ulid.ULID]*metadata.Meta{ + block1: {}, + block2: {}, + block3: {}, + block4: {}, + } + + err = filter.FilterBlocks(ctx, "user-1", metas, synced) + require.NoError(t, err) + + var actualBlocks []ulid.ULID + for id := range metas { + actualBlocks = append(actualBlocks, id) + } + + assert.ElementsMatch(t, expectedBlocks, actualBlocks) + + // Assert on the metric used to keep track of the blocks filtered out. + synced.Submit() + assert.Equal(t, float64(numAllBlocks-len(testData.expectedBlocks[instanceAddr])), testutil.ToFloat64(synced)) + } + }) + } +} + +func TestShuffleShardingStrategy(t *testing.T) { + // The following block IDs have been picked to have increasing hash values + // in order to simplify the tests. 
+ block1 := ulid.MustNew(1, nil) // hash: 283204220 + block2 := ulid.MustNew(2, nil) // hash: 444110359 + block3 := ulid.MustNew(5, nil) // hash: 2931974232 + block4 := ulid.MustNew(6, nil) // hash: 3092880371 + numAllBlocks := 4 + + block1Hash := cortex_tsdb.HashBlockID(block1) + block2Hash := cortex_tsdb.HashBlockID(block2) + block3Hash := cortex_tsdb.HashBlockID(block3) + block4Hash := cortex_tsdb.HashBlockID(block4) + + // Ensure the user ID we use belongs to the instances holding the token for the block 1 + // (it's expected by the assertions below). + userID := "user-A" + require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash) + + type usersExpectation struct { + instanceID string + instanceAddr string + users []string + } + + type blocksExpectation struct { + instanceID string + instanceAddr string + blocks []ulid.ULID + } + + tests := map[string]struct { + replicationFactor int + limits ShardingLimits + setupRing func(*ring.Desc) + expectedUsers []usersExpectation + expectedBlocks []blocksExpectation + }{ + "one ACTIVE instance in the ring with RF = 1 and SS = 1": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}}, + }, + }, + "one ACTIVE instance in the ring with RF = 2 and SS = 1 (should still sync blocks on the only available instance)": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}}, + }, + }, + "one ACTIVE instance in the ring with RF = 2 and SS = 2 (should still sync blocks on the only available instance)": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{0}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}}, + }, + }, + "two ACTIVE instances in the ring with RF = 1 and SS = 1 (should sync blocks on 1 instance because of the shard size)": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, 
block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}}, + }, + }, + "two ACTIVE instances in the ring with RF = 1 and SS = 2 (should sync blocks on 2 instances because of the shard size)": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2, block4}}, + }, + }, + "two ACTIVE instances in the ring with RF = 2 and SS = 1 (should sync blocks on 1 instance because of the shard size)": { + replicationFactor: 2, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 1}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: nil}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{}}, + }, + }, + "two ACTIVE instances in the ring with RF = 2 and SS = 2 (should sync all blocks on 2 instances)": { + replicationFactor: 2, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block1, block2, block3, block4}}, + }, + }, + "multiple ACTIVE instances in the ring with RF = 2 and SS = 3": { + replicationFactor: 2, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + 
r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3 /* replicated: */, block2, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2 /* replicated: */, block1}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{block4 /* replicated: */, block3}}, + }, + }, + "one unhealthy instance in the ring with RF = 1 and SS = 3": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + + r.Ingesters["instance-3"] = ring.IngesterDesc{ + Addr: "127.0.0.3", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block4Hash + 1}, + } + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + }, + expectedBlocks: []blocksExpectation{ + // No shard has the blocks of the unhealthy instance. 
+ {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{}}, + }, + }, + "one unhealthy instance in the ring with RF = 2 and SS = 3": { + replicationFactor: 2, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 3}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.ACTIVE) + + r.Ingesters["instance-3"] = ring.IngesterDesc{ + Addr: "127.0.0.3", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block4Hash + 1}, + } + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: []string{userID}}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3 /* replicated: */, block2, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2 /* replicated: */, block1}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{}}, + }, + }, + "one unhealthy instance in the ring with RF = 2 and SS = 2": { + replicationFactor: 2, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block4Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block3Hash + 1}, ring.ACTIVE) + + r.Ingesters["instance-2"] = ring.IngesterDesc{ + Addr: "127.0.0.2", + Timestamp: time.Now().Add(-time.Hour).Unix(), + State: ring.ACTIVE, + Tokens: []uint32{block2Hash + 1}, + } + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block2, block3, block4}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{ /* no blocks because unhealthy */ }}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + }, + }, + "LEAVING instance in the ring should continue to keep its shard blocks but they should also be replicated to another instance": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.LEAVING) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil}, + }, + 
expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3, block4 /* replicated: */, block2}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + }, + }, + "JOINING instance in the ring should get its shard blocks but they should also be replicated to another instance": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 2}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1}, ring.JOINING) + r.AddIngester("instance-3", "127.0.0.3", "", []uint32{block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", users: nil}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3, block4 /* replicated: */, block2}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2}}, + {instanceID: "instance-3", instanceAddr: "127.0.0.3", blocks: []ulid.ULID{ /* no blocks because not belonging to the shard */ }}, + }, + }, + "SS = 0 disables shuffle sharding": { + replicationFactor: 1, + limits: &shardingLimitsMock{storeGatewayTenantShardSize: 0}, + setupRing: func(r *ring.Desc) { + r.AddIngester("instance-1", "127.0.0.1", "", []uint32{block1Hash + 1, block3Hash + 1}, ring.ACTIVE) + r.AddIngester("instance-2", "127.0.0.2", "", []uint32{block2Hash + 1, block4Hash + 1}, ring.ACTIVE) + }, + expectedUsers: []usersExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", users: []string{userID}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", users: []string{userID}}, + }, + expectedBlocks: []blocksExpectation{ + {instanceID: "instance-1", instanceAddr: "127.0.0.1", blocks: []ulid.ULID{block1, block3}}, + {instanceID: "instance-2", instanceAddr: "127.0.0.2", blocks: []ulid.ULID{block2, block4}}, + }, + }, + } + + for testName, testData := range tests { + testName := testName + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + ctx := context.Background() + store := consul.NewInMemoryClient(ring.GetCodec()) + + // Initialize the ring state. + require.NoError(t, store.CAS(ctx, "test", func(in interface{}) (interface{}, bool, error) { + d := ring.NewDesc() + testData.setupRing(d) + return d, true, nil + })) + + cfg := ring.Config{ + ReplicationFactor: testData.replicationFactor, + HeartbeatTimeout: time.Minute, + } + + r, err := ring.NewWithStoreClientAndStrategy(cfg, "test", "test", store, &BlocksReplicationStrategy{}) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, r)) + defer services.StopAndAwaitTerminated(ctx, r) //nolint:errcheck + + // Wait until the ring client has synced. + require.NoError(t, ring.WaitInstanceState(ctx, r, "instance-1", ring.ACTIVE)) + + // Assert on filter users. 
+ for _, expected := range testData.expectedUsers { + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger()) + assert.Equal(t, expected.users, filter.FilterUsers(ctx, []string{userID})) + } + + // Assert on filter blocks. + for _, expected := range testData.expectedBlocks { + filter := NewShuffleShardingStrategy(r, expected.instanceID, expected.instanceAddr, testData.limits, log.NewNopLogger()) + synced := extprom.NewTxGaugeVec(nil, prometheus.GaugeOpts{}, []string{"state"}) + synced.WithLabelValues(shardExcludedMeta).Set(0) + + metas := map[ulid.ULID]*metadata.Meta{ + block1: {}, + block2: {}, + block3: {}, + block4: {}, + } + + err = filter.FilterBlocks(ctx, userID, metas, synced) + require.NoError(t, err) + + var actualBlocks []ulid.ULID + for id := range metas { + actualBlocks = append(actualBlocks, id) + } + + assert.ElementsMatch(t, expected.blocks, actualBlocks) + + // Assert on the metric used to keep track of the blocks filtered out. + synced.Submit() + assert.Equal(t, float64(numAllBlocks-len(expected.blocks)), testutil.ToFloat64(synced)) + } + }) + } +} + +type shardingLimitsMock struct { + storeGatewayTenantShardSize int +} + +func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) int { + return m.storeGatewayTenantShardSize +} diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 2a7a38aa886..e88bb47b2b8 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -62,6 +62,9 @@ type Limits struct { CardinalityLimit int `yaml:"cardinality_limit"` MaxCacheFreshness time.Duration `yaml:"max_cache_freshness"` + // Store-gateway. + StoreGatewayTenantShardSize int `yaml:"store_gateway_tenant_shard_size"` + // Config for overrides, convenient if it goes here. [Deprecated in favor of RuntimeConfig flag in cortex.Config] PerTenantOverrideConfig string `yaml:"per_tenant_override_config"` PerTenantOverridePeriod time.Duration `yaml:"per_tenant_override_period"` @@ -108,6 +111,9 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides. [deprecated, use -runtime-config.file instead]") f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides. [deprecated, use -runtime-config.reload-period instead]") + + // Store-gateway. + f.IntVar(&l.StoreGatewayTenantShardSize, "experimental.store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.") } // Validate the limits config and returns an error if the validation @@ -340,6 +346,11 @@ func (o *Overrides) SubringSize(userID string) int { return o.getOverridesForUser(userID).SubringSize } +// StoreGatewayTenantShardSize returns the size of the store-gateway shard size for a given user. +func (o *Overrides) StoreGatewayTenantShardSize(userID string) int { + return o.getOverridesForUser(userID).StoreGatewayTenantShardSize +} + func (o *Overrides) getOverridesForUser(userID string) *Limits { if o.tenantLimits != nil { l := o.tenantLimits(userID)
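
The sketch below is an editor's illustration of the two-step filtering that the new `ShardingStrategy` interface introduces: `FilterUsers` first decides which tenants an instance syncs at all, and `FilterBlocks` then decides which of that tenant's blocks it loads. It is a minimal, self-contained model; the helper names, the FNV hashing, and the ownership rule here are assumptions made for demonstration only, whereas the patch itself builds both steps on the Cortex ring (`Ring.Subring`, `HashTenantID`, `HashBlockID`, and replication-set lookups).

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// hash is a stand-in for the token hashing the real code performs with
// HashTenantID / HashBlockID.
func hash(s string) uint32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(s))
	return h.Sum32()
}

// shuffleShard mimics the role of GetShuffleShardingSubring: pick a stable,
// tenant-specific subset of instances of size shardSize. A shard size <= 0
// disables shuffle sharding and keeps the full set of instances.
func shuffleShard(instances []string, tenantID string, shardSize int) []string {
	if shardSize <= 0 || shardSize >= len(instances) {
		return instances
	}
	shard := append([]string(nil), instances...)
	// Rank instances by a tenant-dependent hash so every tenant gets a
	// different, but deterministic, subset.
	sort.Slice(shard, func(i, j int) bool {
		return hash(tenantID+shard[i]) < hash(tenantID+shard[j])
	})
	return shard[:shardSize]
}

// filterBlocks mimics FilterBlocks with replication factor 1: within the
// tenant's shard, each block is owned by exactly one instance.
func filterBlocks(shard []string, instance string, blocks []string) []string {
	var owned []string
	for _, b := range blocks {
		// Simplified ownership rule for illustration, not ring tokens.
		owner := shard[hash(b)%uint32(len(shard))]
		if owner == instance {
			owned = append(owned, b)
		}
	}
	return owned
}

func main() {
	instances := []string{"gw-1", "gw-2", "gw-3", "gw-4", "gw-5"}
	blocks := []string{"block-A", "block-B", "block-C", "block-D"}

	// Shard size 3: only 3 of the 5 store-gateways sync this tenant.
	shard := shuffleShard(instances, "tenant-1", 3)
	fmt.Println("tenant-1 shard:", shard)

	// Each instance in the shard loads a disjoint subset of the blocks.
	for _, inst := range shard {
		fmt.Printf("%s loads %v\n", inst, filterBlocks(shard, inst, blocks))
	}
}
```

Keeping tenant filtering separate from block filtering is what lets `shardingBucketReaderAdapter.Iter` skip listing the bucket entirely for tenants outside the shard, so from the store-gateway's perspective those tenants simply appear to have no blocks.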