This repository was archived by the owner on Aug 4, 2021. It is now read-only.

Leader election for Prometheus HA setup #37

Merged

Contributor

@niksajakovljevic niksajakovljevic commented Sep 6, 2018

In an HA setup, two or more Prometheus instances scrape the same data and send it to adapters.
We want to prevent multiple adapters from writing the same or similar data in parallel (an HA Prometheus setup gives no consistency guarantees). The idea is to use leader election and allow only one adapter (the leader) to write to the database. The provided leader election implementation relies on PostgreSQL advisory locks and does not provide strong data guarantees (some duplicates or data loss are possible during failover). The REST interface for leader election should make it possible to plug in any external leader election system you might already use (e.g. Zookeeper, Consul).

@@ -64,7 +64,7 @@ func ParseFlags(cfg *Config) *Config {

// Client sends Prometheus samples to PostgreSQL
type Client struct {
-db *sql.DB
+Db *sql.DB
Member

I think the Go linter prefers this to be DB instead of Db.

util/lock.go Outdated
connCheckInterval = time.Millisecond * 250
)

// One implementation of leader election protocol based on PostgreSQL advisory locks. All adapters withing a HA group are trying
Member

Same godoc thing as above

@RobAtticus
Member

Let's start using the same standards for git commit messages as we do for the main repo, so basically squash all the commits before merging.

Also maybe change the first line to Add support for leader election...?

util/lock.go Outdated
}
l.conn = conn
}
rows, err := l.conn.QueryContext(context.Background(), fmt.Sprintf("SELECT pg_try_advisory_lock(%d)", l.groupLockId))
Member

Using sprintf for creating sql statements should be avoided. You should use something like the following:

QueryContext(context.Background(),"SELECT pg_try_advisory_lock(?)", l.groupLockId)

Contributor Author

For some reason the pq driver does not support that syntax when invoking SQL functions. I even tried SELECT pg_try_advisory_lock(key => $1), but no luck.

Member

I think the correct syntax is SELECT pg_try_advisory_lock($1), which works for me.
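
A minimal sketch of the parameterized form suggested here, assuming the lock is taken on a dedicated `*sql.Conn` (session-level advisory locks belong to the session that acquired them). The names `tryLockQuery` and `tryAdvisoryLock` are illustrative and are not taken from the PR:

```go
package main

import (
	"context"
	"database/sql"
)

// tryLockQuery uses PostgreSQL's positional placeholder ($1) instead of
// building the statement with fmt.Sprintf.
const tryLockQuery = "SELECT pg_try_advisory_lock($1)"

// tryAdvisoryLock (hypothetical helper) asks PostgreSQL for the advisory lock
// identified by groupLockID and reports whether this session obtained it.
// The same conn must be kept for as long as the lock should be held.
func tryAdvisoryLock(ctx context.Context, conn *sql.Conn, groupLockID int64) (bool, error) {
	var obtained bool
	// The lock id travels as a bind parameter, not as part of the SQL text.
	err := conn.QueryRowContext(ctx, tryLockQuery, groupLockID).Scan(&obtained)
	return obtained, err
}
```

Because the function returns a single boolean row, `QueryRowContext` plus `Scan` avoids the manual `rows.Next()` handling.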

util/lock.go Outdated
func (l *PgAdvisoryLock) TryLock() (bool, error) {
l.mutex.Lock()
if l.closed {
return false, fmt.Errorf("lock is closed. please create a new lock")

Since you define the defer that unlocks l.mutex below this return, is it possible that a deadlock might be reached, or am I missing something?

Contributor Author

Great catch! When the lock is closed we return an error, but a repeated invocation can cause a deadlock.
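
The fix discussed here can be sketched as follows, assuming a simplified stand-in type (`guardedLock` is hypothetical and omits the database call). Registering the `defer` immediately after `Lock()` means every return path, including the early one, releases the mutex:

```go
package main

import (
	"errors"
	"sync"
)

// guardedLock is a hypothetical, stripped-down stand-in for PgAdvisoryLock.
type guardedLock struct {
	mutex    sync.Mutex
	closed   bool
	obtained bool
}

// TryLock defers the unlock before any return statement, so a closed lock
// can be probed repeatedly without the second call blocking forever.
func (l *guardedLock) TryLock() (bool, error) {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	if l.closed {
		return false, errors.New("lock is closed, please create a new lock")
	}
	// The real implementation would query PostgreSQL here.
	l.obtained = true
	return l.obtained, nil
}
```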

@amytai amytai left a comment

I'm not super familiar with this codebase, but is there any way to write a regression test with 2 clients that are actually running the leader election in different processes? It would make sure that 1) only one is the leader, and 2) there is proper failover when one of the clients dies or releases the lock.

// to metrics that is written (some duplicates or data loss are possible during failover)
type PgAdvisoryLock struct {
conn *sql.Conn
mutex sync.RWMutex

I'm actually not sure if this mutex is doing anything. It's not like a single PgAdvisoryLock struct is going to be shared across multiple clients? So you don't need to serialize access to this struct because it's only ever accessed in a single-threaded manner by the client that instantiated it..?

Contributor Author

Actually, we have one lock per app, and it is shared among different goroutines (HTTP requests).

Ah, OK, this was my confusion with the test as well. I saw the test that you wrote, but that wasn't what I had in mind, because I thought you would want to test when clients are in completely different processes. But I guess that's not how the adapter is executed. I'm still not seeing how a single PgAdvisoryLock is shared across (requests?) but maybe that is code that is outside of this PR. I'll ask you offline.

Contributor Author

IMHO having separate processes would not bring any benefit to testing, but would rather increase test complexity/fragility a lot. PgAdvisoryLock is shared among goroutines, which are created on each request: https://golang.org/pkg/net/http/#Server.Serve
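
To illustrate why the mutex matters when one value is shared by per-request goroutines, here is a small sketch. The type `leaderFlag` and the helper `simulateRequests` are made up for this example; the goroutines stand in for the ones `net/http` starts per request:

```go
package main

import "sync"

// leaderFlag is a hypothetical shared value guarded by an RWMutex.
type leaderFlag struct {
	mutex  sync.RWMutex
	leader bool
}

// Set takes the write lock because it mutates the shared state.
func (f *leaderFlag) Set(leader bool) {
	f.mutex.Lock()
	defer f.mutex.Unlock()
	f.leader = leader
}

// Get takes only the read lock, so concurrent readers do not block each other.
func (f *leaderFlag) Get() bool {
	f.mutex.RLock()
	defer f.mutex.RUnlock()
	return f.leader
}

// simulateRequests starts n goroutines that all read the same flag and
// returns how many of them observed leader == true.
func simulateRequests(f *leaderFlag, n int) int {
	var wg sync.WaitGroup
	var countMutex sync.Mutex
	seen := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if f.Get() {
				countMutex.Lock()
				seen++
				countMutex.Unlock()
			}
		}()
	}
	wg.Wait()
	return seen
}
```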

}

func (l *PgAdvisoryLock) Release() error {
l.mutex.Lock()

Do you want to check that Assert(obtained)? Otherwise another client could accidentally (or maliciously) unlock the lock?

Contributor Author

In that case we would return an error but I agree that it's a good practice to check the boolean state first.
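
A sketch of the guard agreed on above, under the assumption that the lock tracks an `obtained` flag. The `unlock` field is a hypothetical hook standing in for the database call (for example `SELECT pg_advisory_unlock($1)`):

```go
package main

import (
	"errors"
	"sync"
)

// errNotObtained is returned when Release is called without holding the lock.
var errNotObtained = errors.New("cannot release a lock that was not obtained")

// releasableLock is a hypothetical, simplified stand-in for PgAdvisoryLock.
type releasableLock struct {
	mutex    sync.Mutex
	obtained bool
	unlock   func() error // hypothetical hook for the actual unlock query
}

// Release checks the local state first and only then talks to the database.
func (l *releasableLock) Release() error {
	l.mutex.Lock()
	defer l.mutex.Unlock()
	if !l.obtained {
		return errNotObtained
	}
	if err := l.unlock(); err != nil {
		return err
	}
	l.obtained = false
	return nil
}
```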

Member

@erimatnor erimatnor left a comment

I think this implementation would benefit from separating the two concerns of:

  1. Setting the "leader"
  2. Picking a leader, where using advisory locks is one particular way to set 1

Now you have a configuration parameter to set read-only mode, but no way to promote the instance at runtime to write mode. If you would make this possible, then the advisory lock stuff is just a way to toggle read vs write mode, while you could also plug in other ways to set read/write mode (preferably by external means).

The problem I have with this implementation is that the read-only mode and leader-election with advisory locks seem to be two disjoint mechanisms, while the latter, IMO, should just be a way to set or unset the former.

main.go Outdated
err := w.Write(samples)
shouldWrite := true
if election != nil {
isLeader, err := election.IsLeader()
Member

isLeader seems redundant. Can't you just use shouldWrite here?

Contributor Author

Sure, again I did it only to make it easier to understand what's going on. I am not strongly opinionated on this one, so we can remove that extra var?

Member

Was actually confusing to me to see both, since I first tried to figure out the difference between the two variables, given their names. But this might be a personal thing.

I'm with Erik, I think having both is more confusing.

util/lock.go Outdated
connCheckInterval = time.Millisecond * 250
)

// PgAdvisoryLock is implementation of leader election protocol based on PostgreSQL advisory locks. All adapters withing a HA group are trying
Member

While not strictly wrong, I think calling this a "leader election protocol" is a bit of a stretch as I think most people would associate this with some kind of distributed algorithm (e.g., quorum-based).

Contributor Author

I have a feeling that locks are a pretty common way to coordinate and implement leader election (Consul, Zookeeper); however, I am open to better wording if you have an idea.

Member

@erimatnor erimatnor Sep 12, 2018

Yes, but the point is that lock services like Zookeeper and Consul both implement a distributed leader election protocol (such as Raft or Paxos) among their respective groups/ensembles. That's what is typically meant by a "leader election protocol", specifically. I guess what trips me up a bit is the use of "protocol". Just removing "protocol" would lessen the confusion I think, e.g., "PgAdvisoryLock is an implementation of leader election using PostgreSQL advisory locks".

util/lock.go Outdated
}
defer rows.Close()
if !rows.Next() {
return false, fmt.Errorf("error while trying to read reponse rows from `pg_try_advisory_lock` function")
Member

reponse -> response

util/lock.go Outdated
return false, fmt.Errorf("error while trying to read reponse rows from `pg_try_advisory_lock` function")
}
var success bool
if err := rows.Scan(&success); err != nil {
Member

Can't you just pass &l.obtained to Scan? This avoids the extra variable.

Contributor Author

Sure, I just thought that using an extra var makes it more readable.

util/lock.go Outdated
return nil
}

func (l *PgAdvisoryLock) connCheck() {
Member

Did you consider using the Listener functionality in pq instead of doing polling? It seems you can get an event notification when disconnected.

https://godoc.org/github.com/lib/pq#NewListener

Contributor Author

@niksajakovljevic niksajakovljevic Sep 11, 2018

Yes, I did consider it, but ruled it out as not a good fit for this use case, since we only need a simple health check. Also, Listener creates a new DB connection dedicated to LISTEN/NOTIFY, whereas we only need to check the existing connection.

util/lock.go Outdated
}

func checkConnection(conn *sql.Conn) error {
_, err := conn.ExecContext(context.Background(), "SELECT 1")
Member

At least newer versions of the sql lib have a PingContext to verify database connections. Should probably consider using that.

https://golang.org/pkg/database/sql/#Conn.PingContext

Contributor Author

I thought about it, but apparently that's not supported by the pq driver: lib/pq#533

@niksajakovljevic niksajakovljevic force-pushed the ha-leader-election branch 2 times, most recently from 1198aa2 to 564d9ca Compare September 11, 2018 13:55
@niksajakovljevic
Contributor Author

@erimatnor I fully understand your concerns. As we discussed, the semantics of read.only are a bit different, so I would keep it separate. I added an additional Election implementation that provides a REST interface, so one can plug in any external leader election mechanism (e.g. Zookeeper, Consul).

if !lock.Locked() {
t.Error("Couldn't obtain the lock")
}

Actually, now that I'm looking at this again, I think it's fine to leave out the comment I suggested. This logic tests that the PgAdvisoryLock works on the Postgres-side, which is a valid micro-test.

Instead, I would be happier if there were a test that instantiates an election and tests it with two "clients" / requests (even though these requests might be sequential). Does that make sense? Or maybe I am once again misunderstanding what's going on in Go.
It looks like you test the RestElection below, but what about the regular election you had implemented?

main.go Outdated
@@ -137,7 +147,8 @@ func parseFlags() *config {
flag.StringVar(&cfg.telemetryPath, "web.telemetry-path", "/metrics", "Address to listen on for web endpoints.")
flag.StringVar(&cfg.logLevel, "log.level", "debug", "The log level to use [ \"error\", \"warn\", \"info\", \"debug\" ].")
flag.BoolVar(&cfg.readOnly, "read.only", false, "Read-only mode. Don't write to database.")

flag.IntVar(&cfg.haGroupLockId, "ha.group-advisory-lock-id", 0, "Unique advisory lock id per adapter high-availability group. Set it if you want to use leader election implementation based on PostgreSQL advisory lock")
flag.BoolVar(&cfg.restElection, "rest.election", false, "Enable REST interface for the leader election")
Member

@erimatnor erimatnor Sep 12, 2018

I would group this parameter with the above one under a common prefix, e.g., leader-election.http and leader-election.advisory-lock or something. Then it is clear that these are parameters related to similar functionality.

Also, just wondering if it should be possible to have both enabled simultaneously? If not, I could imagine having one string parameter that allows leader-election=http, leader-election=lock, leader-election=off or similar.
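
The single-parameter idea could look roughly like the sketch below. The mode names come from the comment above; the type `electionMode`, the function `parseElectionMode`, and the treatment of an empty value as `off` are assumptions made for illustration:

```go
package main

import "fmt"

// electionMode enumerates the mutually exclusive leader election settings.
type electionMode int

const (
	electionOff electionMode = iota
	electionHTTP
	electionLock
)

// parseElectionMode (hypothetical) maps the value of a single
// leader-election flag to a mode and rejects anything else, which makes
// it impossible to enable both mechanisms at once.
func parseElectionMode(value string) (electionMode, error) {
	switch value {
	case "", "off":
		return electionOff, nil
	case "http":
		return electionHTTP, nil
	case "lock":
		return electionLock, nil
	default:
		return electionOff, fmt.Errorf("unknown leader-election mode %q (want http, lock or off)", value)
	}
}
```

The value would typically be read with `flag.StringVar` and validated once at startup.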

main.go Outdated
log.Error("msg", "Error occurred while trying to become a leader", "err", err)
return err
}
shouldWrite = isLeader
Member

@erimatnor erimatnor Sep 12, 2018

Again, it seems strange that this is set in both the if and the else case instead of outside.

Contributor Author

I've eliminated isLeader as it seems that it doesn't make the code more readable :)

util/election.go Outdated
return r.leader, nil
}
r.leader = true
log.Info("msg", "Instance became a leader")
Member

FWIW, I would wrap the implementations of this interface in a higher-level object that does the logging for any implementation. Alternatively, just log where called. Then the logging is consistent across implementations.

For instance, you could have an Elector type that calls into an Election implementation and does the logging depending on return values from the specific implementation. So, if BecomeLeader() returns true, then the Elector logs "Instance became a leader"
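
One possible shape of the wrapper described above, as a sketch only. The `Election` method set is assumed from names used in this thread, `flagElection` is a toy implementation for the example, and `logf` is a hypothetical logging hook rather than the project's logger:

```go
package main

// Election is an assumed shape of the interface discussed in this thread.
type Election interface {
	IsLeader() (bool, error)
	BecomeLeader() (bool, error)
	Resign() error
}

// flagElection is a toy in-memory implementation (not goroutine-safe).
type flagElection struct{ leader bool }

func (f *flagElection) IsLeader() (bool, error)     { return f.leader, nil }
func (f *flagElection) BecomeLeader() (bool, error) { f.leader = true; return true, nil }
func (f *flagElection) Resign() error               { f.leader = false; return nil }

// Elector wraps any Election and owns the logging, so messages are the
// same regardless of which implementation is plugged in.
type Elector struct {
	election Election
	logf     func(format string, args ...interface{}) // hypothetical hook
}

// BecomeLeader delegates to the implementation and logs only on a
// transition from non-leader to leader.
func (e *Elector) BecomeLeader() (bool, error) {
	wasLeader, err := e.election.IsLeader()
	if err != nil {
		e.logf("error while checking leadership: %v", err)
		return false, err
	}
	leader, err := e.election.BecomeLeader()
	if err != nil {
		e.logf("error while trying to become a leader: %v", err)
		return false, err
	}
	if leader && !wasLeader {
		e.logf("instance became a leader")
	}
	return leader, nil
}
```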

util/election.go Outdated
log.Debug("msg", "Instance is already a leader")
return r.leader, nil
}
r.leader = true
Member

@erimatnor erimatnor Sep 12, 2018

This seems weird. In the write path, you call BecomeLeader, which for this implementation will always succeed if the instance is not the leader. I would have expected that when and how to become a leader is implementation specific and not exposed/triggered on the write path. (See note also on write path code).

Contributor Author

Agree. Removed election.BecomeLeader() from the write path.

main.go Outdated
if isLeader {
shouldWrite = isLeader
} else {
isLeader, err := election.BecomeLeader()
Member

It seems strange to me that "election" happens on every write in case you are not the leader. When and how to become a leader seems very implementation specific and, IMO, shouldn't be happening here. This seems wrong w.r.t., e.g., rest-based election.

I would expect the write path to simply check election.IsLeader(). A particular implementation (i.e., the advisory lock one) can try to become leader in the implementation of that check if it makes sense for that implementation.

So, IMO it doesn't make sense for, e.g., a REST- or Zookeeper-implementation of leader election to try and become leader here. For most implementations that happens in a side channel based on other events.

Maybe it even makes sense to split the Election interface into a read and write interface, thus only exposing the read interface here (i.e., only the IsLeader() function)?
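
The read/write split suggested above might be sketched like this. The interface names `LeaderChecker` and `LeaderSetter`, the helper `writeIfLeader`, and the toy `staticLeader` type are all hypothetical; a nil checker is taken to mean that leader election is disabled:

```go
package main

// LeaderChecker is the read side: all the write path needs to see.
type LeaderChecker interface {
	IsLeader() (bool, error)
}

// LeaderSetter is the write side, driven by the election mechanism itself.
type LeaderSetter interface {
	BecomeLeader() (bool, error)
	Resign() error
}

// Election combines both sides for code that manages leadership.
type Election interface {
	LeaderChecker
	LeaderSetter
}

// staticLeader is a toy LeaderChecker with a fixed answer.
type staticLeader bool

func (s staticLeader) IsLeader() (bool, error) { return bool(s), nil }

// writeIfLeader calls write only when the checker reports leadership and
// returns whether the write was performed. It never triggers an election.
func writeIfLeader(checker LeaderChecker, write func() error) (bool, error) {
	if checker != nil {
		leader, err := checker.IsLeader()
		if err != nil {
			return false, err
		}
		if !leader {
			return false, nil
		}
	}
	if err := write(); err != nil {
		return false, err
	}
	return true, nil
}
```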

Contributor Author

Agree. The reason I had it here in the first place was that once I implemented PgAdvisoryLock, I wanted to try to obtain the lock as soon as possible (when a write request happens), as opposed to having some scheduled routine try to obtain the lock at a fixed interval. I guess that in the context of other implementations, e.g. REST, that doesn't make much sense. I will perform only the election.IsLeader() check and hide the PgAdvisoryLock specifics.

Member

Maybe you can do leader checks in the health-check goroutine? That makes sense to me since you are already polling there.

util/election.go Outdated
return
}
fmt.Fprintf(response, "%v", leader)
case http.MethodPost:
Member

Seems a bit weird to POST something without content. Would have expected a PUT (POST typically to create a resource) where the content is the same type as what you return in the GET.

So, you, e.g., PUT a value of 1 to /admin/election/leader to become leader and a 0 to resign. A GET simply returns the current value. No resign path needed.

Otherwise it is not really a "RESTful" API.
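
A sketch of the PUT/GET design proposed above, assuming the body is the plain text `1` or `0`. The type `leaderResource` and the helper `call` are invented for this example, and the path `/admin/election/leader` is the one mentioned in the comment:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
)

// leaderResource exposes the leader flag as a single resource.
type leaderResource struct {
	mutex  sync.RWMutex
	leader bool
}

// encode renders the flag in the same form that PUT accepts.
func encode(leader bool) string {
	if leader {
		return "1"
	}
	return "0"
}

// ServeHTTP returns the current value on GET and replaces it on PUT.
func (r *leaderResource) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	switch req.Method {
	case http.MethodGet:
		r.mutex.RLock()
		leader := r.leader
		r.mutex.RUnlock()
		fmt.Fprint(w, encode(leader))
	case http.MethodPut:
		body, err := io.ReadAll(io.LimitReader(req.Body, 16))
		if err != nil {
			http.Error(w, "cannot read body", http.StatusBadRequest)
			return
		}
		value := strings.TrimSpace(string(body))
		if value != "0" && value != "1" {
			http.Error(w, "body must be 0 or 1", http.StatusBadRequest)
			return
		}
		r.mutex.Lock()
		r.leader = value == "1"
		r.mutex.Unlock()
		fmt.Fprint(w, value)
	default:
		w.Header().Set("Allow", "GET, PUT")
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
	}
}

// call exercises a handler in memory, without starting a server.
func call(h http.Handler, method, body string) (int, string) {
	req := httptest.NewRequest(method, "/admin/election/leader", strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, strings.TrimSpace(rec.Body.String())
}
```

With this shape, resigning is just a PUT of `0`, so no separate resign path is needed.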

util/election.go Outdated
defer r.mutex.Unlock()
if !r.leader {
log.Debug("msg", "Can't resign when not a leader")
return nil

Should this path return some information to the client so it knows it's telling a non-leader to resign?

Contributor Author

I've increased log level to warn so the one inspecting the logs should notice a problem. I am not sure if we should return an error because from our perspective the system is in proper state, however I am not strongly opinionated here...

util/lock.go Outdated
}
case <-l.quit:
l.mutex.Lock()
l.closed = true

When does this release the advisory lock if we're the leader?

Contributor Author

The lock is released on Close(). This method is called internally.

@niksajakovljevic niksajakovljevic force-pushed the ha-leader-election branch 2 times, most recently from fec05d3 to 194f4c0 Compare September 13, 2018 09:27
util/lock.go Outdated
if l.obtained {
return l.obtained, nil
}
err := checkConnection(l.conn)
Member

This check is not needed since obtained is false and l.conn doesn't have a lock. It should be enough to just grab any connection from the pool.

Member

@erimatnor erimatnor left a comment

One minor nit.

func (e *Elector) scheduledElection() {
for {
select {
case <-e.ticker.C:
Member

Should there be an exit case for this loop?
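
An exit case for such a loop is commonly a second `select` branch on a quit channel. The sketch below assumes an `Elector` with `quit` and `done` channels and an `elect` callback; these fields and the `startElector` constructor are illustrative, not the merged code:

```go
package main

import "time"

// Elector runs a periodic election attempt until it is stopped.
type Elector struct {
	ticker *time.Ticker
	quit   chan struct{}
	done   chan struct{}
	elect  func() // hypothetical per-tick action
}

// startElector (hypothetical) creates the ticker and starts the loop.
func startElector(interval time.Duration, elect func()) *Elector {
	e := &Elector{
		ticker: time.NewTicker(interval),
		quit:   make(chan struct{}),
		done:   make(chan struct{}),
		elect:  elect,
	}
	go e.scheduledElection()
	return e
}

// scheduledElection handles ticks until quit is closed, then stops the
// ticker and signals completion by closing done.
func (e *Elector) scheduledElection() {
	defer close(e.done)
	for {
		select {
		case <-e.ticker.C:
			e.elect()
		case <-e.quit:
			e.ticker.Stop()
			return
		}
	}
}

// Stop asks the loop to exit and waits for it. Call it at most once.
func (e *Elector) Stop() {
	close(e.quit)
	<-e.done
}
```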

In an HA setup, two or more Prometheus instances scrape the same data and send it to adapters.
We want to prevent multiple adapters from writing the same or similar data in parallel (an HA Prometheus setup gives no consistency guarantees). The idea is to use leader election and allow only one adapter (the leader) to write to the database. The provided leader election implementation relies on PostgreSQL advisory locks and does not provide strong data guarantees (some duplicates or data loss are possible during failover; btw, Prometheus HA doesn't provide any guarantees that multiple Prometheus nodes would see the same data). The REST interface for leader election should make it possible to plug in any external leader election system you might already use (e.g. Zookeeper).
@niksajakovljevic niksajakovljevic merged commit e27a36a into timescale:master Sep 14, 2018
7 participants