Skip to content

Commit 85a2161

Browse files
committed
Allow shard sizes to be percent of instances
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 6669540 commit 85a2161

File tree

13 files changed

+111
-39
lines changed

13 files changed

+111
-39
lines changed

docs/configuration/config-file-reference.md

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2938,13 +2938,14 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
29382938

29392939
# Maximum number of queriers that can handle requests for a single tenant. If
29402940
# set to 0 or value higher than number of available queriers, *all* queriers
2941-
# will handle requests for the tenant. Each frontend (or query-scheduler, if
2942-
# used) will select the same set of queriers for the same tenant (given that all
2943-
# queriers are connected to all frontends / query-schedulers). This option only
2944-
# works with queriers connecting to the query-frontend / query-scheduler, not
2945-
# when using downstream URL.
2941+
# will handle requests for the tenant. If the value is < 1, it will be treated
2942+
# as a percentage and the gets a percentage of the total queriers. Each frontend
2943+
# (or query-scheduler, if used) will select the same set of queriers for the
2944+
# same tenant (given that all queriers are connected to all frontends /
2945+
# query-schedulers). This option only works with queriers connecting to the
2946+
# query-frontend / query-scheduler, not when using downstream URL.
29462947
# CLI flag: -frontend.max-queriers-per-tenant
2947-
[max_queriers_per_tenant: <int> | default = 0]
2948+
[max_queriers_per_tenant: <float> | default = 0]
29482949

29492950
# Maximum number of outstanding requests per tenant per request queue (either
29502951
# query frontend or query scheduler); requests beyond this error with HTTP 429.
@@ -2973,9 +2974,10 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
29732974
# The default tenant's shard size when the shuffle-sharding strategy is used.
29742975
# Must be set when the store-gateway sharding is enabled with the
29752976
# shuffle-sharding strategy. When this setting is specified in the per-tenant
2976-
# overrides, a value of 0 disables shuffle sharding for the tenant.
2977+
# overrides, a value of 0 disables shuffle sharding for the tenant. If the value
2978+
# is < 1 the shard size will be a percentage of the total store-gateways.
29772979
# CLI flag: -store-gateway.tenant-shard-size
2978-
[store_gateway_tenant_shard_size: <int> | default = 0]
2980+
[store_gateway_tenant_shard_size: <float> | default = 0]
29792981

29802982
# The maximum number of data bytes to download per gRPC request in Store
29812983
# Gateway, including Series/LabelNames/LabelValues requests. 0 to disable.

pkg/frontend/v1/frontend.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,18 +43,18 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4343

4444
type Limits interface {
4545
// Returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
46-
MaxQueriersPerUser(user string) int
46+
MaxQueriersPerUser(user string) float64
4747

4848
queue.Limits
4949
}
5050

5151
// MockLimits implements the Limits interface. Used in tests only.
5252
type MockLimits struct {
53-
Queriers int
53+
Queriers float64
5454
queue.MockLimits
5555
}
5656

