Skip to content

Commit 1e5ec9e

Browse files
friedrichgmoki1202
andauthored
add/support enabled_tenants and disabled_tenants feature in storegateway (#5638)
* add enabled and disabled tenants in storegateway Signed-off-by: Shashank <[email protected]> * added test cases Signed-off-by: Shashank <[email protected]> * Fix implementation and tests Signed-off-by: Friedrich Gonzalez <[email protected]> * Update docs Signed-off-by: Friedrich Gonzalez <[email protected]> * Update changelog Signed-off-by: Friedrich Gonzalez <[email protected]> * Replace underscore for dash in flag Signed-off-by: Friedrich Gonzalez <[email protected]> --------- Signed-off-by: Shashank <[email protected]> Signed-off-by: Friedrich Gonzalez <[email protected]> Co-authored-by: Shashank <[email protected]>
1 parent 3b492ef commit 1e5ec9e

9 files changed

+126
-32
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Changelog
22

33
## master / unreleased
4+
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
45

56
## 1.16.0 in progress
67

docs/blocks-storage/store-gateway.md

+13
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,19 @@ store_gateway:
331331
# shuffle-sharding.
332332
# CLI flag: -store-gateway.sharding-strategy
333333
[sharding_strategy: <string> | default = "default"]
334+
335+
# Comma separated list of tenants whose store metrics this storegateway can
336+
# process. If specified, only these tenants will be handled by storegateway,
337+
# otherwise this storegateway will be enabled for all the tenants in the
338+
# store-gateway cluster.
339+
# CLI flag: -store-gateway.enabled-tenants
340+
[enabled_tenants: <string> | default = ""]
341+
342+
# Comma separated list of tenants whose store metrics this storegateway cannot
343+
# process. If specified, a storegateway that would normally pick the specified
344+
# tenant(s) for processing will ignore them instead.
345+
# CLI flag: -store-gateway.disabled-tenants
346+
[disabled_tenants: <string> | default = ""]
334347
```
335348
336349
### `blocks_storage_config`

docs/configuration/config-file-reference.md

+13
Original file line numberDiff line numberDiff line change
@@ -4912,6 +4912,19 @@ sharding_ring:
49124912
# The sharding strategy to use. Supported values are: default, shuffle-sharding.
49134913
# CLI flag: -store-gateway.sharding-strategy
49144914
[sharding_strategy: <string> | default = "default"]
4915+
4916+
# Comma separated list of tenants whose store metrics this storegateway can
4917+
# process. If specified, only these tenants will be handled by storegateway,
4918+
# otherwise this storegateway will be enabled for all the tenants in the
4919+
# store-gateway cluster.
4920+
# CLI flag: -store-gateway.enabled-tenants
4921+
[enabled_tenants: <string> | default = ""]
4922+
4923+
# Comma separated list of tenants whose store metrics this storegateway cannot
4924+
# process. If specified, a storegateway that would normally pick the specified
4925+
# tenant(s) for processing will ignore them instead.
4926+
# CLI flag: -store-gateway.disabled-tenants
4927+
[disabled_tenants: <string> | default = ""]
49154928
```
49164929

49174930
### `tracing_config`

pkg/storegateway/bucket_index_metadata_fetcher_test.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) {
5555
NewIgnoreDeletionMarkFilter(logger, bucket.NewUserBucketClient(userID, bkt, nil), 2*time.Hour, 1),
5656
}
5757

58-
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, filters)
58+
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, filters)
5959
metas, partials, err := fetcher.Fetch(ctx)
6060
require.NoError(t, err)
6161
assert.Equal(t, map[ulid.ULID]*metadata.Meta{
@@ -109,7 +109,7 @@ func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) {
109109

110110
bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyAccessDenied)
111111

112-
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil)
112+
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(log.NewNopLogger(), nil), nil, log.NewNopLogger(), reg, nil)
113113
metas, _, err := fetcher.Fetch(ctx)
114114
require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied))
115115
assert.Empty(t, metas)
@@ -157,7 +157,7 @@ func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) {
157157
logs := &concurrency.SyncBuffer{}
158158
logger := log.NewLogfmtLogger(logs)
159159

160-
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, nil)
160+
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, nil)
161161
metas, partials, err := fetcher.Fetch(ctx)
162162
require.NoError(t, err)
163163
assert.Empty(t, metas)
@@ -212,7 +212,7 @@ func TestBucketIndexMetadataFetcher_Fetch_CorruptedBucketIndex(t *testing.T) {
212212
// Upload a corrupted bucket index.
213213
require.NoError(t, bkt.Upload(ctx, path.Join(userID, bucketindex.IndexCompressedFilename), strings.NewReader("invalid}!")))
214214

215-
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, logger, reg, nil)
215+
fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(logger, nil), nil, logger, reg, nil)
216216
metas, partials, err := fetcher.Fetch(ctx)
217217
require.NoError(t, err)
218218
assert.Empty(t, metas)

