
Streaming replication protocol #116

Open
jeff-davis opened this issue Apr 24, 2015 · 17 comments

Comments

@jeff-davis
Contributor

This idea isn't fully formed yet, but it would be nice if rust-postgres could support the streaming replication protocol: http://www.postgresql.org/docs/devel/static/protocol-replication.html

It makes less sense for physical replication (though that could be supported, too), but for logical replication it could be quite interesting.

@sfackler
Owner

sfackler commented Jan 2, 2016

cc @posix4e

@posix4e

posix4e commented Jan 2, 2016

We actually have a project called elephant pump that attempts to provide a bridge for writing logical decoders in Rust. We intend to work with sfackler to port our code to his library once we get a bit further.

@jmealo

jmealo commented Sep 24, 2017

Any updates on a logical decoding/streaming replication client for Rust?

@posix4e

posix4e commented Sep 24, 2017

@jmealo besides your work at http://github.com/posix4e/jsoncdc :)

@eadanfahey

Any updates on this?
I'm trying to create a logical replication slot, using the wal2json plugin, from Rust:

extern crate postgres;
use postgres::{Connection, TlsMode};

fn main() {
    let url = "postgresql://postgres:postgres@localhost:5432/postgres?replication=database";
    let rep_conn = Connection::connect(url, TlsMode::None).unwrap();
    rep_conn.execute("CREATE_REPLICATION_SLOT test_slot LOGICAL wal2json EXPORT_SNAPSHOT", &[]).unwrap();
}

This produces an error:

{ severity: "ERROR", parsed_severity: Some(Error), code: SqlState("08P01"), message: "extended query protocol not supported in a replication connection", detail: None, hint: None, position: None, where_: None, schema: None, table: None, column: None, datatype: None, constraint: None, file: Some("postgres.c"), line: Some(4339), routine: Some("forbidden_in_wal_sender") }

@jmealo

jmealo commented Aug 20, 2018

@eadanfahey https://www.postgresql.org/docs/current/static/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY

I hope this helps. We need to support the simple query protocol to send the logical replication commands.

You could sniff the protocol traffic from pg_recvlogical or check out Lapidus to see what's required.
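For reference, the simple query protocol mentioned above sends each command as a single Query ('Q') message: a type byte, a big-endian Int32 length that counts itself but not the type byte, and the NUL-terminated command text. The minimal sketch below encodes one by hand to make that concrete; `encode_simple_query` is a hypothetical helper, not part of rust-postgres (whose `simple_query` method is the supported way to send such commands).

```rust
/// Encode a frontend Query ('Q') message for the simple query protocol.
/// Hypothetical helper for illustration only.
pub fn encode_simple_query(sql: &str) -> Vec<u8> {
    // Length = 4 bytes for the length field itself + command bytes + 1 trailing NUL.
    let len = (4 + sql.len() + 1) as i32;
    let mut buf = Vec::with_capacity(1 + len as usize);
    buf.push(b'Q'); // message type byte (not counted in the length)
    buf.extend_from_slice(&len.to_be_bytes());
    buf.extend_from_slice(sql.as_bytes());
    buf.push(0); // commands are sent as NUL-terminated strings
    buf
}
```

Replication commands such as CREATE_REPLICATION_SLOT travel in this single-message form, whereas `execute` goes through the extended protocol (Parse/Bind/Execute), which the walsender rejects with SQLSTATE 08P01 as in the error above.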

@ruslantalpa
Contributor

@sfackler, is there still interest in this?
From my experiments, the state seems to be this:
First, config.rs needs support for the replication parameter (easy).
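The replication parameter travels in the StartupMessage that opens every connection: a big-endian Int32 length, the protocol version 3.0 (196608), then NUL-terminated name/value pairs and a final NUL byte. Per the PostgreSQL docs, `replication=database` asks the server for a walsender in logical replication mode, connected to the named database. This is a hand-rolled sketch for illustration only; `encode_startup` is a hypothetical helper and not the config.rs API.

```rust
/// Encode a protocol 3.0 StartupMessage with the given parameters.
/// Hypothetical helper for illustration; not part of rust-postgres.
pub fn encode_startup(params: &[(&str, &str)]) -> Vec<u8> {
    let mut body = Vec::new();
    // Protocol version 3.0: major 3 in the high 16 bits, minor 0 in the low 16 bits.
    body.extend_from_slice(&(3i32 << 16).to_be_bytes());
    for (name, value) in params {
        body.extend_from_slice(name.as_bytes());
        body.push(0);
        body.extend_from_slice(value.as_bytes());
        body.push(0);
    }
    // A zero byte terminates the parameter list.
    body.push(0);

    // The length field counts itself (4 bytes) plus the body; there is no type byte.
    let mut msg = ((body.len() + 4) as i32).to_be_bytes().to_vec();
    msg.extend_from_slice(&body);
    msg
}
```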

Once that works, PG enters "replication" mode after connecting.
After that, commands must be sent with simple_query so that they go out directly as plain Query messages rather than as "prepared statements" (the extended protocol), which is what triggers the error @eadanfahey posted.

Once the START_REPLICATION statement is sent, pg starts streaming back the "events".
Now the question becomes how to capture those events. I looked at the poll_message function in connection.rs, but it seems to return only the Async messages. The replication events instead reach this part of the code as BackendMessage::Normal with request_complete=false, and the code (as I understand it) assumes they are the reply to a previously issued query: it looks for that query's sender with self.responses.pop_front() and later forwards the reply with response.sender.start_send(messages). At a lower level, in postgres-protocol, they look like Some(CopyData(CopyDataBody { storage: b"..." })).
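The CopyData payloads seen here are documented in the streaming replication protocol: after START_REPLICATION the server mostly sends XLogData ('w': start LSN, end LSN, send time, then the change data) and Primary keepalive ('k': end LSN, send time, reply-requested flag) messages, with all integers big-endian. A minimal parser for those two types might look like the following; the enum and function names are hypothetical and are not rust-postgres types.

```rust
/// The two server-to-client CopyData payloads seen during logical streaming.
#[derive(Debug, PartialEq)]
pub enum ReplicationMessage<'a> {
    /// 'w': a chunk of WAL data (for logical decoding, the output plugin's text).
    XLogData { wal_start: u64, wal_end: u64, send_time: i64, data: &'a [u8] },
    /// 'k': a heartbeat; if `reply_requested` is true the client should answer promptly.
    PrimaryKeepalive { wal_end: u64, send_time: i64, reply_requested: bool },
}

/// Copy 8 bytes starting at `at` into an array (caller has checked the length).
fn eight(buf: &[u8], at: usize) -> [u8; 8] {
    let mut out = [0u8; 8];
    out.copy_from_slice(&buf[at..at + 8]);
    out
}

/// Parse the payload of one CopyData message received after START_REPLICATION.
pub fn parse_copy_data(buf: &[u8]) -> Result<ReplicationMessage<'_>, String> {
    match buf.first().copied() {
        // 1 type byte + three Int64 header fields = 25 bytes before the data.
        Some(b'w') if buf.len() >= 25 => Ok(ReplicationMessage::XLogData {
            wal_start: u64::from_be_bytes(eight(buf, 1)),
            wal_end: u64::from_be_bytes(eight(buf, 9)),
            send_time: i64::from_be_bytes(eight(buf, 17)),
            data: &buf[25..],
        }),
        // 1 type byte + two Int64 fields + one flag byte = 18 bytes.
        Some(b'k') if buf.len() >= 18 => Ok(ReplicationMessage::PrimaryKeepalive {
            wal_end: u64::from_be_bytes(eight(buf, 1)),
            send_time: i64::from_be_bytes(eight(buf, 9)),
            reply_requested: buf[17] != 0,
        }),
        Some(tag) => Err(format!("unknown or truncated message type {:?}", tag as char)),
        None => Err("empty CopyData payload".to_string()),
    }
}
```

Dispatching on the first byte like this is roughly the check a library would need before handing 'w' frames to the user and answering 'k' frames.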

I could make a PR for the config.rs part, but I am not sure how to make those replication messages available to the library user. My rough idea is to add a new variant to pub enum Message, similar to NoticeResponse(NoticeResponseBody), and then, in the poll_read function, somehow detect (based on the connection parameters, request_complete, and the absence of a sender expecting a response) that this is a replication frame and return it as an Async message. Or should this be handled at a lower level, in backend.rs or somewhere in between (the part that turns a CopyData message into BackendMessage::Normal, which I haven't found yet)?

thank you

@sfackler
Owner

sfackler commented Sep 4, 2020

Definitely interested! Other than adding the replication parameter, I think what we'd need to do is just add a copy_both method for use with START_REPLICATION. I'd imagine it'd have some kind of sink + stream interface with some extra methods to terminate the copy.
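For context, the command sent before entering copy-both mode has the form `START_REPLICATION SLOT slot_name LOGICAL XXX/XXX [ ( option_name [ option_value ] [, ...] ) ]`, where the LSN is printed as two hexadecimal halves separated by a slash. The sketch below builds that string; `format_lsn` and `start_replication_command` are hypothetical helpers, and the wal2json option used in the test (`pretty-print`) is an assumption about that plugin's options. It double-quotes option names and single-quotes option values.

```rust
/// Format a 64-bit LSN the way PostgreSQL prints it, e.g. "16/B374D848".
pub fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Build a START_REPLICATION command for a logical slot (hypothetical helper).
/// Option names are double-quoted identifiers; values are single-quoted literals.
pub fn start_replication_command(slot: &str, start_lsn: u64, options: &[(&str, &str)]) -> String {
    let mut cmd = format!("START_REPLICATION SLOT {} LOGICAL {}", slot, format_lsn(start_lsn));
    if !options.is_empty() {
        let opts: Vec<String> = options
            .iter()
            .map(|(name, value)| {
                // Escape embedded quotes by doubling them, as SQL does.
                format!("\"{}\" '{}'", name.replace('"', "\"\""), value.replace('\'', "''"))
            })
            .collect();
        cmd.push_str(&format!(" ({})", opts.join(", ")));
    }
    cmd
}
```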

@ruslantalpa
Contributor

@sfackler sorry, I am not that familiar with the layout of your library or the PG protocol (I just cloned your repo and added a bunch of trace! calls to get replication started).
What do you mean by a copy_both method (is this a Rust thing)? I was thinking of modeling the interface on how notifications work.

Just point me in the right direction and, if I can, I'll try to make this happen.

@sfackler
Owner

sfackler commented Sep 4, 2020

START_REPLICATION switches the connection into bidirectional copy mode, sort of like how COPY ... TO STDOUT and COPY ... TO STDIN switch the connection into unidirectional copy mode. There are specific methods to handle those modes: https://docs.rs/tokio-postgres/0.5.5/tokio_postgres/struct.Client.html#method.copy_in. Support for replication would involve adding a similar copy_both method.
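At the wire level, the server answers START_REPLICATION with a CopyBothResponse ('W') message, after which both sides exchange CopyData ('d') messages until one of them sends CopyDone ('c'). Every message is a type byte followed by a big-endian Int32 length that counts itself. The sketch below frames and splits such messages by hand, only to make the copy-both flow concrete; the helper names are hypothetical.

```rust
/// Wrap a payload in a CopyData ('d') message, as a client does for status updates.
pub fn encode_copy_data(payload: &[u8]) -> Vec<u8> {
    let mut msg = vec![b'd'];
    msg.extend_from_slice(&((payload.len() + 4) as i32).to_be_bytes());
    msg.extend_from_slice(payload);
    msg
}

/// CopyDone ('c') has no payload; sending it begins ending copy-both mode.
pub const COPY_DONE: [u8; 5] = [b'c', 0, 0, 0, 4];

/// Split a buffer of backend bytes into complete (type, payload) messages.
/// Returns the messages plus the number of bytes consumed; a trailing
/// partial message is left in place for the next read.
pub fn split_messages(buf: &[u8]) -> (Vec<(u8, &[u8])>, usize) {
    let mut messages = Vec::new();
    let mut pos = 0;
    while buf.len() - pos >= 5 {
        let len = u32::from_be_bytes([buf[pos + 1], buf[pos + 2], buf[pos + 3], buf[pos + 4]]) as usize;
        if len < 4 || buf.len() - pos < 1 + len {
            break; // malformed length, or the rest of the message hasn't arrived yet
        }
        messages.push((buf[pos], &buf[pos + 5..pos + 1 + len]));
        pos += 1 + len;
    }
    (messages, pos)
}
```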

@jeff-davis
Contributor Author

I have made a lot of progress on my PR #696. Right now it's failing tests, mainly because the test image doesn't support logical replication.

Can I get some high-level feedback on the PR? I'd like to know whether (a) it's something you want, and there is just more work to do and you don't have the bandwidth to give it a review right now; or (b) you don't really think #696 is the right approach.

@jakajancar

Re (a), just as a user, I would love this.

Right now I use pg_create_logical_replication_slot() and poll pg_logical_slot_get_changes() which is messy and high latency.

@honne23

honne23 commented Jan 10, 2021

I would love to contribute, as this use case comes up quite frequently. It might be worth integrating with Glommio (https://www.datadoghq.com/blog/engineering/introducing-glommio/), a Rust rebuild of the C++ Seastar framework, for each WAL sender.

Could be super speedy

@honne23

honne23 commented Jan 10, 2021

even perhaps https://github.com/tokio-rs/io-uring

@videni

videni commented Jun 10, 2021

What's the latest status?

@Venryx

Venryx commented Jan 20, 2022

I hate to bother the maintainers (in case they just lack the time), but the last comment was over six months ago, and it would help to know what additional steps are needed to get the pull request over the finish line.

Also @jakajancar, you mention:

Right now I use pg_create_logical_replication_slot() and poll pg_logical_slot_get_changes() which is messy and high latency.

To do this, are you using the copy-both branch of petrosagg's fork? Or are you accomplishing it on the rust-postgres master branch?

And would it be straightforward to show the key parts of the code you're using? Are you basically just using the commands from the copy-both test function? https://github.com/petrosagg/rust-postgres/blob/df8b5986088d29f29a3afb71eec5877171278387/tokio-postgres/tests/test/copy_both.rs#L72

EDIT: Not sure how relevant it is, but here is the polling mechanism used by postgraphile's logical-decoding plugin to receive database changes: https://github.com/graphile/graphile-engine/blob/a9fcbafc52244495420faac5da0e05cbceaf5e69/packages/lds/src/pg-logical-decoding.ts#L168

@Venryx

Venryx commented Jan 21, 2022

Good news! After a few hours of my first "battle with the borrow-checker" (annoying and exhilarating at the same time 😅), I managed to get the copy-both branch of petrosagg's fork to stream the replication-log changes to my Rust server (using streams, rather than the pg_logical_slot_get_changes() function that @jakajancar mentioned wanting to avoid).

For others wanting to see how it can be done, here is the working code: https://github.com/debate-map/app/blob/afc6467b6c6c961f7bcc7b7f901f0ff5cd79d440/Packages/app-server-rs/src/pgclient.rs

(And here is the Cargo.toml file that tells cargo where to find the copy-both branch/fork mentioned earlier.)

The pgclient.rs module connects to the postgres database, starts the replication-log stream (logging data changes to the terminal), and responds to the "heartbeat checks" that postgres sends out; if you don't respond to these, postgres terminates the connection within about a minute. Note that the code tells postgres to use the wal2json plugin [if present] to format the data as JSON, so it's recommended to have that plugin installed for easier parsing. My code for responding to these checks is incomplete (of the various int64 fields, it only fills in the client system's time), but so far it hasn't seemed to cause any problems.
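The reply to those heartbeats is the Standby status update message ('r'), sent inside a CopyData message: the last WAL positions written, flushed and applied by the client (big-endian Int64 each), the client's clock as microseconds since 2000-01-01 (the PostgreSQL epoch), and a byte asking the server to reply immediately. A minimal encoder, assuming the caller supplies the LSNs it has tracked, might look like this; the function names are hypothetical.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Seconds from the Unix epoch (1970-01-01) to the PostgreSQL epoch (2000-01-01).
const PG_EPOCH_OFFSET_SECS: i64 = 946_684_800;

/// Current time in microseconds since the PostgreSQL epoch.
pub fn pg_now_micros() -> i64 {
    let since_unix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before 1970");
    since_unix.as_micros() as i64 - PG_EPOCH_OFFSET_SECS * 1_000_000
}

/// Encode a Standby status update ('r') payload; wrap it in CopyData before sending.
pub fn encode_standby_status_update(
    written_lsn: u64,
    flushed_lsn: u64,
    applied_lsn: u64,
    client_time: i64,
    reply_requested: bool,
) -> Vec<u8> {
    let mut buf = Vec::with_capacity(34); // 1 type byte + 4 * 8-byte fields + 1 flag byte
    buf.push(b'r');
    buf.extend_from_slice(&written_lsn.to_be_bytes());
    buf.extend_from_slice(&flushed_lsn.to_be_bytes());
    buf.extend_from_slice(&applied_lsn.to_be_bytes());
    buf.extend_from_slice(&client_time.to_be_bytes());
    buf.push(reply_requested as u8);
    buf
}
```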

Update: If you leave those bytes (mentioned just above) blank/zeroed, the server keeps all of the WAL segments on disk while pgclient.rs is running. This can make the pg_wal folder grow very large if your client stays alive for a long time, and postgres can eventually crash once all disk space is used (as just happened to me). The solution is to track the WAL position each time you receive a WAL update and send it in your heartbeat response, so postgres knows this slot has processed those WAL segments and can delete the old segment files once no other replication slot still needs them.
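One way to follow that advice is to keep two positions, the highest LSN received and the highest LSN the application has actually finished handling, and to report the latter as the flushed and applied positions. As far as I understand the walsender, the flush position is what lets the server advance a logical slot and recycle old WAL. The struct below is a hypothetical sketch of that bookkeeping; which header LSN to record for each message (for example the XLogData start or end position) is left to the caller.

```rust
/// Hypothetical bookkeeping for the LSNs reported in standby status updates.
#[derive(Debug, Default)]
pub struct WalProgress {
    /// Highest LSN seen in any XLogData or keepalive header.
    received: u64,
    /// Highest LSN whose changes the application has fully processed.
    processed: u64,
}

impl WalProgress {
    /// Record an LSN taken from a message header as soon as it arrives.
    pub fn on_received(&mut self, lsn: u64) {
        self.received = self.received.max(lsn);
    }

    /// Record that everything up to `lsn` has been handled (e.g. stored elsewhere).
    pub fn on_processed(&mut self, lsn: u64) {
        self.processed = self.processed.max(lsn);
    }

    /// (written, flushed, applied) positions for the next status update.
    /// Reporting only processed work as flushed means that after a restart the
    /// server re-sends unprocessed changes instead of skipping them.
    pub fn feedback(&self) -> (u64, u64, u64) {
        (self.received, self.processed, self.processed)
    }
}
```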

As for parsing the JSON text present in the XLogData/data-change events, I haven't added that yet -- though I've confirmed from the logs that the data is there. I'll be adding that parsing soon, using the serde library or something similar.

EDIT: See also this comment for a code change that is recommended for large databases.

EDIT: See also this file that is part of a smaller demo repo of logical decoding in Rust.


10 participants