57-
func (l MockLimits) MaxQueriersPerUser(_ string) int {
57+
func (l MockLimits) MaxQueriersPerUser(_ string) float64 {
5858
return l.Queriers
5959
}
6060

@@ -338,7 +338,7 @@ func (f *Frontend) queueRequest(ctx context.Context, req *request) error {
338338
req.queueSpan, _ = opentracing.StartSpanFromContext(ctx, "queued")
339339

340340
// aggregate the max queriers limit in the case of a multi tenant query
341-
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
341+
maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, f.limits.MaxQueriersPerUser)
342342

343343
joinedTenantID := tenant.JoinTenantIDs(tenantIDs)
344344
f.activeUsers.UpdateUserTimestamp(joinedTenantID, now)

pkg/querier/blocks_store_queryable.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ type BlocksStoreLimits interface {
9797
bucket.TenantConfigProvider
9898

9999
MaxChunksPerQueryFromStore(userID string) int
100-
StoreGatewayTenantShardSize(userID string) int
100+
StoreGatewayTenantShardSize(userID string) float64
101101
}
102102

103103
type blocksStoreQueryableMetrics struct {

pkg/querier/blocks_store_queryable_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1558,14 +1558,14 @@ func (m *storeGatewaySeriesClientMock) Recv() (*storepb.SeriesResponse, error) {
15581558

15591559
type blocksStoreLimitsMock struct {
15601560
maxChunksPerQuery int
1561-
storeGatewayTenantShardSize int
1561+
storeGatewayTenantShardSize float64
15621562
}
15631563

15641564
func (m *blocksStoreLimitsMock) MaxChunksPerQueryFromStore(_ string) int {
15651565
return m.maxChunksPerQuery
15661566
}
15671567

1568-
func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) int {
1568+
func (m *blocksStoreLimitsMock) StoreGatewayTenantShardSize(_ string) float64 {
15691569
return m.storeGatewayTenantShardSize
15701570
}
15711571

pkg/scheduler/queue/queue.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/prometheus/client_golang/prometheus/promauto"
1111
"go.uber.org/atomic"
1212

13+
"github.com/cortexproject/cortex/pkg/util"
1314
"github.com/cortexproject/cortex/pkg/util/services"
1415
)
1516

@@ -86,15 +87,16 @@ func NewRequestQueue(maxOutstandingPerTenant int, forgetDelay time.Duration, que
8687
// between calls.
8788
//
8889
// If request is successfully enqueued, successFn is called with the lock held, before any querier can receive the request.
89-
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers int, successFn func()) error {
90+
func (q *RequestQueue) EnqueueRequest(userID string, req Request, maxQueriers float64, successFn func()) error {
9091
q.mtx.Lock()
9192
defer q.mtx.Unlock()
9293

9394
if q.stopped {
9495
return ErrStopped
9596
}
9697

97-
queue := q.queues.getOrAddQueue(userID, maxQueriers)
98+
shardSize := util.DynamicShardSize(maxQueriers, len(q.queues.queriers))
99+
queue := q.queues.getOrAddQueue(userID, shardSize)
98100
if queue == nil {
99101
// This can only happen if userID is "".
100102
return errors.New("no queue found")

pkg/scheduler/scheduler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
143143
// Limits needed for the Query Scheduler - interface used for decoupling.
144144
type Limits interface {
145145
// MaxQueriersPerUser returns max queriers to use per tenant, or 0 if shuffle sharding is disabled.
146-
MaxQueriersPerUser(user string) int
146+
MaxQueriersPerUser(user string) float64
147147

148148
queue.Limits
149149
}
@@ -307,7 +307,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr
307307
if err != nil {
308308
return err
309309
}
310-
maxQueriers := validation.SmallestPositiveNonZeroIntPerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
310+
maxQueriers := validation.SmallestPositiveNonZeroFloat64PerTenant(tenantIDs, s.limits.MaxQueriersPerUser)
311311

312312
s.activeUsers.UpdateUserTimestamp(userID, now)
313313
return s.requestQueue.EnqueueRequest(userID, req, maxQueriers, func() {

pkg/storegateway/gateway_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
241241

242242
tests := map[string]struct {
243243
shardingStrategy string
244-
tenantShardSize int // Used only when the sharding strategy is shuffle-sharding.
244+
tenantShardSize float64 // Used only when the sharding strategy is shuffle-sharding.
245245
replicationFactor int
246246
numGateways int
247247
expectedBlocksLoaded int
@@ -361,8 +361,8 @@ func TestStoreGateway_InitialSyncWithWaitRingStability(t *testing.T) {
361361
assert.Equal(t, float64(2*testData.numGateways), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_discovered"))
362362

363363
if testData.shardingStrategy == util.ShardingStrategyShuffle {
364-
assert.Equal(t, float64(testData.tenantShardSize*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
365-
assert.Equal(t, float64(testData.tenantShardSize*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))
364+
assert.Equal(t, float64(int(testData.tenantShardSize)*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
365+
assert.Equal(t, float64(int(testData.tenantShardSize)*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))
366366
} else {
367367
assert.Equal(t, float64(testData.numGateways*numBlocks), metrics.GetSumOfGauges("cortex_blocks_meta_synced"))
368368
assert.Equal(t, float64(testData.numGateways*numUsers), metrics.GetSumOfGauges("cortex_bucket_stores_tenants_synced"))

pkg/storegateway/sharding_strategy.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/cortexproject/cortex/pkg/ring"
1414
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
15+
"github.com/cortexproject/cortex/pkg/util"
1516
)
1617

1718
const (
@@ -32,7 +33,7 @@ type ShardingStrategy interface {
3233
// ShardingLimits is the interface that should be implemented by the limits provider,
3334
// limiting the scope of the limits to the ones required by sharding strategies.
3435
type ShardingLimits interface {
35-
StoreGatewayTenantShardSize(userID string) int
36+
StoreGatewayTenantShardSize(userID string) float64
3637
}
3738

3839
// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
@@ -173,7 +174,7 @@ func filterBlocksByRingSharding(r ring.ReadRing, instanceAddr string, metas map[
173174
// GetShuffleShardingSubring returns the subring to be used for a given user. This function
174175
// should be used both by store-gateway and querier in order to guarantee the same logic is used.
175176
func GetShuffleShardingSubring(ring *ring.Ring, userID string, limits ShardingLimits) ring.ReadRing {
176-
shardSize := limits.StoreGatewayTenantShardSize(userID)
177+
shardSize := util.DynamicShardSize(limits.StoreGatewayTenantShardSize(userID), ring.InstancesCount())
177178

178179
// A shard size of 0 means shuffle sharding is disabled for this specific user,
179180
// so we just return the full ring so that blocks will be sharded across all store-gateways.

pkg/storegateway/sharding_strategy_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -662,9 +662,9 @@ func TestShuffleShardingStrategy(t *testing.T) {
662662
}
663663

664664
type shardingLimitsMock struct {
665-
storeGatewayTenantShardSize int
665+
storeGatewayTenantShardSize float64
666666
}
667667

668-
func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) int {
668+
func (m *shardingLimitsMock) StoreGatewayTenantShardSize(_ string) float64 {
669669
return m.storeGatewayTenantShardSize
670670
}

pkg/util/shard.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,12 @@ func ShuffleShardExpectedInstancesPerZone(shardSize, numZones int) int {
4343
func ShuffleShardExpectedInstances(shardSize, numZones int) int {
4444
return ShuffleShardExpectedInstancesPerZone(shardSize, numZones) * numZones
4545
}
46+
47+
// DynamicShardSize returns the shard size as a percentage of numInstances if the value is < 1. If the value is > 1, the value is rounded and returned.
48+
func DynamicShardSize(value float64, numInstances int) int {
49+
var shardSize = int(math.Ceil(value))
50+
if value < 1 {
51+
shardSize = int(math.Ceil(float64(numInstances) * value))
52+
}
53+
return shardSize
54+
}

pkg/util/shard_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,61 @@ func TestShuffleShardExpectedInstances(t *testing.T) {
8181
assert.Equal(t, test.expected, ShuffleShardExpectedInstances(test.shardSize, test.numZones))
8282
}
8383
}
84+
85+
func TestDynamicShardSize(t *testing.T) {
86+
tests := []struct {
87+
value float64
88+
numInstances int
89+
expected int
90+
}{
91+
{
92+
value: 0,
93+
numInstances: 100,
94+
expected: 0,
95+
},
96+
{
97+
value: 0.1,
98+
numInstances: 100,
99+
expected: 10,
100+
},
101+
{
102+
value: 0.01,
103+
numInstances: 100,
104+
expected: 1,
105+
},
106+
{
107+
value: 3,
108+
numInstances: 100,
109+
expected: 3,
110+
},
111+
{
112+
value: 0.4,
113+
numInstances: 100,
114+
expected: 40,
115+
},
116+
{
117+
value: 1,
118+
numInstances: 100,
119+
expected: 1,
120+
},
121+
{
122+
value: 0.99999,
123+
numInstances: 100,
124+
expected: 100,
125+
},
126+
{
127+
value: 0.5,
128+
numInstances: 3,
129+
expected: 2,
130+
},
131+
{
132+
value: 0.8,
133+
numInstances: 3,
134+
expected: 3,
135+
},
136+
}
137+
138+
for _, test := range tests {
139+
assert.Equal(t, test.expected, DynamicShardSize(test.value, test.numInstances))
140+
}
141+
}

pkg/util/validation/limits.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type Limits struct {
8282
MaxQueryLength model.Duration `yaml:"max_query_length" json:"max_query_length"`
8383
MaxQueryParallelism int `yaml:"max_query_parallelism" json:"max_query_parallelism"`
8484
MaxCacheFreshness model.Duration `yaml:"max_cache_freshness" json:"max_cache_freshness"`
85-
MaxQueriersPerTenant int `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
85+
MaxQueriersPerTenant float64 `yaml:"max_queriers_per_tenant" json:"max_queriers_per_tenant"`
8686
QueryVerticalShardSize int `yaml:"query_vertical_shard_size" json:"query_vertical_shard_size" doc:"hidden"`
8787

8888
// Query Frontend / Scheduler enforced limits.
@@ -95,8 +95,8 @@ type Limits struct {
9595
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
9696

9797
// Store-gateway.
98-
StoreGatewayTenantShardSize int `yaml:"store_gateway_tenant_shard_size" json:"store_gateway_tenant_shard_size"`
99-
MaxDownloadedBytesPerRequest int `yaml:"max_downloaded_bytes_per_request" json:"max_downloaded_bytes_per_request"`
98+
StoreGatewayTenantShardSize float64 `yaml:"store_gateway_tenant_shard_size" json:"store_gateway_tenant_shard_size"`
99+
MaxDownloadedBytesPerRequest int `yaml:"max_downloaded_bytes_per_request" json:"max_downloaded_bytes_per_request"`
100100

101101
// Compactor.
102102
CompactorBlocksRetentionPeriod model.Duration `yaml:"compactor_blocks_retention_period" json:"compactor_blocks_retention_period"`
@@ -168,7 +168,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
168168
f.IntVar(&l.MaxQueryParallelism, "querier.max-query-parallelism", 14, "Maximum number of split queries will be scheduled in parallel by the frontend.")
169169
_ = l.MaxCacheFreshness.Set("1m")
170170
f.Var(&l.MaxCacheFreshness, "frontend.max-cache-freshness", "Most recent allowed cacheable result per-tenant, to prevent caching very recent results that might still be in flux.")
171-
f.IntVar(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.")
171+
f.Float64Var(&l.MaxQueriersPerTenant, "frontend.max-queriers-per-tenant", 0, "Maximum number of queriers that can handle requests for a single tenant. If set to 0 or value higher than number of available queriers, *all* queriers will handle requests for the tenant. If the value is < 1, it will be treated as a percentage and the gets a percentage of the total queriers. Each frontend (or query-scheduler, if used) will select the same set of queriers for the same tenant (given that all queriers are connected to all frontends / query-schedulers). This option only works with queriers connecting to the query-frontend / query-scheduler, not when using downstream URL.")
172172
f.IntVar(&l.QueryVerticalShardSize, "frontend.query-vertical-shard-size", 0, "[Experimental] Number of shards to use when distributing shardable PromQL queries.")
173173

174174
f.IntVar(&l.MaxOutstandingPerTenant, "frontend.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per request queue (either query frontend or query scheduler); requests beyond this error with HTTP 429.")
@@ -182,7 +182,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
182182
f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
183183

184184
// Store-gateway.
185-
f.IntVar(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
185+
f.Float64Var(&l.StoreGatewayTenantShardSize, "store-gateway.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used. Must be set when the store-gateway sharding is enabled with the shuffle-sharding strategy. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant. If the value is < 1 the shard size will be a percentage of the total store-gateways.")
186186
f.IntVar(&l.MaxDownloadedBytesPerRequest, "store-gateway.max-downloaded-bytes-per-request", 0, "The maximum number of data bytes to download per gRPC request in Store Gateway, including Series/LabelNames/LabelValues requests. 0 to disable.")
187187

188188
// Alertmanager.
@@ -455,7 +455,7 @@ func (o *Overrides) MaxCacheFreshness(userID string) time.Duration {
455455
}
456456

457457
// MaxQueriersPerUser returns the maximum number of queriers that can handle requests for this user.
458-
func (o *Overrides) MaxQueriersPerUser(userID string) int {
458+
func (o *Overrides) MaxQueriersPerUser(userID string) float64 {
459459
return o.GetOverridesForUser(userID).MaxQueriersPerTenant
460460
}
461461

@@ -547,7 +547,7 @@ func (o *Overrides) RulerMaxRuleGroupsPerTenant(userID string) int {
547547
}
548548

549549
// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user.
550-
func (o *Overrides) StoreGatewayTenantShardSize(userID string) int {
550+
func (o *Overrides) StoreGatewayTenantShardSize(userID string) float64 {
551551
return o.GetOverridesForUser(userID).StoreGatewayTenantShardSize
552552
}
553553

@@ -687,12 +687,12 @@ func SmallestPositiveIntPerTenant(tenantIDs []string, f func(string) int) int {
687687
return *result
688688
}
689689

690-
// SmallestPositiveNonZeroIntPerTenant is returning the minimal positive and
690+
// SmallestPositiveNonZeroFloat64PerTenant is returning the minimal positive and
691691
// non-zero value of the supplied limit function for all given tenants. In many
692692
// limits a value of 0 means unlimted so the method will return 0 only if all
693693
// inputs have a limit of 0 or an empty tenant list is given.
694-
func SmallestPositiveNonZeroIntPerTenant(tenantIDs []string, f func(string) int) int {
695-
var result *int
694+
func SmallestPositiveNonZeroFloat64PerTenant(tenantIDs []string, f func(string) float64) float64 {
695+
var result *float64
696696
for _, tenantID := range tenantIDs {
697697
v := f(tenantID)
698698
if v > 0 && (result == nil || v < *result) {

pkg/util/validation/limits_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ func TestSmallestPositiveIntPerTenant(t *testing.T) {
262262
}
263263
}
264264

265-
func TestSmallestPositiveNonZeroIntPerTenant(t *testing.T) {
265+
func TestSmallestPositiveNonZeroFloat64PerTenant(t *testing.T) {
266266
tenantLimits := map[string]*Limits{
267267
"tenant-a": {
268268
MaxQueriersPerTenant: 5,
@@ -280,7 +280,7 @@ func TestSmallestPositiveNonZeroIntPerTenant(t *testing.T) {
280280

281281
for _, tc := range []struct {
282282
tenantIDs []string
283-
expLimit int
283+
expLimit float64
284284
}{
285285
{tenantIDs: []string{}, expLimit: 0},
286286
{tenantIDs: []string{"tenant-a"}, expLimit: 5},
@@ -290,7 +290,7 @@ func TestSmallestPositiveNonZeroIntPerTenant(t *testing.T) {
290290
{tenantIDs: []string{"tenant-c", "tenant-d", "tenant-e"}, expLimit: 0},
291291
{tenantIDs: []string{"tenant-a", "tenant-b", "tenant-c"}, expLimit: 5},
292292
} {
293-
assert.Equal(t, tc.expLimit, SmallestPositiveNonZeroIntPerTenant(tc.tenantIDs, ov.MaxQueriersPerUser))
293+
assert.Equal(t, tc.expLimit, SmallestPositiveNonZeroFloat64PerTenant(tc.tenantIDs, ov.MaxQueriersPerUser))
294294
}
295295
}
296296

0 commit comments

Comments
 (0)