Skip to content
This repository was archived by the owner on Aug 4, 2021. It is now read-only.

Commit d6c1747

Browse files
Add support for leader election
In an HA setup, two or more Prometheus instances scrape the same data and send it to adapters. We want to prevent multiple adapters from writing the same or similar data in parallel (an HA Prometheus setup gives no consistency guarantees). The idea is to use leader election and allow only one adapter (the leader) to write to the database. The provided leader election implementation relies on PostgreSQL advisory locks and provides at-least-once semantics for metrics data (some duplicates are possible during failover and re-election).
1 parent dd23566 commit d6c1747

File tree

6 files changed

+301
-40
lines changed

6 files changed

+301
-40
lines changed

Gopkg.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

main.go

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/prometheus/client_golang/prometheus"
3636
"github.com/prometheus/common/model"
3737

38+
"database/sql"
3839
"fmt"
3940
"github.com/prometheus/client_model/go"
4041
"github.com/prometheus/prometheus/prompb"
@@ -47,6 +48,7 @@ type config struct {
4748
pgPrometheusConfig pgprometheus.Config
4849
logLevel string
4950
readOnly bool
51+
haGroupLockId int
5052
}
5153

5254
const (
@@ -91,6 +93,7 @@ var (
9193
[]string{"path"},
9294
)
9395
writeThroughtput = util.NewThroughputCalc(tickInterval)
96+
election util.Election
9497
)
9598