pkg/storegateway/bucket_stores_test.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func TestBucketStores_CustomerKeyError(t *testing.T) {
120120
require.NoError(t, err)
121121

122122
reg := prometheus.NewPedanticRegistry()
123-
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
123+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), mBucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
124124
require.NoError(t, err)
125125

126126
if tc.mockInitialSync {
@@ -200,7 +200,7 @@ func TestBucketStores_InitialSync(t *testing.T) {
200200
require.NoError(t, err)
201201

202202
reg := prometheus.NewPedanticRegistry()
203-
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
203+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
204204
require.NoError(t, err)
205205

206206
// Query series before the initial sync.
@@ -276,7 +276,7 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) {
276276
bucket = &failFirstGetBucket{Bucket: bucket}
277277

278278
reg := prometheus.NewPedanticRegistry()
279-
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
279+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
280280
require.NoError(t, err)
281281

282282
// Initial sync should succeed even if a transient error occurs.
@@ -336,7 +336,7 @@ func TestBucketStores_SyncBlocks(t *testing.T) {
336336
require.NoError(t, err)
337337

338338
reg := prometheus.NewPedanticRegistry()
339-
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
339+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
340340
require.NoError(t, err)
341341

342342
// Run an initial sync to discover 1 block.
@@ -397,11 +397,16 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) {
397397
tests := map[string]struct {
398398
shardingStrategy ShardingStrategy
399399
expectedStores int32
400+
allowedTenants *util.AllowedTenants
400401
}{
401402
"when sharding is disabled all users should be synced": {
402-
shardingStrategy: NewNoShardingStrategy(),
403+
shardingStrategy: NewNoShardingStrategy(log.NewNopLogger(), nil),
403404
expectedStores: 3,
404405
},
406+
"sharding disabled, user-1 disabled": {
407+
shardingStrategy: NewNoShardingStrategy(log.NewNopLogger(), util.NewAllowedTenants(nil, []string{"user-1"})),
408+
expectedStores: 2,
409+
},
405410
"when sharding is enabled only stores for filtered users should be created": {
406411
shardingStrategy: func() ShardingStrategy {
407412
s := &mockShardingStrategy{}
@@ -465,7 +470,7 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
465470
require.NoError(t, err)
466471

467472
reg := prometheus.NewPedanticRegistry()
468-
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
473+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
469474
require.NoError(t, err)
470475
require.NoError(t, stores.InitialSync(ctx))
471476

@@ -521,7 +526,7 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t
521526
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
522527
require.NoError(t, err)
523528

524-
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
529+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
525530
require.NoError(t, err)
526531
require.NoError(t, stores.InitialSync(context.Background()))
527532

@@ -542,7 +547,7 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl
542547
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
543548
require.NoError(t, err)
544549

545-
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
550+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucket, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
546551
require.NoError(t, err)
547552
require.NoError(t, stores.InitialSync(context.Background()))
548553

pkg/storegateway/gateway.go

+16-3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2424
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
2525
"github.com/cortexproject/cortex/pkg/util"
26+
"github.com/cortexproject/cortex/pkg/util/flagext"
2627
"github.com/cortexproject/cortex/pkg/util/services"
2728
"github.com/cortexproject/cortex/pkg/util/validation"
2829
)
@@ -58,6 +59,9 @@ type Config struct {
5859
ShardingEnabled bool `yaml:"sharding_enabled"`
5960
ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. This option is required only if blocks sharding is enabled."`
6061
ShardingStrategy string `yaml:"sharding_strategy"`
62+
63+
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
64+
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
6165
}
6266

6367
// RegisterFlags registers the Config flags.
@@ -66,6 +70,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6670

6771
f.BoolVar(&cfg.ShardingEnabled, "store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier)
6872
f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
73+
f.Var(&cfg.EnabledTenants, "store-gateway.enabled-tenants", "Comma separated list of tenants whose store metrics this storegateway can process. If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.")
74+
f.Var(&cfg.DisabledTenants, "store-gateway.disabled-tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.")
6975
}
7076

7177
// Validate the Config.
@@ -140,6 +146,7 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
140146
Help: "Total number of times the bucket sync operation triggered.",
141147
}, []string{"reason"}),
142148
}
149+
allowedTenants := util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants)
143150

144151
// Init metrics.
145152
g.bucketSync.WithLabelValues(syncReasonInitial)
@@ -161,6 +168,12 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
161168
Help: instanceLimitsMetricHelp,
162169
ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"},
163170
}).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes))
171+
if len(gatewayCfg.EnabledTenants) > 0 {
172+
level.Info(g.logger).Log("msg", "storegateway using enabled users", "enabled", strings.Join(gatewayCfg.EnabledTenants, ", "))
173+
}
174+
if len(gatewayCfg.DisabledTenants) > 0 {
175+
level.Info(g.logger).Log("msg", "storegateway using disabled users", "disabled", strings.Join(gatewayCfg.DisabledTenants, ", "))
176+
}
164177

