3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -14,6 +14,9 @@
* [CHANGE] TLS server validation is now enabled by default, a new parameter `tls_insecure_skip_verify` can be set to true to skip validation optionally. #3030
* [CHANGE] `cortex_ruler_config_update_failures_total` has been removed in favor of `cortex_ruler_config_last_reload_successful`. #3056
* [FEATURE] Logging of the source IP passed along by a reverse proxy is now supported by setting the `-server.log-source-ips-enabled` flag. For non-standard headers the settings `-server.log-source-ips-header` and `-server.log-source-ips-regex` can be used. #2985
* [FEATURE] Experimental blocks storage: added shuffle sharding support to store-gateway blocks sharding. Added the following additional metrics to store-gateway: #3069
* `cortex_bucket_stores_tenants_discovered`
* `cortex_bucket_stores_tenants_synced`
* [ENHANCEMENT] Add support for Azure storage in the China, Germany and US Government environments. #2988
* [ENHANCEMENT] Query-tee: added a small tolerance to floating point sample values comparison. #2994
* [ENHANCEMENT] Query-tee: add support for doing a passthrough of requests to preferred backend for unregistered routes #3018
18 changes: 18 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -35,6 +35,19 @@ Blocks can be replicated across multiple store-gateway instances based on a repl

This feature can be enabled via `-experimental.store-gateway.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-experimental.store-gateway.sharding-ring.*` flags (or their respective YAML config options).

### Sharding strategies

The store-gateway supports two sharding strategies:

- `default`
- `shuffle-sharding`

The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants.

The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subset of store-gateway instances. This limits the number of store-gateway instances loading the blocks of a single tenant, and contains the blast radius of any issue introduced by the tenant's workload to the instances in its shard.

The shuffle sharding strategy can be enabled via `-experimental.store-gateway.sharding-strategy=shuffle-sharding` and requires the `-experimental.store-gateway.tenant-shard-size` flag (or its respective YAML config option) to be set to the default shard size: the number of store-gateway instances each tenant's blocks should be sharded to. The shard size can then be overridden on a per-tenant basis by setting `store_gateway_tenant_shard_size` in the limits overrides.
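As an illustration, the flags above could be combined into a YAML config sketch like the following. The shard size of `3` is an arbitrary example value (not a default), and the Consul-backed `sharding_ring` kvstore lines are an assumption for a typical deployment:

```yaml
store_gateway:
  sharding_enabled: true
  sharding_strategy: shuffle-sharding
  sharding_ring:
    kvstore:
      # Assumes a Consul-backed hash ring; other kvstore backends work too.
      store: consul

limits:
  # Default shard size applied to every tenant unless overridden.
  store_gateway_tenant_shard_size: 3
```

With this configuration, each tenant's blocks would be loaded by at most 3 store-gateway instances unless a per-tenant override says otherwise.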

### Auto-forget

When a store-gateway instance shuts down cleanly, it automatically unregisters itself from the ring. However, in the event of a crash or node failure, the instance will not be unregistered from the ring, potentially leaving a spurious entry in the ring forever.
@@ -193,6 +206,11 @@ store_gateway:
# shutdown and restored at startup.
# CLI flag: -experimental.store-gateway.tokens-file-path
[tokens_file_path: <string> | default = ""]

# The sharding strategy to use. Supported values are: default,
# shuffle-sharding.
# CLI flag: -experimental.store-gateway.sharding-strategy
[sharding_strategy: <string> | default = "default"]
```

### `blocks_storage_config`
13 changes: 13 additions & 0 deletions docs/blocks-storage/store-gateway.template
@@ -35,6 +35,19 @@ Blocks can be replicated across multiple store-gateway instances based on a repl

This feature can be enabled via `-experimental.store-gateway.sharding-enabled=true` and requires the backend [hash ring](../architecture.md#the-hash-ring) to be configured via `-experimental.store-gateway.sharding-ring.*` flags (or their respective YAML config options).

### Sharding strategies

The store-gateway supports two sharding strategies:

- `default`
- `shuffle-sharding`

The **`default`** sharding strategy spreads the blocks of each tenant across all store-gateway instances. It's the easiest form of sharding supported, but doesn't provide any workload isolation between different tenants.

The **`shuffle-sharding`** strategy spreads the blocks of a tenant across a subset of store-gateway instances. This limits the number of store-gateway instances loading the blocks of a single tenant, and contains the blast radius of any issue introduced by the tenant's workload to the instances in its shard.

The shuffle sharding strategy can be enabled via `-experimental.store-gateway.sharding-strategy=shuffle-sharding` and requires the `-experimental.store-gateway.tenant-shard-size` flag (or its respective YAML config option) to be set to the default shard size: the number of store-gateway instances each tenant's blocks should be sharded to. The shard size can then be overridden on a per-tenant basis by setting `store_gateway_tenant_shard_size` in the limits overrides.
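The per-tenant override could be sketched in the overrides file like this (tenant IDs and shard sizes are illustrative; per the limits reference, a value of `0` disables shuffle sharding for that tenant):

```yaml
overrides:
  tenant-a:
    # Spread tenant-a's blocks across 6 store-gateway instances.
    store_gateway_tenant_shard_size: 6
  tenant-b:
    # 0 disables shuffle sharding: blocks spread across all instances.
    store_gateway_tenant_shard_size: 0
```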

### Auto-forget

When a store-gateway instance shuts down cleanly, it automatically unregisters itself from the ring. However, in the event of a crash or node failure, the instance will not be unregistered from the ring, potentially leaving a spurious entry in the ring forever.
11 changes: 11 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2802,6 +2802,13 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -frontend.max-cache-freshness
[max_cache_freshness: <duration> | default = 1m]

# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
# overrides, a value of 0 disables shuffle sharding for the tenant.
# CLI flag: -experimental.store-gateway.tenant-shard-size
[store_gateway_tenant_shard_size: <int> | default = 0]

# File name of per-user overrides. [deprecated, use -runtime-config.file
# instead]
# CLI flag: -limits.per-user-override-config
@@ -3604,6 +3611,10 @@ sharding_ring:
# shutdown and restored at startup.
# CLI flag: -experimental.store-gateway.tokens-file-path
[tokens_file_path: <string> | default = ""]

# The sharding strategy to use. Supported values are: default, shuffle-sharding.
# CLI flag: -experimental.store-gateway.sharding-strategy
[sharding_strategy: <string> | default = "default"]
```

### `purger_config`
53 changes: 35 additions & 18 deletions integration/querier_test.go
@@ -25,30 +25,37 @@ import (

func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
tests := map[string]struct {
blocksShardingEnabled bool
blocksShardingStrategy string // Empty means sharding is disabled.
tenantShardSize int
ingesterStreamingEnabled bool
indexCacheBackend string
}{
"blocks sharding enabled, ingester gRPC streaming disabled, inmemory index cache": {
blocksShardingEnabled: true,
ingesterStreamingEnabled: false,
indexCacheBackend: tsdb.IndexCacheBackendInMemory,
},
"blocks sharding enabled, ingester gRPC streaming enabled, inmemory index cache": {
blocksShardingEnabled: true,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendInMemory,
},
"blocks sharding disabled, ingester gRPC streaming disabled, memcached index cache": {
blocksShardingEnabled: false,
blocksShardingStrategy: "",
ingesterStreamingEnabled: false,
// Memcached index cache is required to avoid flaky tests when the blocks sharding is disabled
// because two different requests may hit two different store-gateways, so if the cache is not
// shared there's no guarantee we'll have a cache hit.
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
},
"blocks sharding enabled, ingester gRPC streaming enabled, memcached index cache": {
blocksShardingEnabled: true,
"blocks default sharding, ingester gRPC streaming disabled, inmemory index cache": {
blocksShardingStrategy: "default",
ingesterStreamingEnabled: false,
indexCacheBackend: tsdb.IndexCacheBackendInMemory,
},
"blocks default sharding, ingester gRPC streaming enabled, inmemory index cache": {
blocksShardingStrategy: "default",
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendInMemory,
},
"blocks default sharding, ingester gRPC streaming enabled, memcached index cache": {
blocksShardingStrategy: "default",
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
},
"blocks shuffle sharding, ingester gRPC streaming enabled, memcached index cache": {
blocksShardingStrategy: "shuffle-sharding",
tenantShardSize: 1,
ingesterStreamingEnabled: true,
indexCacheBackend: tsdb.IndexCacheBackendMemcached,
},
@@ -70,7 +77,9 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
"-experimental.blocks-storage.bucket-store.sync-interval": "1s",
"-experimental.blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
"-experimental.blocks-storage.bucket-store.index-cache.backend": testCfg.indexCacheBackend,
"-experimental.store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingEnabled),
"-experimental.store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingStrategy != ""),
"-experimental.store-gateway.sharding-strategy": testCfg.blocksShardingStrategy,
"-experimental.store-gateway.tenant-shard-size": fmt.Sprintf("%d", testCfg.tenantShardSize),
"-querier.ingester-streaming": strconv.FormatBool(testCfg.ingesterStreamingEnabled),
})

@@ -92,7 +101,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
require.NoError(t, s.StartAndWaitReady(distributor, ingester, storeGateway1, storeGateway2))

// Start the querier with configuring store-gateway addresses if sharding is disabled.
if !testCfg.blocksShardingEnabled {
if testCfg.blocksShardingStrategy == "" {
flags = mergeFlags(flags, map[string]string{
"-experimental.querier.store-gateway-addresses": strings.Join([]string{storeGateway1.NetworkGRPCEndpoint(), storeGateway2.NetworkGRPCEndpoint()}, ","),
})
@@ -103,7 +112,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
// Wait until both the distributor and querier have updated the ring. The querier will also watch
// the store-gateway ring if blocks sharding is enabled.
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
if testCfg.blocksShardingEnabled {
if testCfg.blocksShardingStrategy != "" {
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(512+(512*storeGateways.NumInstances()))), "cortex_ring_tokens_total"))
} else {
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
@@ -153,12 +162,20 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
// Wait until the store-gateway has synced the newly uploaded blocks. When sharding is enabled
// we don't know which store-gateway instance will sync the blocks, so we need to wait on
// metrics extracted from all instances.
if testCfg.blocksShardingEnabled {
if testCfg.blocksShardingStrategy != "" {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "cortex_bucket_store_blocks_loaded"))
} else {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(2*storeGateways.NumInstances())), "cortex_bucket_store_blocks_loaded"))
}

// Check how many tenants have been discovered and synced by store-gateways.
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_discovered"))
if testCfg.blocksShardingStrategy == "shuffle-sharding" {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_bucket_stores_tenants_synced"))
} else {
require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_synced"))
}

// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
result, err := c.Query("series_1", series1Timestamp)
require.NoError(t, err)
5 changes: 4 additions & 1 deletion pkg/cortex/cortex.go
@@ -188,7 +188,10 @@ func (c *Config) Validate(log log.Logger) error {
return errors.Wrap(err, "invalid query_range config")
}
if err := c.TableManager.Validate(); err != nil {
return errors.Wrap(err, "invalid table_manager config")
return errors.Wrap(err, "invalid table-manager config")
}
if err := c.StoreGateway.Validate(c.LimitsConfig); err != nil {
return errors.Wrap(err, "invalid store-gateway config")
}

if c.Storage.Engine == storage.StorageEngineBlocks && c.Querier.SecondStoreEngine != storage.StorageEngineChunks && len(c.Schema.Configs) > 0 {
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_balanced_set.go
@@ -58,7 +58,7 @@ func (s *blocksStoreBalancedSet) resolve(ctx context.Context) error {
return nil
}

func (s *blocksStoreBalancedSet) GetClientsFor(blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (s *blocksStoreBalancedSet) GetClientsFor(_ string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
addresses := s.dnsProvider.Addresses()
if len(addresses) == 0 {
return nil, fmt.Errorf("no address resolved for the store-gateway service addresses %s", strings.Join(s.serviceAddresses, ","))
5 changes: 2 additions & 3 deletions pkg/querier/blocks_store_balanced_set_test.go
@@ -33,7 +33,7 @@ func TestBlocksStoreBalancedSet_GetClientsFor(t *testing.T) {
clientsCount := map[string]int{}

for i := 0; i < numGets; i++ {
clients, err := s.GetClientsFor([]ulid.ULID{block1}, map[ulid.ULID][]string{})
clients, err := s.GetClientsFor("", []ulid.ULID{block1}, map[ulid.ULID][]string{})
require.NoError(t, err)
require.Len(t, clients, 1)

@@ -138,13 +138,12 @@ func TestBlocksStoreBalancedSet_GetClientsFor_Exclude(t *testing.T) {
require.NoError(t, services.StartAndAwaitRunning(ctx, s))
defer services.StopAndAwaitTerminated(ctx, s) //nolint:errcheck

clients, err := s.GetClientsFor(testData.queryBlocks, testData.exclude)
clients, err := s.GetClientsFor("", testData.queryBlocks, testData.exclude)
assert.Equal(t, testData.expectedErr, err)

if testData.expectedErr == nil {
assert.Equal(t, testData.expectedClients, getStoreGatewayClientAddrs(clients))
}
})
}

}
7 changes: 4 additions & 3 deletions pkg/querier/blocks_store_queryable.go
@@ -57,7 +57,7 @@ type BlocksStoreSet interface {
// GetClientsFor returns the store gateway clients that should be used to
// query the set of blocks in input. The exclude parameter is the map of
// blocks -> store-gateway addresses that should be excluded.
GetClientsFor(blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error)
GetClientsFor(userID string, blockIDs []ulid.ULID, exclude map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error)
}

// BlocksFinder is the interface used to find blocks for a given user and time range.
@@ -82,6 +82,7 @@ type BlocksStoreClient interface {
// BlocksStoreLimits is the interface that should be implemented by the limits provider.
type BlocksStoreLimits interface {
MaxChunksPerQuery(userID string) int
StoreGatewayTenantShardSize(userID string) int
}
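A minimal, self-contained sketch of a `BlocksStoreLimits` implementation — hard-coded values, purely illustrative; in Cortex the real provider is backed by the per-tenant limits overrides:

```go
package main

import "fmt"

// BlocksStoreLimits mirrors the limits provider interface from
// blocks_store_queryable.go.
type BlocksStoreLimits interface {
	MaxChunksPerQuery(userID string) int
	StoreGatewayTenantShardSize(userID string) int
}

// staticLimits returns the same limits for every tenant. This is a toy
// stand-in for the real overrides-backed provider.
type staticLimits struct {
	maxChunks int
	shardSize int
}

func (l staticLimits) MaxChunksPerQuery(_ string) int           { return l.maxChunks }
func (l staticLimits) StoreGatewayTenantShardSize(_ string) int { return l.shardSize }

func main() {
	var limits BlocksStoreLimits = staticLimits{maxChunks: 2000000, shardSize: 3}
	// Prints the shard size the replication set would use for this tenant.
	fmt.Println(limits.StoreGatewayTenantShardSize("tenant-a"))
}
```

A shard size of 0 returned here would, per the limits documentation, disable shuffle sharding for the tenant.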

type blocksStoreQueryableMetrics struct {
@@ -193,7 +194,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
reg.MustRegister(storesRing)
}

stores, err = newBlocksStoreReplicationSet(storesRing, querierCfg.StoreGatewayClient, logger, reg)
stores, err = newBlocksStoreReplicationSet(storesRing, gatewayCfg.ShardingStrategy, limits, querierCfg.StoreGatewayClient, logger, reg)
if err != nil {
return nil, errors.Wrap(err, "failed to create store set")
}
@@ -368,7 +369,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
// Find the set of store-gateway instances having the blocks. The exclude parameter is the
// map of blocks queried so far, with the list of store-gateway addresses for each block.
clients, err := q.stores.GetClientsFor(remainingBlocks, attemptedBlocks)
clients, err := q.stores.GetClientsFor(q.userID, remainingBlocks, attemptedBlocks)
if err != nil {
// If it's a retry and we get an error, it means there are no more store-gateways left
// from which to run another attempt, so we just stop retrying.
9 changes: 7 additions & 2 deletions pkg/querier/blocks_store_queryable_test.go
@@ -750,7 +750,7 @@ type blocksStoreSetMock struct {
nextResult int
}

func (m *blocksStoreSetMock) GetClientsFor(_ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
func (m *blocksStoreSetMock) GetClientsFor(_ string, _ []ulid.ULID, _ map[ulid.ULID][]string) (map[BlocksStoreClient][]ulid.ULID, error) {
if m.nextResult >= len(m.mockedResponses) {
panic("not enough mocked results")
}
@@ -815,13 +815,18 @@ func (m *storeGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) {
}

type blocksStoreLimitsMock struct {
maxChunksPerQuery int
maxChunksPerQuery int
storeGatewayTenantShardSize int
}

func (m *blocksStoreLimitsMock) MaxChunksPerQuery(_ string) int {
return m.maxChunksPerQuery
}

func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(userID string) int {
return m.storeGatewayTenantShardSize
}

func mockSeriesResponse(lbls labels.Labels, timeMillis int64, value float64) *storepb.SeriesResponse {
// Generate a chunk containing a single value (for simplicity).
chunk := chunkenc.NewXORChunk()