9699
func init() {
@@ -110,6 +113,7 @@ func main() {
110113
http.Handle(cfg.telemetryPath, prometheus.Handler())
111114

112115
writer, reader := buildClients(cfg)
116+
election = initElection(cfg, writer.(*pgprometheus.Client).DB)
113117

114118
http.Handle("/write", timeHandler("write", write(writer)))
115119
http.Handle("/read", timeHandler("read", read(reader)))
@@ -137,6 +141,7 @@ func parseFlags() *config {
137141
flag.StringVar(&cfg.telemetryPath, "web.telemetry-path", "/metrics", "Address to listen on for web endpoints.")
138142
flag.StringVar(&cfg.logLevel, "log.level", "debug", "The log level to use [ \"error\", \"warn\", \"info\", \"debug\" ].")
139143
flag.BoolVar(&cfg.readOnly, "read.only", false, "Read-only mode. Don't write to database.")
144+
flag.IntVar(&cfg.haGroupLockId, "ha.group-advisory-lock-id", 0, "Unique advisory lock id per adapter high-availability group. Set it if you want to use leader election implementation based on PostgreSQL advisory lock")
140145

141146
flag.Parse()
142147

@@ -173,6 +178,21 @@ func buildClients(cfg *config) (writer, reader) {
173178
return pgClient, pgClient
174179
}
175180

181+
func initElection(cfg *config, db *sql.DB) util.Election {
182+
if cfg.haGroupLockId != 0 {
183+
lock, err := util.NewPgAdvisoryLock(cfg.haGroupLockId, db)
184+
if err != nil {
185+
log.Error("msg", "Error creating advisory lock", "haGroupLockId", cfg.haGroupLockId, "err", err)
186+
os.Exit(1)
187+
}
188+
log.Info("msg", "Initialized leader election based on PostgreSQL advisory lock")
189+
return lock
190+
} else {
191+
log.Warn("msg", "No adapter leader election. Group lock id is not set. Possible duplicate write load if running adapter in high-availability mode")
192+
return nil
193+
}
194+
}
195+
176196
func write(writer writer) http.Handler {
177197
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
178198
compressed, err := ioutil.ReadAll(r.Body)
@@ -306,7 +326,31 @@ func protoToSamples(req *prompb.WriteRequest) model.Samples {
306326

307327
func sendSamples(w writer, samples model.Samples) error {
308328
begin := time.Now()
309-
err := w.Write(samples)
329+
shouldWrite := true
330+
if election != nil {
331+
isLeader, err := election.IsLeader()
332+
if err != nil {
333+
log.Error("msg", "IsLeader check failed", "err", err)
334+
return err
335+
}
336+
if isLeader {
337+
shouldWrite = isLeader
338+
} else {
339+
isLeader, err := election.BecomeLeader()
340+
if err != nil {
341+
log.Error("msg", "Error occurred while trying to become a leader", "err", err)
342+
return err
343+
}
344+
shouldWrite = isLeader
345+
}
346+
}
347+
var err error
348+
if shouldWrite {
349+
err = w.Write(samples)
350+
} else {
351+
log.Debug("msg", fmt.Sprintf("Election id %v: Instance is not a leader. Can't write data", election.Id()))
352+
return nil
353+
}
310354
duration := time.Since(begin).Seconds()
311355
if err != nil {
312356
failedSamples.WithLabelValues(w.Name()).Add(float64(len(samples)))

postgresql/client.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func ParseFlags(cfg *Config) *Config {
6464

6565
// Client sends Prometheus samples to PostgreSQL
6666
type Client struct {
67-
db *sql.DB
67+
DB *sql.DB
6868
cfg *Config
6969
}
7070

@@ -101,7 +101,7 @@ func NewClient(cfg *Config) *Client {
101101
db.SetMaxIdleConns(cfg.maxIdleConns)
102102

103103
client := &Client{
104-
db: db,
104+
DB: db,
105105
cfg: cfg,
106106
}
107107

@@ -121,7 +121,7 @@ func NewClient(cfg *Config) *Client {
121121
}
122122

123123
func (c *Client) setupPgPrometheus() error {
124-
tx, err := c.db.Begin()
124+
tx, err := c.DB.Begin()
125125

126126
if err != nil {
127127
return err
@@ -193,7 +193,7 @@ func metricString(m model.Metric) string {
193193
// Write implements the Writer interface and writes metric samples to the database
194194
func (c *Client) Write(samples model.Samples) error {
195195
begin := time.Now()
196-
tx, err := c.db.Begin()
196+
tx, err := c.DB.Begin()
197197

198198
if err != nil {
199199
log.Error("msg", "Error on Begin when writing samples", "err", err)
@@ -360,7 +360,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
360360

361361
log.Debug("msg", "Executed query", "query", command)
362362

363-
rows, err := c.db.Query(command)
363+
rows, err := c.DB.Query(command)
364364

365365
if err != nil {
366366
return nil, err
@@ -439,7 +439,7 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
439439

440440
// HealthCheck implements the healthcheck interface
441441
func (c *Client) HealthCheck() error {
442-
rows, err := c.db.Query("SELECT 1")
442+
rows, err := c.DB.Query("SELECT 1")
443443

444444
if err != nil {
445445
log.Debug("msg", "Health check error", "err", err)

postgresql/client_test.go

Lines changed: 66 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/prometheus/common/model"
88
"github.com/prometheus/prometheus/prompb"
99
"github.com/timescale/prometheus-postgresql-adapter/log"
10+
"github.com/timescale/prometheus-postgresql-adapter/util"
1011
"testing"
1112
)
1213

@@ -30,17 +31,17 @@ func TestBuildCommand(t *testing.T) {
3031
StartTimestampMs: 0,
3132
EndTimestampMs: 20000,
3233
Matchers: []*prompb.LabelMatcher{
33-
&prompb.LabelMatcher{
34+
{
3435
Type: prompb.LabelMatcher_EQ,
3536
Name: "__name__",
3637
Value: "cpu_usage",
3738
},
38-
&prompb.LabelMatcher{
39+
{
3940
Type: prompb.LabelMatcher_EQ,
4041
Name: "job",
4142
Value: "nginx",
4243
},
43-
&prompb.LabelMatcher{
44+
{
4445
Type: prompb.LabelMatcher_RE,
4546
Name: "host",
4647
Value: "local.*",
@@ -58,30 +59,7 @@ func TestBuildCommand(t *testing.T) {
5859
}
5960

6061
func TestWriteCommand(t *testing.T) {
61-
flag.Parse()
62-
if len(*database) == 0 {
63-
t.Skip()
64-
}
65-
66-
db, err := sql.Open("postgres", "host=localhost user=postgres sslmode=disable")
67-
if err != nil {
68-
t.Fatal(err)
69-
}
70-
71-
_, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", *database))
72-
if err != nil {
73-
t.Fatal(err)
74-
}
75-
76-
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s", *database))
77-
if err != nil {
78-
t.Fatal(err)
79-
}
80-
81-
err = db.Close()
82-
if err != nil {
83-
t.Fatal(err)
84-
}
62+
dbSetup(t)
8563

8664
cfg := &Config{}
8765
ParseFlags(cfg)
@@ -90,30 +68,30 @@ func TestWriteCommand(t *testing.T) {
9068
c := NewClient(cfg)
9169

9270
sample := []*model.Sample{
93-
&model.Sample{
71+
{
9472
Metric: model.Metric{
9573
model.MetricNameLabel: "test_metric",
9674
"label1": "1",
9775
},
9876
Value: 123.1,
9977
Timestamp: 1234567,
10078
},
101-
&model.Sample{
79+
{
10280
Metric: model.Metric{
10381
model.MetricNameLabel: "test_metric",
10482
"label1": "1",
10583
},
10684
Value: 123.2,
10785
Timestamp: 1234568,
10886
},
109-
&model.Sample{
87+
{
11088
Metric: model.Metric{
11189
model.MetricNameLabel: "test_metric",
11290
},
11391
Value: 123.2,
11492
Timestamp: 1234569,
11593
},
116-
&model.Sample{
94+
{
11795
Metric: model.Metric{
11896
model.MetricNameLabel: "test_metric_2",
11997
"label1": "1",
@@ -125,7 +103,7 @@ func TestWriteCommand(t *testing.T) {
125103

126104
c.Write(sample)
127105

128-
db, err = sql.Open("postgres", fmt.Sprintf("host=localhost dbname=%s user=postgres sslmode=disable", *database))
106+
db, err := sql.Open("postgres", fmt.Sprintf("host=localhost dbname=%s user=postgres sslmode=disable", *database))
129107
if err != nil {
130108
t.Fatal(err)
131109
}
@@ -140,3 +118,59 @@ func TestWriteCommand(t *testing.T) {
140118
t.Fatal("Wrong cnt: ", cnt)
141119
}
142120
}
121+
122+
func TestPgAdvisoryLock(t *testing.T) {
123+
db := dbSetup(t)
124+
lock, err := util.NewPgAdvisoryLock(1, db)
125+
if err != nil {
126+
t.Fatal(err)
127+
}
128+
if !lock.Locked() {
129+
t.Error("Couldn't obtain the lock")
130+
}
131+
132+
newLock, err := util.NewPgAdvisoryLock(1, db)
133+
if err != nil {
134+
t.Fatal(err)
135+
}
136+
if newLock.Locked() {
137+
t.Error("Lock should have already been taken")
138+
}
139+
140+
if err = lock.Release(); err != nil {
141+
t.Errorf("Failed to release a lock. Error: %v", err)
142+
}
143+
144+
if lock.Locked() {
145+
t.Error("Should be unlocked after release")
146+
}
147+
148+
newLock.TryLock()
149+
150+
if !newLock.Locked() {
151+
t.Error("New lock should take over")
152+
}
153+
}
154+
155+
func dbSetup(t *testing.T) *sql.DB {
156+
flag.Parse()
157+
if len(*database) == 0 {
158+
t.Skip()
159+
}
160+
161+
db, err := sql.Open("postgres", "host=localhost user=postgres sslmode=disable")
162+
if err != nil {
163+
t.Fatal(err)
164+
}
165+
166+
_, err = db.Exec(fmt.Sprintf("DROP DATABASE IF EXISTS %s", *database))
167+
if err != nil {
168+
t.Fatal(err)
169+
}
170+
171+
_, err = db.Exec(fmt.Sprintf("CREATE DATABASE %s", *database))
172+
if err != nil {
173+
t.Fatal(err)
174+
}
175+
return db
176+
}

util/election.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package util
2+
3+
// Election is the contract for adapter leader election.
//
// When Prometheus runs in HA mode and each Prometheus instance sends data to
// its own adapter, you probably want only one adapter at a time to write to
// the database. A leader is elected for that purpose; if the leader goes down,
// another one is elected. See `lock.go` for an implementation based on
// PostgreSQL advisory locks. Other leader election implementations should be
// easy to plug in behind this interface.
type Election interface {
	// Id returns an identifier for this election (used in log messages).
	Id() string

	// BecomeLeader reports whether this instance is the leader after the
	// call, or an error if that could not be determined.
	// NOTE(review): presumably this attempts to acquire leadership; confirm
	// against the implementation.
	BecomeLeader() (bool, error)

	// IsLeader reports whether this instance is currently the leader, or an
	// error if the check itself failed.
	IsLeader() (bool, error)

	// Resign presumably gives up leadership; confirm against the
	// implementation.
	Resign() error
}

0 commit comments

Comments
 (0)