Skip to content

Commit e9eebc6

Browse files
committed
Allowing ruler replication to be configurable
1 parent 73c8e5a commit e9eebc6

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

pkg/ruler/ruler_ring.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,18 @@ var ListRuleRingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE, ring.LEAVING},
3838
// is used to strip down the config to the minimum, and avoid confusion
3939
// to the user.
4040
type RingConfig struct {
41-
KVStore kv.Config `yaml:"kvstore"`
42-
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
43-
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
41+
KVStore kv.Config `yaml:"kvstore"`
42+
HeartbeatPeriod time.Duration `yaml:"heartbeat_period"`
43+
HeartbeatTimeout time.Duration `yaml:"heartbeat_timeout"`
44+
ReplicationFactor int `yaml:"replication_factor"`
45+
ZoneAwarenessEnabled bool `yaml:"zone_awareness_enabled"`
4446

4547
// Instance details
4648
InstanceID string `yaml:"instance_id" doc:"hidden"`
4749
InstanceInterfaceNames []string `yaml:"instance_interface_names"`
4850
InstancePort int `yaml:"instance_port" doc:"hidden"`
4951
InstanceAddr string `yaml:"instance_addr" doc:"hidden"`
52+
InstanceZone string `yaml:"instance_availability_zone" doc:"hidden"`
5053
NumTokens int `yaml:"num_tokens"`
5154

5255
FinalSleep time.Duration `yaml:"final_sleep"`
@@ -70,13 +73,16 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) {
7073
f.DurationVar(&cfg.HeartbeatPeriod, "ruler.ring.heartbeat-period", 5*time.Second, "Period at which to heartbeat to the ring. 0 = disabled.")
7174
f.DurationVar(&cfg.HeartbeatTimeout, "ruler.ring.heartbeat-timeout", time.Minute, "The heartbeat timeout after which rulers are considered unhealthy within the ring. 0 = never (timeout disabled).")
7275
f.DurationVar(&cfg.FinalSleep, "ruler.ring.final-sleep", 0*time.Second, "The sleep seconds when ruler is shutting down. Need to be close to or larger than KV Store information propagation delay")
76+
f.IntVar(&cfg.ReplicationFactor, "ruler.ring.replication-factor", 1, "The replication factor to use when loading rule groups for API HA.")
77+
f.BoolVar(&cfg.ZoneAwarenessEnabled, "ruler.ring.zone-awareness-enabled", false, "True to enable zone-awareness and load rule groups across different availability zones for API HA.")
7378

7479
// Instance flags
7580
cfg.InstanceInterfaceNames = []string{"eth0", "en0"}
7681
f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), "ruler.ring.instance-interface-names", "Name of network interface to read address from.")
7782
f.StringVar(&cfg.InstanceAddr, "ruler.ring.instance-addr", "", "IP address to advertise in the ring.")
7883
f.IntVar(&cfg.InstancePort, "ruler.ring.instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).")
7984
f.StringVar(&cfg.InstanceID, "ruler.ring.instance-id", hostname, "Instance ID to register in the ring.")
85+
f.StringVar(&cfg.InstanceZone, "ruler.ring.instance-availability-zone", "", "The availability zone where this instance is running. Required if zone-awareness is enabled.")
8086
f.IntVar(&cfg.NumTokens, "ruler.ring.num-tokens", 128, "Number of tokens for each ruler.")
8187
}
8288

@@ -93,6 +99,7 @@ func (cfg *RingConfig) ToLifecyclerConfig(logger log.Logger) (ring.BasicLifecycl
9399
return ring.BasicLifecyclerConfig{
94100
ID: cfg.InstanceID,
95101
Addr: fmt.Sprintf("%s:%d", instanceAddr, instancePort),
102+
Zone: cfg.InstanceZone,
96103
HeartbeatPeriod: cfg.HeartbeatPeriod,
97104
TokensObservePeriod: 0,
98105
NumTokens: cfg.NumTokens,
@@ -107,9 +114,10 @@ func (cfg *RingConfig) ToRingConfig() ring.Config {
107114
rc.KVStore = cfg.KVStore
108115
rc.HeartbeatTimeout = cfg.HeartbeatTimeout
109116
rc.SubringCacheDisabled = true
117+
rc.ZoneAwarenessEnabled = cfg.ZoneAwarenessEnabled
110118

111-
// Each rule group is loaded to *exactly* one ruler.
112-
rc.ReplicationFactor = 1
119+
// Each rule group is evaluated by *exactly* one ruler, but it can be loaded by multiple rulers for API HA
120+
rc.ReplicationFactor = cfg.ReplicationFactor
113121

114122
return rc
115123
}

pkg/ruler/ruler_test.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ func defaultRulerConfig(t testing.TB) Config {
7676
cfg.Ring.InstanceAddr = "localhost"
7777
cfg.Ring.InstanceID = "localhost"
7878
cfg.Ring.FinalSleep = 0
79+
cfg.Ring.ReplicationFactor = 1
7980
cfg.EnableQueryStats = false
8081

8182
return cfg
@@ -617,6 +618,7 @@ func TestGetRules(t *testing.T) {
617618
KVStore: kv.Config{
618619
Mock: kvStore,
619620
},
621+
ReplicationFactor: 1,
620622
}
621623

622624
r, _ := buildRuler(t, cfg, nil, store, rulerAddrMap)
@@ -1122,7 +1124,8 @@ func TestSharding(t *testing.T) {
11221124
KVStore: kv.Config{
11231125
Mock: kvStore,
11241126
},
1125-
HeartbeatTimeout: 1 * time.Minute,
1127+
HeartbeatTimeout: 1 * time.Minute,
1128+
ReplicationFactor: 1,
11261129
},
11271130
FlushCheckPeriod: 0,
11281131
EnabledTenants: tc.enabledUsers,
@@ -1259,7 +1262,8 @@ func Test_LoadPartialGroups(t *testing.T) {
12591262
KVStore: kv.Config{
12601263
Mock: kvStore,
12611264
},
1262-
HeartbeatTimeout: 1 * time.Minute,
1265+
HeartbeatTimeout: 1 * time.Minute,
1266+
ReplicationFactor: 1,
12631267
},
12641268
FlushCheckPeriod: 0,
12651269
}
@@ -1782,7 +1786,8 @@ func TestRulerDisablesRuleGroups(t *testing.T) {
17821786
KVStore: kv.Config{
17831787
Mock: kvStore,
17841788
},
1785-
HeartbeatTimeout: 1 * time.Minute,
1789+
HeartbeatTimeout: 1 * time.Minute,
1790+
ReplicationFactor: 1,
17861791
},
17871792
FlushCheckPeriod: 0,
17881793
}

0 commit comments

Comments
 (0)