Skip to content

Commit 6054659

Browse files
committed
add enabled and disabled tenants in storegateway
Signed-off-by: Shashank <[email protected]>
1 parent 4d433cc commit 6054659

File tree

2 files changed

+42
-9
lines changed

2 files changed

+42
-9
lines changed

pkg/storegateway/gateway.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2424
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
2525
"github.com/cortexproject/cortex/pkg/util"
26+
"github.com/cortexproject/cortex/pkg/util/flagext"
2627
"github.com/cortexproject/cortex/pkg/util/services"
2728
"github.com/cortexproject/cortex/pkg/util/validation"
2829
)
@@ -54,6 +55,9 @@ type Config struct {
5455
ShardingEnabled bool `yaml:"sharding_enabled"`
5556
ShardingRing RingConfig `yaml:"sharding_ring" doc:"description=The hash ring configuration. This option is required only if blocks sharding is enabled."`
5657
ShardingStrategy string `yaml:"sharding_strategy"`
58+
59+
EnabledTenants flagext.StringSliceCSV `yaml:"enabled_tenants"`
60+
DisabledTenants flagext.StringSliceCSV `yaml:"disabled_tenants"`
5761
}
5862

5963
// RegisterFlags registers the Config flags.
@@ -62,6 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
6266

6367
f.BoolVar(&cfg.ShardingEnabled, "store-gateway.sharding-enabled", false, "Shard blocks across multiple store gateway instances."+sharedOptionWithQuerier)
6468
f.StringVar(&cfg.ShardingStrategy, "store-gateway.sharding-strategy", util.ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
69+
f.Var(&cfg.EnabledTenants, "store-gateway.enabled_tenants", "Comma separated list of tenants whose store metrics this storegateway can process. If specified, only these tenants will be handled by storegateway, otherwise this storegateway will be enabled for all the tenants in the store-gateway cluster.")
70+
f.Var(&cfg.DisabledTenants, "store-gateway.disabled_tenants", "Comma separated list of tenants whose store metrics this storegateway cannot process. If specified, a storegateway that would normally pick the specified tenant(s) for processing will ignore them instead.")
6571
}
6672

6773
// Validate the Config.
@@ -99,6 +105,8 @@ type StoreGateway struct {
99105
subservicesWatcher *services.FailureWatcher
100106

101107
bucketSync *prometheus.CounterVec
108+
109+
allowedTenants *util.AllowedTenants
102110
}
103111

104112
func NewStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*StoreGateway, error) {
@@ -135,13 +143,21 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
135143
Name: "cortex_storegateway_bucket_sync_total",
136144
Help: "Total number of times the bucket sync operation triggered.",
137145
}, []string{"reason"}),
146+
allowedTenants: util.NewAllowedTenants(gatewayCfg.EnabledTenants, gatewayCfg.DisabledTenants),
138147
}
139148

140149
// Init metrics.
141150
g.bucketSync.WithLabelValues(syncReasonInitial)
142151
g.bucketSync.WithLabelValues(syncReasonPeriodic)
143152
g.bucketSync.WithLabelValues(syncReasonRingChange)
144153

154+
if len(gatewayCfg.EnabledTenants) > 0 {
155+
level.Info(g.logger).Log("msg", "storegateway using enabled users", "enabled", strings.Join(gatewayCfg.EnabledTenants, ", "))
156+
}
157+
if len(gatewayCfg.DisabledTenants) > 0 {
158+
level.Info(g.logger).Log("msg", "storegateway using disabled users", "disabled", strings.Join(gatewayCfg.DisabledTenants, ", "))
159+
}
160+
145161
// Init sharding strategy.
146162
var shardingStrategy ShardingStrategy
147163

pkg/storegateway/sharding_strategy.go

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/cortexproject/cortex/pkg/ring"
1414
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
15+
"github.com/cortexproject/cortex/pkg/util"
1516
)
1617

1718
const (
@@ -53,9 +54,10 @@ func (s *NoShardingStrategy) FilterBlocks(_ context.Context, _ string, _ map[uli
5354
// DefaultShardingStrategy is a sharding strategy based on the hash ring formed by store-gateways.
5455
// Not go-routine safe.
5556
type DefaultShardingStrategy struct {
56-
r *ring.Ring
57-
instanceAddr string
58-
logger log.Logger
57+
r *ring.Ring
58+
instanceAddr string
59+
logger log.Logger
60+
allowedTenants *util.AllowedTenants
5961
}
6062

6163
// NewDefaultShardingStrategy creates DefaultShardingStrategy.
@@ -69,7 +71,15 @@ func NewDefaultShardingStrategy(r *ring.Ring, instanceAddr string, logger log.Lo
6971

7072
// FilterUsers implements ShardingStrategy.
7173
func (s *DefaultShardingStrategy) FilterUsers(_ context.Context, userIDs []string) []string {
72-
return userIDs
74+
allUserIDs := userIDs
75+
76+
for _, userID := range userIDs {
77+
if !s.allowedTenants.IsAllowed(userID) {
78+
level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID)
79+
}
80+
}
81+
82+
return allUserIDs
7383
}
7484

7585
// FilterBlocks implements ShardingStrategy.
@@ -81,11 +91,12 @@ func (s *DefaultShardingStrategy) FilterBlocks(_ context.Context, _ string, meta
8191
// ShuffleShardingStrategy is a shuffle sharding strategy, based on the hash ring formed by store-gateways,
8292
// where each tenant blocks are sharded across a subset of store-gateway instances.
8393
type ShuffleShardingStrategy struct {
84-
r *ring.Ring
85-
instanceID string
86-
instanceAddr string
87-
limits ShardingLimits
88-
logger log.Logger
94+
r *ring.Ring
95+
instanceID string
96+
instanceAddr string
97+
limits ShardingLimits
98+
logger log.Logger
99+
allowedTenants *util.AllowedTenants
89100
}
90101

91102
// NewShuffleShardingStrategy makes a new ShuffleShardingStrategy.
@@ -106,6 +117,12 @@ func (s *ShuffleShardingStrategy) FilterUsers(_ context.Context, userIDs []strin
106117
for _, userID := range userIDs {
107118
subRing := GetShuffleShardingSubring(s.r, userID, s.limits)
108119

120+
//filter out users not owned by this shard.
121+
if s.allowedTenants.IsAllowed(userID) {
122+
level.Debug(s.logger).Log("msg", "ignoring storage gateway for user, not allowed", "user", userID)
123+
continue
124+
}
125+
109126
// Include the user only if it belongs to this store-gateway shard.
110127
if subRing.HasInstance(s.instanceID) {
111128
filteredIDs = append(filteredIDs, userID)

0 commit comments

Comments
 (0)