Skip to content
This repository was archived by the owner on Aug 4, 2021. It is now read-only.

Commit 194f4c0

Browse files
Add support for leader election
In an HA setup, two or more Prometheus instances scrape the same data and send it to adapters. We want to prevent multiple adapters from writing the same or similar data in parallel (an HA Prometheus setup gives no consistency guarantees). The idea is to use leader election and allow only one adapter (the leader) to write to the database. The provided leader election implementation relies on PostgreSQL advisory locks and does not provide strong data guarantees: some duplicates or data loss are possible during failover (note that Prometheus HA itself does not guarantee that multiple Prometheus nodes see the same data). The REST interface for leader election should make it possible to plug in any external leader election system you may already use (e.g. Zookeeper).
1 parent dd23566 commit 194f4c0

File tree

8 files changed

+530
-51
lines changed

8 files changed

+530
-51
lines changed

Gopkg.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

main.go

Lines changed: 53 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/prometheus/client_golang/prometheus"
3636
"github.com/prometheus/common/model"
3737

38+
"database/sql"
3839
"fmt"
3940
"github.com/prometheus/client_model/go"
4041
"github.com/prometheus/prometheus/prompb"
@@ -47,6 +48,8 @@ type config struct {
4748
pgPrometheusConfig pgprometheus.Config
4849
logLevel string
4950
readOnly bool
51+
haGroupLockId int
52+
restElection bool
5053
}
5154

5255
const (
@@ -90,7 +93,8 @@ var (
9093
},
9194
[]string{"path"},
9295
)
93-
writeThroughtput = util.NewThroughputCalc(tickInterval)
96+
writeThroughput = util.NewThroughputCalc(tickInterval)
97+
elector *util.Elector
9498
)
9599