165178
// Init sharding strategy.
166179
var shardingStrategy ShardingStrategy
@@ -192,14 +205,14 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
192205
// Instance the right strategy.
193206
switch gatewayCfg.ShardingStrategy {
194207
case util.ShardingStrategyDefault:
195-
shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger)
208+
shardingStrategy = NewDefaultShardingStrategy(g.ring, lifecyclerCfg.Addr, logger, allowedTenants)
196209
case util.ShardingStrategyShuffle:
197-
shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, g.gatewayCfg.ShardingRing.ZoneStableShuffleSharding)
210+
shardingStrategy = NewShuffleShardingStrategy(g.ring, lifecyclerCfg.ID, lifecyclerCfg.Addr, limits, logger, allowedTenants, g.gatewayCfg.ShardingRing.ZoneStableShuffleSharding)
198211
default:
199212
return nil, errInvalidShardingStrategy
200213
}
201214
} else {
202-
shardingStrategy = NewNoShardingStrategy()
215+
shardingStrategy = NewNoShardingStrategy(logger, allowedTenants)
203216
}
204217

205218
g.stores, err = NewBucketStores(storageCfg, shardingStrategy, bucketClient, limits, logLevel, logger, extprom.WrapRegistererWith(prometheus.Labels{"component": "store-gateway"}, reg))

pkg/storegateway/gateway_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
133133
ctx := context.Background()
134134
gatewayCfg := mockGatewayConfig()
135135
gatewayCfg.ShardingEnabled = true
136+
gatewayCfg.DisabledTenants = []string{"user-disabled"}
136137
storageCfg := mockStorageConfig(t)
137138
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
138139
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
@@ -153,7 +154,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
153154
defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck
154155
assert.False(t, g.ringLifecycler.IsRegistered())
155156

156-
bucketClient.MockIterWithCallback("", []string{"user-1", "user-2"}, nil, func() {
157+
bucketClient.MockIterWithCallback("", []string{"user-1", "user-2", "user-disabled"}, nil, func() {
157158
// During the initial sync, we expect the instance to always be in the JOINING
158159
// state within the ring.
159160
assert.True(t, g.ringLifecycler.IsRegistered())
@@ -163,6 +164,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
163164
})
164165
bucketClient.MockIter("user-1/", []string{}, nil)
165166
bucketClient.MockIter("user-2/", []string{}, nil)
167+
bucketClient.MockIter("user-disabled/", []string{}, nil)
166168

167169
// Once successfully started, the instance should be ACTIVE in the ring.
168170
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
@@ -174,6 +176,7 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) {
174176

175177
assert.NotNil(t, g.stores.getStore("user-1"))
176178
assert.NotNil(t, g.stores.getStore("user-2"))
179+
assert.Nil(t, g.stores.getStore("user-disabled"))
177180
assert.Nil(t, g.stores.getStore("user-unknown"))
178181
})
179182
}
@@ -184,20 +187,23 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) {
184187
ctx := context.Background()
185188
gatewayCfg := mockGatewayConfig()
186189
gatewayCfg.ShardingEnabled = false
190+
gatewayCfg.DisabledTenants = []string{"user-disabled"}
187191
storageCfg := mockStorageConfig(t)
188192
bucketClient := &bucket.ClientMock{}
189193

190194
g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), nil)
191195
require.NoError(t, err)
192196
defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck
193197

194-
bucketClient.MockIter("", []string{"user-1", "user-2"}, nil)
198+
bucketClient.MockIter("", []string{"user-1", "user-2", "user-disabled"}, nil)
195199
bucketClient.MockIter("user-1/", []string{}, nil)
196200
bucketClient.MockIter("user-2/", []string{}, nil)
201+
bucketClient.MockIter("user-disabled/", []string{}, nil)
197202

