Skip to content

Commit 2b94533

Browse files
authored
Optimise alertmanager config loading (#3898)
* Added ListAllUsers() to AlertStore Signed-off-by: Marco Pracucci <[email protected]> * Added local store unit tests Signed-off-by: Marco Pracucci <[email protected]> * Added GetAlertConfigs to AlertStore Signed-off-by: Marco Pracucci <[email protected]> * Replace ListAlertConfigs with ListAllUsers + GetAlertConfigs in MultitenantAlertmanager Signed-off-by: Marco Pracucci <[email protected]> * Removed unused ListAlertConfigs Signed-off-by: Marco Pracucci <[email protected]> * Replace noopAlertStore with the filesystem-based storage Signed-off-by: Marco Pracucci <[email protected]> * Concurrently load alertmanager configs from object storage Signed-off-by: Marco Pracucci <[email protected]> * Added CHANGELOG entry Signed-off-by: Marco Pracucci <[email protected]> * Fixed PR number in CHANGELOG entry Signed-off-by: Marco Pracucci <[email protected]> * Fixed linter Signed-off-by: Marco Pracucci <[email protected]> * Addressed nits in reviews Signed-off-by: Marco Pracucci <[email protected]> * Improved unit tests Signed-off-by: Marco Pracucci <[email protected]> * Created concurrency.ForEach() utility which breaks on first error and used it Signed-off-by: Marco Pracucci <[email protected]> * Simplify alert store used in unit tests Signed-off-by: Marco Pracucci <[email protected]> * Simplified ForEachUser() and ForEach() utilities Signed-off-by: Marco Pracucci <[email protected]>
1 parent 9c46081 commit 2b94533

File tree

14 files changed

+612
-187
lines changed

14 files changed

+612
-187
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
* [ENHANCEMENT] Store-gateway: added metrics to chunk buffer pool behaviour. #3880
8989
* `cortex_bucket_store_chunk_pool_requested_bytes_total`
9090
* `cortex_bucket_store_chunk_pool_returned_bytes_total`
91+
* [ENHANCEMENT] Alertmanager: load alertmanager configurations from object storage concurrently, and only load necessary configurations, speeding up the configuration synchronization process and executing fewer "GET object" operations to the storage when sharding is enabled. #3898
9192
* [BUGFIX] Cortex: Fixed issue where fatal errors and various log messages were not logged. #3778
9293
* [BUGFIX] HA Tracker: don't track as error in the `cortex_kv_request_duration_seconds` metric a CAS operation intentionally aborted. #3745
9394
* [BUGFIX] Querier / ruler: do not log "error removing stale clients" if the ring is empty. #3761

pkg/alertmanager/alertstore/bucketclient/bucket_client.go

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"context"
66
"io/ioutil"
7+
"sync"
78

89
"github.com/go-kit/kit/log"
910
"github.com/pkg/errors"
@@ -12,11 +13,15 @@ import (
1213

1314
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
1415
"github.com/cortexproject/cortex/pkg/storage/bucket"
16+
"github.com/cortexproject/cortex/pkg/util/concurrency"
1517
)
1618

1719
const (
1820
// The bucket prefix under which all tenants alertmanager configs are stored.
1921
alertsPrefix = "alerts"
22+
23+
// How many users to load concurrently.
24+
fetchConcurrency = 16
2025
)
2126

2227
// BucketAlertStore is used to support the AlertStore interface against an object storage backend. It is implemented
@@ -35,27 +40,43 @@ func NewBucketAlertStore(bkt objstore.Bucket, cfgProvider bucket.TenantConfigPro
3540
}
3641
}
3742

