Skip to content

Commit fa0f9e5

Browse files
committed
Read alertmanager state from storage if peer settling fails.
Signed-off-by: Steve Simpson <[email protected]>
1 parent 7691456 commit fa0f9e5

File tree

4 files changed

+73
-7
lines changed

4 files changed

+73
-7
lines changed

pkg/alertmanager/alertmanager.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ import (
4343
"github.com/prometheus/common/model"
4444
"github.com/prometheus/common/route"
4545

46+
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
4647
"github.com/cortexproject/cortex/pkg/util/services"
4748
)
4849

@@ -71,6 +72,7 @@ type Config struct {
7172
ShardingEnabled bool
7273
ReplicationFactor int
7374
Replicator Replicator
75+
Store alertstore.AlertStore
7476
}
7577

7678
// An Alertmanager manages the alerts for one user.
@@ -161,7 +163,7 @@ func New(cfg *Config, reg *prometheus.Registry) (*Alertmanager, error) {
161163
am.state = cfg.Peer
162164
} else if cfg.ShardingEnabled {
163165
level.Debug(am.logger).Log("msg", "starting tenant alertmanager with ring-based replication")
164-
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, am.logger, am.registry)
166+
am.state = newReplicatedStates(cfg.UserID, cfg.ReplicationFactor, cfg.Replicator, cfg.Store, am.logger, am.registry)
165167
} else {
166168
level.Debug(am.logger).Log("msg", "starting tenant alertmanager without replication")
167169
am.state = &NilPeer{}

pkg/alertmanager/multitenant.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,6 +855,7 @@ func (am *MultitenantAlertmanager) newAlertmanager(userID string, amConfig *amco
855855
ShardingEnabled: am.cfg.ShardingEnabled,
856856
Replicator: am,
857857
ReplicationFactor: am.cfg.ShardingRing.ReplicationFactor,
858+
Store: am.store,
858859
}, reg)
859860
if err != nil {
860861
return nil, fmt.Errorf("unable to start Alertmanager for user %v: %v", userID, err)

pkg/alertmanager/state_replication.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@ import (
1515
"github.com/prometheus/alertmanager/cluster/clusterpb"
1616
"github.com/prometheus/client_golang/prometheus"
1717

18+
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
1819
"github.com/cortexproject/cortex/pkg/util/services"
1920
)
2021

2122
// Default timeouts that newReplicatedStates copies into a new state.
const (
2223
// defaultSettleReadTimeout is the initial value of state.settleReadTimeout.
defaultSettleReadTimeout = 15 * time.Second
24+
// defaultStoreReadTimeout is the initial value of state.storeReadTimeout,
// the deadline applied to the fallback read of the full state from storage.
defaultStoreReadTimeout = 15 * time.Second
2325
)
2426

2527
// state represents the Alertmanager silences and notification log internal state.
@@ -31,12 +33,14 @@ type state struct {
3133
reg prometheus.Registerer
3234

3335
settleReadTimeout time.Duration
36+
storeReadTimeout time.Duration
3437

3538
mtx sync.Mutex
3639
states map[string]cluster.State
3740

3841
replicationFactor int
3942
replicator Replicator
43+
store alertstore.AlertStore
4044

4145
partialStateMergesTotal *prometheus.CounterVec
4246
partialStateMergesFailed *prometheus.CounterVec
@@ -47,17 +51,19 @@ type state struct {
4751
}
4852

4953
// newReplicatedStates creates a new state struct, which manages state to be replicated between alertmanagers.
50-
func newReplicatedStates(userID string, rf int, re Replicator, l log.Logger, r prometheus.Registerer) *state {
54+
func newReplicatedStates(userID string, rf int, re Replicator, st alertstore.AlertStore, l log.Logger, r prometheus.Registerer) *state {
5155

5256
s := &state{
5357
logger: l,
5458
userID: userID,
5559
replicationFactor: rf,
5660
replicator: re,
61+
store: st,
5762
states: make(map[string]cluster.State, 2), // we use two, one for the notifications and one for silences.
5863
msgc: make(chan *clusterpb.Part),
5964
reg: r,
6065
settleReadTimeout: defaultSettleReadTimeout,
66+
storeReadTimeout: defaultStoreReadTimeout,
6167
partialStateMergesTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
6268
Name: "alertmanager_partial_state_merges_total",
6369
Help: "Number of times we have received a partial state to merge for a key.",
@@ -167,7 +173,22 @@ func (s *state) starting(ctx context.Context) error {
167173
}
168174
}
169175

170-
level.Info(s.logger).Log("msg", "state not settled but continuing anyway", "err", err)
176+
level.Info(s.logger).Log("msg", "state not settled; trying to read from storage", "err", err)
177+
178+
// Attempt to read the state from persistent storage instead.
179+
storeReadCtx, cancel := context.WithTimeout(ctx, s.storeReadTimeout)
180+
defer cancel()
181+
182+
fullState, err := s.store.GetFullState(storeReadCtx, s.userID)
183+
if err == nil {
184+
if err = s.mergeFullStates([]*clusterpb.FullState{fullState.State}); err == nil {
185+
level.Info(s.logger).Log("msg", "state read from storage; proceeding")
186+
return nil
187+
}
188+
}
189+
190+
level.Info(s.logger).Log("msg", "failed to read state from storage; continuing anyway", "err", err)
191+
171192
return nil
172193
}
173194

pkg/alertmanager/state_replication_test.go

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818

1919
"github.com/go-kit/kit/log"
2020

21+
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
22+
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
2123
"github.com/cortexproject/cortex/pkg/util/services"
2224
)
2325

@@ -76,6 +78,25 @@ func (f *fakeReplicator) ReadFullStateForUser(ctx context.Context, userID string
7678
return f.read.res, f.read.err
7779
}
7880

81+
// fakeAlertStore is a test double for alertstore.AlertStore backed by an
// in-memory map. Only GetFullState is overridden; the embedded interface is
// never assigned by newFakeAlertStore, so any other promoted method would
// dereference a nil interface if called.
type fakeAlertStore struct {
82+
alertstore.AlertStore
83+
84+
// states maps a user ID to the full state returned for that user.
states map[string]alertspb.FullStateDesc
85+
}
86+
87+
// newFakeAlertStore returns a fakeAlertStore with an empty, non-nil states map,
// so lookups for any user report alertspb.ErrNotFound until a test fills it in.
func newFakeAlertStore() *fakeAlertStore {
88+
return &fakeAlertStore{
89+
states: make(map[string]alertspb.FullStateDesc),
90+
}
91+
}
92+
93+
// GetFullState returns the state recorded for user in f.states, or an empty
// FullStateDesc together with alertspb.ErrNotFound when there is no entry.
// The ctx argument is not consulted.
func (f *fakeAlertStore) GetFullState(ctx context.Context, user string) (alertspb.FullStateDesc, error) {
94+
if result, ok := f.states[user]; ok {
95+
return result, nil
96+
}
97+
return alertspb.FullStateDesc{}, alertspb.ErrNotFound
98+
}
99+
79100
func TestStateReplication(t *testing.T) {
80101
tc := []struct {
81102
name string
@@ -102,7 +123,8 @@ func TestStateReplication(t *testing.T) {
102123
reg := prometheus.NewPedanticRegistry()
103124
replicator := newFakeReplicator()
104125
replicator.read = readStateResult{res: nil, err: nil}
105-
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
126+
store := newFakeAlertStore()
127+
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
106128

107129
require.False(t, s.Ready())
108130
{
@@ -163,6 +185,7 @@ func TestStateReplication_Settle(t *testing.T) {
163185
name string
164186
replicationFactor int
165187
read readStateResult
188+
states map[string]alertspb.FullStateDesc
166189
results map[string][][]byte
167190
}{
168191
{
@@ -228,9 +251,26 @@ func TestStateReplication_Settle(t *testing.T) {
228251
},
229252
},
230253
{
231-
name: "when reading the full state fails, still become ready.",
254+
name: "when reading from replicas fails, state is read from storage.",
255+
replicationFactor: 3,
256+
read: readStateResult{err: errors.New("Read Error 1")},
257+
states: map[string]alertspb.FullStateDesc{
258+
"user-1": {
259+
State: &clusterpb.FullState{
260+
Parts: []clusterpb.Part{{Key: "key1", Data: []byte("Datum1")}},
261+
},
262+
},
263+
},
264+
results: map[string][][]byte{
265+
"key1": {[]byte("Datum1")},
266+
"key2": nil,
267+
},
268+
},
269+
{
270+
name: "when reading from replicas and from storage fails, still become ready.",
232271
replicationFactor: 3,
233272
read: readStateResult{err: errors.New("Read Error 1")},
273+
states: map[string]alertspb.FullStateDesc{},
234274
results: map[string][][]byte{
235275
"key1": nil,
236276
"key2": nil,
@@ -253,7 +293,9 @@ func TestStateReplication_Settle(t *testing.T) {
253293

254294
replicator := newFakeReplicator()
255295
replicator.read = tt.read
256-
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, log.NewNopLogger(), reg)
296+
store := newFakeAlertStore()
297+
store.states = tt.states
298+
s := newReplicatedStates("user-1", tt.replicationFactor, replicator, store, log.NewNopLogger(), reg)
257299

258300
key1State := &fakeState{}
259301
key2State := &fakeState{}
@@ -322,7 +364,7 @@ func TestStateReplication_GetFullState(t *testing.T) {
322364
for _, tt := range tc {
323365
t.Run(tt.name, func(t *testing.T) {
324366
reg := prometheus.NewPedanticRegistry()
325-
s := newReplicatedStates("user-1", 1, nil, log.NewNopLogger(), reg)
367+
s := newReplicatedStates("user-1", 1, nil, nil, log.NewNopLogger(), reg)
326368

327369
for key, datum := range tt.data {
328370
state := &fakeState{binary: datum}

0 commit comments

Comments
 (0)