198203
require.NoError(t, services.StartAndAwaitRunning(ctx, g))
199204
assert.NotNil(t, g.stores.getStore("user-1"))
200205
assert.NotNil(t, g.stores.getStore("user-2"))
206+
assert.Nil(t, g.stores.getStore("user-disabled"))
201207
assert.Nil(t, g.stores.getStore("user-unknown"))
202208
}
203209

pkg/storegateway/sharding_strategy.go

+36-12
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,35 @@ type ShardingLimits interface {
3636
StoreGatewayTenantShardSize(userID string) float64
3737
}
3838

39+
func filterDisallowedTenants(userIDs []string, logger log.Logger, allowedTenants *util.AllowedTenants) []string {
40+
filteredUserIDs := []string{}
41+
for _, userID := range userIDs {
42+
if !allowedTenants.IsAllowed(userID) {
43+
level.Debug(logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID)
44+
continue
45+
}
46+
47+
filteredUserIDs = append(filteredUserIDs, userID)
48+
}
49+
50+
return filteredUserIDs
51+
}
52+
3953
// NoShardingStrategy is a no-op strategy. When this strategy is used, no tenant/block is filtered out.
40-
type NoShardingStrategy struct{}
54+
type NoShardingStrategy struct {
55+
logger log.Logger
56+
allowedTenants *util.AllowedTenants
57+
}
4158

42-
func NewNoShardingStrategy() *NoShardingStrategy {
43-
return &NoShardingStrategy{}
59+
func NewNoShardingStrategy(logger log.Logger, allowedTenants *util.AllowedTenants) *NoShardingStrategy {
60+
return &NoShardingStrategy{
61+
logger: logger,
62+
allowedTenants: allowedTenants,
63+
}
4464
}
4565

4666
func (s *NoShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
47-
return userIDs
67+
return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants)
4868
}
4969

5070
func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[ulid.ULID]*metadata.Meta, _ map[ulid.ULID]struct{}, _ block.GaugeVec) error {
@@ -54,23 +74,26 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli
5474
// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways.
5575
// Not go-routine safe.
5676
type DefaultShardingStrategy struct {
57-
r *ring.Ring
58-
instanceAddr string
59-
logger log.Logger
77+
r *ring.Ring
78+
instanceAddr string
79+
logger log.Logger
80+
allowedTenants *util.AllowedTenants
6081
}
6182

6283
// NewDefaultShardingStrategy creates DefaultShardingStrategy.
63-
func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger) *DefaultShardingStrategy {
84+
func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Logger, allowedTenants *util.AllowedTenants) *DefaultShardingStrategy {
6485
return &DefaultShardingStrategy{
6586
r: r,
6687
instanceAddr: instanceAddr,
6788
logger: logger,
89+
90+
allowedTenants: allowedTenants,
6891
}
6992
}
7093

7194
// FilterUsers implements ShardingStrategy.
7295
func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
73-
return userIDs
96+
return filterDisallowedTenants(userIDs, s.logger, s.allowedTenants)
7497
}
7598

7699
// FilterBlocks implements ShardingStrategy.
@@ -89,10 +112,11 @@ type ShuffleShardingStrategy struct {
89112
logger log.Logger
90113

91114
zoneStableShuffleSharding bool
115+
allowedTenants *util.AllowedTenants
92116
}
93117

94118
// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
95-
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, zoneStableShuffleSharding bool) *ShuffleShardingStrategy {
119+
func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, limits ShardingLimits, logger log.Logger, allowedTenants *util.AllowedTenants, zoneStableShuffleSharding bool) *ShuffleShardingStrategy {
96120
return &ShuffleShardingStrategy{
97121
r: r,
98122
instanceID: instanceID,
@@ -101,14 +125,14 @@ func NewShuffleShardingStrategy(r *ring.Ring, instanceID, instanceAddr string, l
101125
logger: logger,
102126

103127
zoneStableShuffleSharding: zoneStableShuffleSharding,
128+
allowedTenants: allowedTenants,
104129
}
105130
}
106131

107132
// FilterUsers implements ShardingStrategy.
108133
func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
109134
var filteredIDs []string
110-
111-
for _, userID := range userIDs {
135+
for _, userID := range filterDisallowedTenants(userIDs, s.logger, s.allowedTenants) {
112136
subRing := GetShuffleShardingSubring(s.r, userID, s.limits, s.zoneStableShuffleSharding)
113137

114138
// Include the user only if it belongs to this store-gateway shard.

0 commit comments

Comments
 (0)