
Replication support (#116) #696


Open
jeff-davis wants to merge 24 commits into master from the replication branch

Conversation

jeff-davis (Contributor) commented Nov 15, 2020

Improve on #652.

  • A separate ReplicationClient type that can't issue ordinary queries.
  • start_physical_replication and start_logical_replication consume the ReplicationClient, so no new commands can be issued once streaming has begun.
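The "consume the client" design can be sketched with stand-in types (the real ReplicationClient and stream types come from this PR; everything below is illustrative only). The key point is that a method taking `self` by value moves the client into the stream, so issuing further commands after streaming starts is a compile error rather than a runtime error:

```rust
// Toy stand-ins for the PR's types, to show the move-on-start pattern.
struct ReplicationClient;
struct ReplicationStream;

impl ReplicationClient {
    // Fine before streaming starts.
    fn identify_system(&self) -> &'static str {
        "system identification"
    }

    // Takes `self` by value: the client is moved into the stream, so the
    // borrow checker rejects any later use of it.
    fn start_logical_replication(self) -> ReplicationStream {
        ReplicationStream
    }
}

fn main() {
    let client = ReplicationClient;
    assert_eq!(client.identify_system(), "system identification");
    let _stream = client.start_logical_replication();
    // client.identify_system(); // error[E0382]: borrow of moved value
}
```

This uses the type system to enforce the protocol-level rule that a walsender in streaming mode no longer accepts new commands.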

@jeff-davis force-pushed the replication branch 2 times, most recently from 9eb3092 to d9166ed on November 21, 2020 at 21:42
@jeff-davis changed the title from "WIP replication support" to "Replication support (#116)" on Nov 21, 2020
@jeff-davis force-pushed the replication branch 2 times, most recently from bc89955 to 7d5a0d9 on November 28, 2020 at 23:05
@jeff-davis force-pushed the replication branch 4 times, most recently from 1647aa2 to ba06fd6 on November 30, 2020 at 20:39
jeff-davis (Contributor, Author) commented Jan 28, 2021 via email

petrosagg (Contributor)

Hey @jeff-davis! Thanks for all this great work!

Over at materialize we're working on integrating with postgres replication directly, and I've started working on your PR. I've cleaned up all the fixups in your commit log, rebased on top of master, added support for running simple queries over the replication protocol, and added support for decoding the output of the pgoutput replication plugin.

If you're ok with it I can open a new PR and take over the upstreaming effort; there are a few more things to be done, like adding support for replication in the sync client, tests, etc., that I'm going to get to in the next couple of weeks.

You can see the current state of the work here: master...petrosagg:replication

Let me know what you think!

jeff-davis (Contributor, Author)

> Hey @jeff-davis! Thanks for all this great work!

Hi!

> Over at materialize we're working on integrating with postgres replication directly, and I've started working on your PR. I've cleaned up all the fixups in your commit log, rebased on top of master, added support for running simple queries over the replication protocol, and added support for decoding the output of the pgoutput replication plugin.

Sounds interesting.

> If you're ok with it I can open a new PR and take over the upstreaming effort; there are a few more things to be done, like adding support for replication in the sync client, tests, etc., that I'm going to get to in the next couple of weeks.

If you have the cycles to do so that would be much appreciated.

> You can see the current state of the work here: master...petrosagg:replication

Let me know what you think!

I think you broke logical decoding, e.g. wal2json. This part is a bit confusing in PG-land, so let me try to explain:

There are two types of replication connections: physical replication and logical replication. A physical replication connection is for physical replication, but a logical replication connection may be for either logical replication or logical decoding. "Logical replication" is a special case of logical decoding that uses the logical replication messages in the payload. But if you are doing some other kind of logical decoding, e.g. wal2json, then it uses an arbitrary payload (e.g. json) and doesn't use the logical replication messages at all.
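The taxonomy above can be sketched in Rust types (the names here are illustrative, not from this PR): a logical connection always runs logical decoding through some output plugin, and "logical replication" is the case where that plugin is pgoutput; any other plugin, such as wal2json, produces an arbitrary payload.

```rust
// Illustrative model of the connection/payload taxonomy, not PR code.
enum ReplicationMode {
    Physical,
    Logical { plugin: &'static str },
}

fn payload_kind(mode: &ReplicationMode) -> &'static str {
    match mode {
        // Physical replication streams raw WAL.
        ReplicationMode::Physical => "raw WAL",
        // pgoutput emits the logical replication message set.
        ReplicationMode::Logical { plugin: "pgoutput" } => "logical replication messages",
        // Any other plugin emits a plugin-defined payload, e.g. JSON for wal2json.
        ReplicationMode::Logical { .. } => "plugin-defined payload",
    }
}

fn main() {
    assert_eq!(payload_kind(&ReplicationMode::Physical), "raw WAL");
    assert_eq!(
        payload_kind(&ReplicationMode::Logical { plugin: "pgoutput" }),
        "logical replication messages"
    );
    assert_eq!(
        payload_kind(&ReplicationMode::Logical { plugin: "wal2json" }),
        "plugin-defined payload"
    );
}
```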

Also, the arbitrary simple queries are only allowed over a logical replication connection. It's possible we may want to use the type system to enforce this, or we could just let the backend throw an error if the client tries to send an arbitrary query over a physical replication connection.

petrosagg (Contributor)

> If you have the cycles to do so that would be much appreciated.

Awesome! I'll open a new PR soon.

> I think you broke logical decoding, e.g. wal2json. This part is a bit confusing in PG-land, so let me try to explain:

You're right, this was just me being lazy for a PoC I wanted to build with pgoutput. I was meaning to separate the plugin implementation out into a trait so that this project can support a couple of official ones and let users define their own decoding implementations. I just added that to my branch, so wal2json is usable again :) See commits petrosagg@d46ada0 and petrosagg@48acadb
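The plugin-trait idea could look something like the following sketch (the names are invented here, not taken from the branch): each logical decoding plugin turns the raw payload bytes of a streamed message into its own message type.

```rust
// Illustrative sketch of a decoding-plugin trait; names are hypothetical.
trait DecodingPlugin {
    type Message;
    fn name(&self) -> &'static str; // e.g. "pgoutput" or "wal2json"
    fn decode(&self, payload: &[u8]) -> Self::Message;
}

// A trivial plugin that passes the raw bytes through unchanged -- the
// "payload of bytes" baseline a user-defined plugin would build on.
struct RawBytes;

impl DecodingPlugin for RawBytes {
    type Message = Vec<u8>;
    fn name(&self) -> &'static str {
        "raw"
    }
    fn decode(&self, payload: &[u8]) -> Vec<u8> {
        payload.to_vec()
    }
}

fn main() {
    let plugin = RawBytes;
    assert_eq!(plugin.name(), "raw");
    assert_eq!(plugin.decode(b"abc"), b"abc".to_vec());
}
```

A wal2json implementation would decode the payload as JSON, while a pgoutput implementation would parse the logical replication message formats.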

> but a logical replication connection may be for either logical replication or logical decoding.

I don't think that's quite right. Logical replication is also doing logical decoding; it's just that the plugin used (pgoutput) is shipped with postgres itself. Here is the relevant documentation (quote: "The walsender process starts logical decoding (described in Chapter 48) of the WAL and loads the standard logical decoding plugin (pgoutput)").

> Also, the arbitrary simple queries are only allowed over a logical replication connection.

Since there is only one type of connection for any decoding, I think you can issue queries regardless. I haven't tested this, though, so if there is a relevant part of the docs please send it over!

jeff-davis (Contributor, Author)

> Awesome! I'll open a new PR soon.

Great!

> You're right, this was just me being lazy for a PoC I wanted to build with pgoutput. I was meaning to separate the plugin implementation out into a trait so that this project can support a couple of official ones and let users define their own decoding implementations. I just added that to my branch, so wal2json is usable again :) See commits petrosagg@d46ada0 and petrosagg@48acadb

That looks like a good direction. FWIW I would focus on getting the basic replication stuff in (with a payload of bytes), but I like where you are taking it. It would be good if @sfackler could comment on the overall approach and how he'd like to see replication support land. rust-postgres is a widely-used crate and I'm guessing that he wants to be careful about breaking changes in the future.

> I don't think that's quite right. Logical replication is also doing logical decoding; it's just that the plugin used (pgoutput) is shipped with postgres itself.

Yes. I was trying to get that across, but I think I just added to the confusion. Sorry.

> Also, the arbitrary simple queries are only allowed over a logical replication connection.

> Since there is only one type of connection for any decoding, I think you can issue queries regardless. I haven't tested this, though, so if there is a relevant part of the docs please send it over!

You can issue queries in logical replication, but not physical. Here's a link to the source:

https://github.com/postgres/postgres/blob/master/src/backend/replication/walsender.c#L1571

Non-replication SQL commands are allowed on a logical replication/decoding connection, but the backend will throw an error on a physical replication connection.

Maybe an error there is fine; it doesn't break the protocol.

videni commented Jun 11, 2021

@jeff-davis, what is the latest status? I also need this to implement a CDC feature for a Rust app, instead of using the Debezium suite, which is too complex for our use case.

let mut got_begin = false;
let mut got_insert = false;
let mut got_commit = false;
while let Some(replication_message) = logical_stream.next().await {


@jeff-davis I've been poring over this PR for the past couple of days trying to understand everything, and there's one thing I'm not quite connecting. Forgive me, as I'm somewhat new to Rust. Isn't logical_stream.next() going to wait indefinitely until there is some copy data to consume from the server?

The reason I ask is that, if we look at pg_recvlogical, we don't want to wait indefinitely on copy data, because there may be a need to send standby status data to postgres (e.g. if we've waited a long time between receiving copy data) so that postgres doesn't kill our replication connection.

Apologies if I'm missing something obvious here.

Venryx commented Jan 21, 2022

I am not very familiar with the rust-postgres codebase (nor Rust in general -- I have just started using it), but from what I observed while integrating the branch by @petrosagg, postgres sends out "keepalive" messages every 20 seconds or so.

Interestingly, this doc page (under "Primary keepalive message (B)") shows that the last bit in that message type's structure tells the client when postgres detects that the client hasn't communicated in a while and is nearing the point where it will close the connection.

So what I have my code do at the moment is just wait for a keepalive message with that "timeout imminent" flag, and then immediately send a response to the server saying "I am still connected". Note that postgres is actually pretty lenient; it gives another 10 or 15 seconds before it actually closes the connection, which is enough time for this reactive approach to be reliable (during the several hours I've tested it, anyway).

From what I can tell, this "notify postgres we're still online" feature is not present in this pull request's code -- so I guess that's something @petrosagg thinks can be handled by higher-level modules/crates. (It might be worth having at least a warning about this behavior in the base rust-postgres library, however: e.g. if it detects a disconnect and can see that it was due to the client not sending any responses during the timeout period, it could log a warning about it.)

Contributor

@Venryx unfortunately it is not enough to wait for postgres to send you a keepalive with that bit set; you actually need to proactively send them in a timely manner. This is exactly what we do in materialize.

Check out this code https://github.com/MaterializeInc/materialize/blob/main/src/dataflow/src/source/postgres.rs#L303-L329

and this thread https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com
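The proactive timing described above can be sketched with a small deadline tracker (the 10-second interval is an assumption matching pg_recvlogical's default --status-interval; actually sending the standby status update is left to whatever the replication API exposes):

```rust
use std::time::{Duration, Instant};

// Sketch of proactive standby-status timing; not code from this PR.
struct StandbyStatusTimer {
    interval: Duration,
    last_sent: Instant,
}

impl StandbyStatusTimer {
    fn new(interval: Duration) -> Self {
        StandbyStatusTimer { interval, last_sent: Instant::now() }
    }

    // True when an update is due. Note this does not wait for the
    // reply-requested bit in a primary keepalive message: updates are
    // sent on a timer regardless of what the server asks for.
    fn due(&self, now: Instant) -> bool {
        now.duration_since(self.last_sent) >= self.interval
    }

    fn mark_sent(&mut self, now: Instant) {
        self.last_sent = now;
    }
}

fn main() {
    let start = Instant::now();
    let mut timer = StandbyStatusTimer::new(Duration::from_secs(10));
    timer.mark_sent(start);
    assert!(!timer.due(start + Duration::from_secs(5)));
    assert!(timer.due(start + Duration::from_secs(10)));
}
```

In a real client this check would run alongside the message stream (e.g. in a select loop), so a status update goes out on schedule even while waiting for copy data, as the materialize code linked above does.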


> @Venryx unfortunately it is not enough to wait for postgres to send you a keepalive with that bit set; you actually need to proactively send them in a timely manner. This is exactly what we do in materialize.

Ah, interesting; thank you for mentioning it. (At my project's current scale, I think it will be a long time before transactions are large enough for queuing to cause problems with the keepalive responder; but it's still a good thing to fix, to avoid causing issues for other developers [or myself] down the road.)

guerinoni
What is the current status? I'm interested in this for switching from libpq :) and am looking for something with pgoutput.

azlev commented Jul 22, 2023

Hello. Can we revive this PR? Maybe split it into smaller parts so that other people like me can get involved?

7 participants