38-
// ListAlertConfigs implements alertstore.AlertStore.
39-
func (s *BucketAlertStore) ListAlertConfigs(ctx context.Context) (map[string]alertspb.AlertConfigDesc, error) {
40-
cfgs := map[string]alertspb.AlertConfigDesc{}
43+
// ListAllUsers implements alertstore.AlertStore.
44+
func (s *BucketAlertStore) ListAllUsers(ctx context.Context) ([]string, error) {
45+
var userIDs []string
4146

4247
err := s.bucket.Iter(ctx, "", func(key string) error {
43-
userID := key
48+
userIDs = append(userIDs, key)
49+
return nil
50+
})
51+
52+
return userIDs, err
53+
}
54+
55+
// GetAlertConfigs implements alertstore.AlertStore.
56+
func (s *BucketAlertStore) GetAlertConfigs(ctx context.Context, userIDs []string) (map[string]alertspb.AlertConfigDesc, error) {
57+
var (
58+
cfgsMx = sync.Mutex{}
59+
cfgs = make(map[string]alertspb.AlertConfigDesc, len(userIDs))
60+
)
61+
62+
err := concurrency.ForEach(ctx, concurrency.CreateJobsFromStrings(userIDs), fetchConcurrency, func(ctx context.Context, job interface{}) error {
63+
userID := job.(string)
4464

4565
cfg, err := s.getAlertConfig(ctx, userID)
46-
if err != nil {
66+
if s.bucket.IsObjNotFoundErr(err) {
67+
return nil
68+
} else if err != nil {
4769
return errors.Wrapf(err, "failed to fetch alertmanager config for user %s", userID)
4870
}
4971

50-
cfgs[cfg.User] = cfg
72+
cfgsMx.Lock()
73+
cfgs[userID] = cfg
74+
cfgsMx.Unlock()
75+
5176
return nil
5277
})
5378

54-
if err != nil {
55-
return nil, err
56-
}
57-
58-
return cfgs, nil
79+
return cfgs, err
5980
}
6081

6182
// GetAlertConfig implements alertstore.AlertStore.

pkg/alertmanager/alertstore/configdb/store.go

Lines changed: 57 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,52 +33,48 @@ func NewStore(c client.Client) *Store {
3333
}
3434
}
3535

36-
// ListAlertConfigs implements alertstore.AlertStore.
37-
func (c *Store) ListAlertConfigs(ctx context.Context) (map[string]alertspb.AlertConfigDesc, error) {
38-
39-
configs, err := c.configClient.GetAlerts(ctx, c.since)
40-
36+
// ListAllUsers implements alertstore.AlertStore.
37+
func (c *Store) ListAllUsers(ctx context.Context) ([]string, error) {
38+
configs, err := c.reloadConfigs(ctx)
4139
if err != nil {
4240
return nil, err
4341
}
4442

45-
for user, cfg := range configs.Configs {
46-
if cfg.IsDeleted() {
47-
delete(c.alertConfigs, user)
48-
continue
49-
}
43+
userIDs := make([]string, 0, len(configs))
44+
for userID := range configs {
45+
userIDs = append(userIDs, userID)
46+
}
5047

51-
var templates []*alertspb.TemplateDesc
52-
for fn, template := range cfg.Config.TemplateFiles {
53-
templates = append(templates, &alertspb.TemplateDesc{
54-
Filename: fn,
55-
Body: template,
56-
})
57-
}
48+
return userIDs, nil
49+
}
5850

59-
c.alertConfigs[user] = alertspb.AlertConfigDesc{
60-
User: user,
61-
RawConfig: cfg.Config.AlertmanagerConfig,
62-
Templates: templates,
63-
}
51+
// GetAlertConfigs implements alertstore.AlertStore.
52+
func (c *Store) GetAlertConfigs(ctx context.Context, userIDs []string) (map[string]alertspb.AlertConfigDesc, error) {
53+
// Refresh the local state.
54+
configs, err := c.reloadConfigs(ctx)
55+
if err != nil {
56+
return nil, err
6457
}
6558

66-
c.since = configs.GetLatestConfigID()
59+
filtered := make(map[string]alertspb.AlertConfigDesc, len(userIDs))
60+
for _, userID := range userIDs {
61+
if cfg, ok := configs[userID]; ok {
62+
filtered[userID] = cfg
63+
}
64+
}
6765

68-
return c.alertConfigs, nil
66+
return filtered, nil
6967
}
7068

7169
// GetAlertConfig implements alertstore.AlertStore.
7270
func (c *Store) GetAlertConfig(ctx context.Context, user string) (alertspb.AlertConfigDesc, error) {
73-
74-
// Refresh the local state before fetching an specific one.
75-
_, err := c.ListAlertConfigs(ctx)
71+
// Refresh the local state.
72+
configs, err := c.reloadConfigs(ctx)
7673
if err != nil {
7774
return alertspb.AlertConfigDesc{}, err
7875
}
7976

80-
cfg, exists := c.alertConfigs[user]
81-
77+
cfg, exists := configs[user]
8278
if !exists {
8379
return alertspb.AlertConfigDesc{}, alertspb.ErrNotFound
8480
}
@@ -95,3 +91,35 @@ func (c *Store) SetAlertConfig(ctx context.Context, cfg alertspb.AlertConfigDesc
9591
func (c *Store) DeleteAlertConfig(ctx context.Context, user string) error {
9692
return errReadOnly
9793
}
94+
95+
func (c *Store) reloadConfigs(ctx context.Context) (map[string]alertspb.AlertConfigDesc, error) {
96+
configs, err := c.configClient.GetAlerts(ctx, c.since)
97+
if err != nil {
98+
return nil, err
99+
}
100+
101+
for user, cfg := range configs.Configs {
102+
if cfg.IsDeleted() {
103+
delete(c.alertConfigs, user)
104+
continue
105+
}
106+
107+
var templates []*alertspb.TemplateDesc
108+
for fn, template := range cfg.Config.TemplateFiles {
109+
templates = append(templates, &alertspb.TemplateDesc{
110+
Filename: fn,
111+
Body: template,
112+
})
113+
}
114+
115+
c.alertConfigs[user] = alertspb.AlertConfigDesc{
116+
User: user,
117+
RawConfig: cfg.Config.AlertmanagerConfig,
118+
Templates: templates,
119+
}
120+
}
121+
122+
c.since = configs.GetLatestConfigID()
123+
124+
return c.alertConfigs, nil
125+
}

pkg/alertmanager/alertstore/local/store.go

Lines changed: 62 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,65 @@ func NewStore(cfg StoreConfig) (*Store, error) {
4242
return &Store{cfg}, nil
4343
}
4444

45-
// ListAlertConfigs implements alertstore.AlertStore.
46-
func (f *Store) ListAlertConfigs(_ context.Context) (map[string]alertspb.AlertConfigDesc, error) {
45+
// ListAllUsers implements alertstore.AlertStore.
46+
func (f *Store) ListAllUsers(_ context.Context) ([]string, error) {
47+
configs, err := f.reloadConfigs()
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
userIDs := make([]string, 0, len(configs))
53+
for userID := range configs {
54+
userIDs = append(userIDs, userID)
55+
}
56+
57+
return userIDs, nil
58+
}
59+
60+
// GetAlertConfigs implements alertstore.AlertStore.
61+
func (f *Store) GetAlertConfigs(_ context.Context, userIDs []string) (map[string]alertspb.AlertConfigDesc, error) {
62+
configs, err := f.reloadConfigs()
63+
if err != nil {
64+
return nil, err
65+
}
66+
67+
filtered := make(map[string]alertspb.AlertConfigDesc, len(userIDs))
68+
for _, userID := range userIDs {
69+
if cfg, ok := configs[userID]; ok {
70+
filtered[userID] = cfg
71+
}
72+
}
73+
74+
return filtered, nil
75+
}
76+
77+
// GetAlertConfig implements alertstore.AlertStore.
78+
func (f *Store) GetAlertConfig(_ context.Context, user string) (alertspb.AlertConfigDesc, error) {
79+
cfgs, err := f.reloadConfigs()
80+
if err != nil {
81+
return alertspb.AlertConfigDesc{}, err
82+
}
83+
84+
cfg, exists := cfgs[user]
85+
86+
if !exists {
87+
return alertspb.AlertConfigDesc{}, alertspb.ErrNotFound
88+
}
89+
90+
return cfg, nil
91+
}
92+
93+
// SetAlertConfig implements alertstore.AlertStore.
94+
func (f *Store) SetAlertConfig(_ context.Context, cfg alertspb.AlertConfigDesc) error {
95+
return errReadOnly
96+
}
97+
98+
// DeleteAlertConfig implements alertstore.AlertStore.
99+
func (f *Store) DeleteAlertConfig(_ context.Context, user string) error {
100+
return errReadOnly
101+
}
102+
103+
func (f *Store) reloadConfigs() (map[string]alertspb.AlertConfigDesc, error) {
47104
configs := map[string]alertspb.AlertConfigDesc{}
48105
err := filepath.Walk(f.cfg.Path, func(path string, info os.FileInfo, err error) error {
49106
if err != nil {
@@ -59,13 +116,13 @@ func (f *Store) ListAlertConfigs(_ context.Context) (map[string]alertspb.AlertCo
59116
// Ensure the file is a valid Alertmanager Config.
60117
_, err = config.LoadFile(path)
61118
if err != nil {
62-
return errors.Wrap(err, "unable to load file "+path)
119+
return errors.Wrapf(err, "unable to load file %s", path)
63120
}
64121

65122
// Load the file to be returned by the store.
66123
content, err := ioutil.ReadFile(path)
67124
if err != nil {
68-
return errors.Wrap(err, "unable to read file "+path)
125+
return errors.Wrapf(err, "unable to read file %s", path)
69126
}
70127

71128
// The file name must correspond to the user tenant ID
@@ -78,35 +135,5 @@ func (f *Store) ListAlertConfigs(_ context.Context) (map[string]alertspb.AlertCo
78135
return nil
79136
})
80137

81-
if err != nil {
82-
return nil, err
83-
}
84-
85-
return configs, nil
86-
}
87-
88-
// GetAlertConfig implements alertstore.AlertStore.
89-
func (f *Store) GetAlertConfig(ctx context.Context, user string) (alertspb.AlertConfigDesc, error) {
90-
cfgs, err := f.ListAlertConfigs(ctx)
91-
if err != nil {
92-
return alertspb.AlertConfigDesc{}, err
93-
}
94-
95-
cfg, exists := cfgs[user]
96-
97-
if !exists {
98-
return alertspb.AlertConfigDesc{}, alertspb.ErrNotFound
99-
}
100-
101-
return cfg, nil
102-
}
103-
104-
// SetAlertConfig implements alertstore.AlertStore.
105-
func (f *Store) SetAlertConfig(_ context.Context, cfg alertspb.AlertConfigDesc) error {
106-
return errReadOnly
107-
}
108-
109-
// DeleteAlertConfig implements alertstore.AlertStore.
110-
func (f *Store) DeleteAlertConfig(_ context.Context, user string) error {
111-
return errReadOnly
138+
return configs, err
112139
}

0 commit comments

Comments
 (0)