96100
func init() {
@@ -99,7 +103,7 @@ func init() {
99103
prometheus.MustRegister(failedSamples)
100104
prometheus.MustRegister(sentBatchDuration)
101105
prometheus.MustRegister(httpRequestDuration)
102-
writeThroughtput.Start()
106+
writeThroughput.Start()
103107
}
104108

105109
func main() {
@@ -110,6 +114,12 @@ func main() {
110114
http.Handle(cfg.telemetryPath, prometheus.Handler())
111115

112116
writer, reader := buildClients(cfg)
117+
pgClient, ok := writer.(*pgprometheus.Client)
118+
if ok {
119+
elector = initElector(cfg, pgClient.DB)
120+
} else {
121+
log.Info("msg", "Running in read-only mode. This instance can't participate in leader election")
122+
}
113123

114124
http.Handle("/write", timeHandler("write", write(writer)))
115125
http.Handle("/read", timeHandler("read", read(reader)))
@@ -137,7 +147,8 @@ func parseFlags() *config {
137147
flag.StringVar(&cfg.telemetryPath, "web.telemetry-path", "/metrics", "Address to listen on for web endpoints.")
138148
flag.StringVar(&cfg.logLevel, "log.level", "debug", "The log level to use [ \"error\", \"warn\", \"info\", \"debug\" ].")
139149
flag.BoolVar(&cfg.readOnly, "read.only", false, "Read-only mode. Don't write to database.")
140-
150+
flag.IntVar(&cfg.haGroupLockId, "leader-election.pg-advisory-lock-id", 0, "Unique advisory lock id per adapter high-availability group. Set it if you want to use leader election implementation based on PostgreSQL advisory lock")
151+
flag.BoolVar(&cfg.restElection, "leader-election.rest", false, "Enable REST interface for the leader election")
141152
flag.Parse()
142153

143154
return cfg
@@ -173,6 +184,28 @@ func buildClients(cfg *config) (writer, reader) {
173184
return pgClient, pgClient
174185
}
175186

187+
// initElector builds the leader elector selected by the parsed configuration.
//
// Outcomes, in the order they are checked:
//   - both cfg.restElection and a non-zero cfg.haGroupLockId are set: logs an
//     error and exits the process with status 1 (the two modes are exclusive);
//   - cfg.restElection is set: returns an elector wrapping util.NewRestElection();
//   - cfg.haGroupLockId is non-zero: returns an elector wrapping a PostgreSQL
//     advisory lock created on db, or exits with status 1 if creating the lock
//     returns an error;
//   - otherwise: logs a warning and returns nil, i.e. no leader election.
//
// A lock id of 0 is treated as "not set".
func initElector(cfg *config, db *sql.DB) *util.Elector {
188+
// Reject ambiguous configuration: only one election mechanism may be chosen.
if cfg.restElection && cfg.haGroupLockId != 0 {
189+
log.Error("msg", "Use either REST or PgAdvisoryLock for the leader election")
190+
os.Exit(1)
191+
}
192+
if cfg.restElection {
193+
return util.NewElector(util.NewRestElection())
194+
}
195+
if cfg.haGroupLockId != 0 {
196+
lock, err := util.NewPgAdvisoryLock(cfg.haGroupLockId, db)
197+
if err != nil {
198+
log.Error("msg", "Error creating advisory lock", "haGroupLockId", cfg.haGroupLockId, "err", err)
199+
os.Exit(1)
200+
}
201+
log.Info("msg", "Initialized leader election based on PostgreSQL advisory lock")
202+
return util.NewElector(lock)
203+
} else {
204+
// Neither mechanism configured: callers receive nil and must handle it.
log.Warn("msg", "No adapter leader election. Group lock id is not set. Possible duplicate write load if running adapter in high-availability mode")
205+
return nil
206+
}
207+
}
208+
176209
func write(writer writer) http.Handler {
177210
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
178211
compressed, err := ioutil.ReadAll(r.Body)
@@ -208,10 +241,10 @@ func write(writer writer) http.Handler {
208241
if err != nil {
209242
log.Warn("msg", "Couldn't get a counter", "labelValue", writer.Name(), "err", err)
210243
}
211-
writeThroughtput.SetCurrent(getCounterValue(counter))
244+
writeThroughput.SetCurrent(getCounterValue(counter))
212245

213246
select {
214-
case d := <-writeThroughtput.Values:
247+
case d := <-writeThroughput.Values:
215248
log.Info("msg", "Samples write throughput", "samples/sec", d)
216249
default:
217250
}
@@ -306,7 +339,21 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
306339

307340
func sendSamples(w writer, samples model.Samples) error {
308341
begin := time.Now()
309-
err := w.Write(samples)
342+
shouldWrite := true
343+
var err error
344+
if elector != nil {
345+
shouldWrite, err = elector.IsLeader()
346+
if err != nil {
347+
log.Error("msg", "IsLeader check failed", "err", err)
348+
return err
349+
}
350+
}
351+
if shouldWrite {
352+
err = w.Write(samples)
353+
} else {
354+
log.Debug("msg", fmt.Sprintf("Election id %v: Instance is not a leader. Can't write data", elector.Id()))
355+
return nil
356+
}
310357
duration := time.Since(begin).Seconds()
311358
if err != nil {
312359
failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))

postgresql/client.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func ParseFlags(cfg *Config) *Config {
6464

6565
// Client sends Prometheus samples to PostgreSQL
6666
type Client struct {
67-
db *sql.DB
67+
DB *sql.DB
6868
cfg *Config
6969
}
7070

@@ -101,7 +101,7 @@ func NewClient(cfg *Config) *Client {
101101
db.SetMaxIdleConns(cfg.maxIdleConns)
102102

103103
client := &Client{
104-
db: db,
104+
DB: db,
105105
cfg: cfg,
106106
}
107107

@@ -121,7 +121,7 @@ func NewClient(cfg *Config) *Client {
121121
}
122122

123123
func (c *Client) setupPgPrometheus() error {
124-
tx, err := c.db.Begin()
124+
tx, err := c.DB.Begin()
125125

126126
if err != nil {
127127
return err
@@ -193,7 +193,7 @@ func metricString(m model.Metric) string {
193193
// Write implements the Writer interface and writes metric samples to the database
194194
func (c *Client) Write(samples model.Samples) error {
195195
begin := time.Now()
196-
tx, err := c.db.Begin()
196+
tx, err := c.DB.Begin()
197197

198198
if err != nil {
199199
log.Error("msg", "Error on Begin when writing samples", "err", err)
@@ -360,7 +360,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
360360

361361
log.Debug("msg", "Executed query", "query", command)
362362

363-
rows, err := c.db.Query(command)
363+
rows, err := c.DB.Query(command)
364364

365365
if err != nil {
366366
return nil, err
@@ -439,7 +439,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
439439

440440
// HealthCheck implements the healthcheck interface
441441
func (c *Client) HealthCheck() error {
442-
rows, err := c.db.Query("SELECT 1")
442+
rows, err := c.DB.Query("SELECT 1")
443443

444444
if err != nil {
445445
log.Debug("msg", "Health check error", "err", err)

postgresql/client_test.go

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/prometheus/common/model"
88
"github.com/prometheus/prometheus/prompb"
99
"github.com/timescale/prometheus-postgresql-adapter/log"
10+
"github.com/timescale/prometheus-postgresql-adapter/util"
1011
"testing"
1112
)
1213

@@ -30,17 +31,17 @@ func TestBuildCommand(t *testing.T) {
3031
StartTimestampMs: 0,
3132
EndTimestampMs: 20000,
3233
Matchers: []*prompb.LabelMatcher{
33-
&prompb.LabelMatcher{
34+
{
3435
Type: prompb.LabelMatcher_EQ,
3536
Name: "__name__",
3637
Value: "cpu_usage",
3738
},
38-
&prompb.LabelMatcher{
39+
{
3940
Type: prompb.LabelMatcher_EQ,
4041
Name: "job",
4142
Value: "nginx",
4243
},
43-
&prompb.LabelMatcher{
44+
{
4445
Type: prompb.LabelMatcher_RE,
4546
Name: "host",
4647
Value: "local.*",
@@ -58,30 +59,7 @@ func TestBuildCommand(t *testing.T) {
5859
}
5960

6061
func TestWriteCommand(t *testing.T) {
61-
flag.Parse()
62-
if len(*database) == 0 {
63-
t.Skip()
64-
}
65-
66-
db, err := sql.Open("postgres", "host=localhost user=postgres sslmode=disable")
67-
if err != nil {
68-
t.Fatal(err)
69-
}
70-
71-
_, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", *database))
72-
if err != nil {
73-
t.Fatal(err)
74-
}
75-
76-
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s", *database))
77-
if err != nil {
78-
t.Fatal(err)
79-
}
80-
81-
err = db.Close()
82-
if err != nil {
83-
t.Fatal(err)
84-
}
62+
dbSetup(t)
8563

8664
cfg := &Config{}
8765
ParseFlags(cfg)
@@ -90,30 +68,30 @@ func TestWriteCommand(t *testing.T) {
9068
c := NewClient(cfg)
9169

9270
sample := []*model.Sample{
93-
&model.Sample{
71+
{
9472
Metric: model.Metric{
9573
model.MetricNameLabel: "test_metric",
9674
"label1": "1",
9775
},
9876
Value: 123.1,
9977
Timestamp: 1234567,
10078
},
101-
&model.Sample{
79+
{
10280
Metric: model.Metric{
10381
model.MetricNameLabel: "test_metric",
10482
"label1": "1",
10583
},
10684
Value: 123.2,
10785
Timestamp: 1234568,
10886
},
109-
&model.Sample{
87+
{
11088
Metric: model.Metric{
11189
model.MetricNameLabel: "test_metric",
11290
},
11391
Value: 123.2,
11492
Timestamp: 1234569,
11593
},
116-
&model.Sample{
94+
{
11795
Metric: model.Metric{
11896
model.MetricNameLabel: "test_metric_2",
11997
"label1": "1",
@@ -125,7 +103,7 @@ func TestWriteCommand(t *testing.T) {
125103

126104
c.Write(sample)
127105

128-
db, err = sql.Open("postgres", fmt.Sprintf("host=localhost dbname=%s user=postgres sslmode=disable", *database))
106+
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost dbname=%s user=postgres sslmode=disable", *database))
129107
if err != nil {
130108
t.Fatal(err)
131109
}
@@ -140,3 +118,59 @@ func TestWriteCommand(t *testing.T) {
140118
t.Fatal("Wrong cnt: ", cnt)
141119
}
142120
}
121+
122+
// TestPgAdvisoryLock exercises hand-over of a PostgreSQL advisory lock between
// two lock objects created with the same id (1) on the same *sql.DB:
//  1. the first lock must report Locked() right after construction;
//  2. a second lock with the same id must not report Locked();
//  3. after the first lock is released it must no longer report Locked();
//  4. after TryLock on the second lock, the second lock must report Locked().
//
// The test is skipped by dbSetup when the database flag is empty.
// NOTE(review): the db handle returned by dbSetup is not closed in this test.
func TestPgAdvisoryLock(t *testing.T) {
123+
db := dbSetup(t)
124+
lock, err := util.NewPgAdvisoryLock(1, db)
125+
if err != nil {
126+
t.Fatal(err)
127+
}
128+
if !lock.Locked() {
129+
t.Error("Couldn't obtain the lock")
130+
}
131+
132+
// Same lock id again: the first lock still holds it, so this one must not.
newLock, err := util.NewPgAdvisoryLock(1, db)
133+
if err != nil {
134+
t.Fatal(err)
135+
}
136+
if newLock.Locked() {
137+
t.Error("Lock should have already been taken")
138+
}
139+
140+
if err = lock.Release(); err != nil {
141+
t.Errorf("Failed to release a lock. Error: %v", err)
142+
}
143+
144+
if lock.Locked() {
145+
t.Error("Should be unlocked after release")
146+
}
147+
148+
// NOTE(review): any result returned by TryLock is ignored here; the outcome
// is only observed through Locked() below — confirm TryLock's signature.
newLock.TryLock()
149+
150+
if !newLock.Locked() {
151+
t.Error("New lock should take over")
152+
}
153+
}
154+
155+
// dbSetup prepares a fresh test database and returns the connection used to
// create it.
//
// It parses the command-line flags and skips the calling test when the
// database flag is empty. Otherwise it opens a connection to PostgreSQL on
// localhost as user postgres, drops the database named by the flag if it
// exists, and creates it again. Any error fails the test via t.Fatal.
//
// The returned *sql.DB is the handle opened without a dbname in its
// connection string, not a handle opened against the newly created database.
// dbSetup never closes it; closing is left to the caller.
func dbSetup(t *testing.T) *sql.DB {
156+
flag.Parse()
157+
if len(*database) == 0 {
158+
t.Skip()
159+
}
160+
161+
db, err := sql.Open("postgres", "host=localhost user=postgres sslmode=disable")
162+
if err != nil {
163+
t.Fatal(err)
164+
}
165+
166+
// Recreate the database from scratch so each run starts from an empty state.
_, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", *database))
167+
if err != nil {
168+
t.Fatal(err)
169+
}
170+
171+
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s", *database))
172+
if err != nil {
173+
t.Fatal(err)
174+
}
175+
return db
176+
}

0 commit comments

Comments
 (0)