From f49d821c185d111c37c32cb6e155ffdacdca7512 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Wed, 9 Apr 2025 16:26:27 +0300 Subject: [PATCH] tokio-postgres: use tokio mpsc for `CopyBoth` streams The tokio implementation of mpsc channels seems to be more sophisticated and performant than the one in futures_channel. Since the channels used by the CopyBoth streams transfer a large amount of data during replication this PR switches the implementation to tokio. This PR doesn't yet alter the size of the channels in order to do an apples to apples comparison. A subsequent test should be performed with larger buffers between the producer and consumer tasks. Signed-off-by: Petros Angelatos --- tokio-postgres/src/client.rs | 8 +++--- tokio-postgres/src/copy_both.rs | 45 +++++++++++++++------------------ 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/tokio-postgres/src/client.rs b/tokio-postgres/src/client.rs index eab65d30a..2d70734a3 100644 --- a/tokio-postgres/src/client.rs +++ b/tokio-postgres/src/client.rs @@ -44,8 +44,8 @@ pub struct Responses { } pub struct CopyBothHandles { - pub(crate) stream_receiver: mpsc::Receiver>, - pub(crate) sink_sender: mpsc::Sender, + pub(crate) stream_receiver: tokio::sync::mpsc::Receiver>, + pub(crate) sink_sender: tokio::sync::mpsc::Sender, } impl Responses { @@ -124,8 +124,8 @@ impl InnerClient { pub fn start_copy_both(&self) -> Result { let (sender, receiver) = mpsc::channel(16); - let (stream_sender, stream_receiver) = mpsc::channel(16); - let (sink_sender, sink_receiver) = mpsc::channel(16); + let (stream_sender, stream_receiver) = tokio::sync::mpsc::channel(16); + let (sink_sender, sink_receiver) = tokio::sync::mpsc::channel(16); let responses = Responses { receiver, diff --git a/tokio-postgres/src/copy_both.rs b/tokio-postgres/src/copy_both.rs index e8c2103b0..ee110bfe0 100644 --- a/tokio-postgres/src/copy_both.rs +++ b/tokio-postgres/src/copy_both.rs @@ -2,7 +2,6 @@ use crate::client::{InnerClient, Responses}; use crate::codec::FrontendMessage; use crate::{simple_query, Error}; use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures_channel::mpsc; use futures_util::{ready, Sink, SinkExt, Stream, StreamExt}; use log::debug; use pin_project_lite::pin_project; @@ -12,6 +11,8 @@ use postgres_protocol::message::frontend::CopyData; use std::marker::{PhantomData, PhantomPinned}; use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::sync::mpsc; +use tokio_util::sync::PollSender; /// The state machine of CopyBothReceiver /// @@ -70,7 +71,7 @@ pub struct CopyBothReceiver { /// Receiver of frontend messages sent by the user using sink_receiver: mpsc::Receiver, /// Sender of CopyData contents to be consumed by the user using - stream_sender: mpsc::Sender>, + stream_sender: PollSender>, /// The current state of the subprotocol state: CopyBothState, /// Holds a buffered message until we are ready to send it to the user's stream @@ -86,7 +87,7 @@ impl CopyBothReceiver { CopyBothReceiver { responses, sink_receiver, - stream_sender, + stream_sender: PollSender::new(stream_sender), state: CopyBothState::Setup, buffered_message: None, } @@ -108,10 +109,10 @@ impl CopyBothReceiver { // Deliver the buffered message (if any) to the user to ensure we can potentially // buffer a new one in response to a server message if let Some(message) = self.buffered_message.take() { - match self.stream_sender.poll_ready(cx) { + match self.stream_sender.poll_ready_unpin(cx) { Poll::Ready(_) => { // If the receiver has hung up we'll just drop the message - let _ = self.stream_sender.start_send(message); + let _ = self.stream_sender.start_send_unpin(message); } Poll::Pending => { // Stash the message and try again later @@ -147,7 +148,7 @@ impl CopyBothReceiver { match self.state { CopyNone => self.state = CopyComplete, CopyComplete => { - self.stream_sender.close_channel(); + self.stream_sender.close(); self.sink_receiver.close(); self.state = CommandComplete; } @@ -168,7 +169,7 @@ impl CopyBothReceiver { Some(Ok(Message::ReadyForQuery(_))) => match self.state { CommandComplete => { self.sink_receiver.close(); - self.stream_sender.close_channel(); + self.stream_sender.close(); } _ => self.unexpected_message(), }, @@ -190,7 +191,7 @@ impl Stream for CopyBothReceiver { match self.poll_backend(cx) { Poll::Ready(()) => Poll::Ready(None), Poll::Pending => match self.state { - Setup | CopyBoth | CopyIn => match ready!(self.sink_receiver.poll_next_unpin(cx)) { + Setup | CopyBoth | CopyIn => match ready!(self.sink_receiver.poll_recv(cx)) { Some(msg) => Poll::Ready(Some(msg)), None => match self.state { // The user has cancelled their interest to this CopyBoth query but we're @@ -252,9 +253,7 @@ pin_project! { /// } /// ``` pub struct CopyBothDuplex { - #[pin] - sink_sender: mpsc::Sender, - #[pin] + sink_sender: PollSender, stream_receiver: mpsc::Receiver>, buf: BytesMut, #[pin] @@ -267,7 +266,7 @@ impl Stream for CopyBothDuplex { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(self.project().stream_receiver.poll_next(cx)) { + Poll::Ready(match ready!(self.project().stream_receiver.poll_recv(cx)) { Some(Ok(Message::CopyData(body))) => Some(Ok(body.into_bytes())), Some(Ok(_)) => Some(Err(Error::unexpected_message())), Some(Err(err)) => Some(Err(err)), @@ -285,7 +284,7 @@ where fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { self.project() .sink_sender - .poll_ready(cx) + .poll_ready_unpin(cx) .map_err(|_| Error::closed()) } @@ -309,30 +308,28 @@ where let data = CopyData::new(data).map_err(Error::encode)?; this.sink_sender - .start_send(FrontendMessage::CopyData(data)) + .start_send_unpin(FrontendMessage::CopyData(data)) .map_err(|_| Error::closed()) } fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); + let this = self.project(); if !this.buf.is_empty() { - ready!(this.sink_sender.as_mut().poll_ready(cx)).map_err(|_| Error::closed())?; + ready!(this.sink_sender.poll_ready_unpin(cx)).map_err(|_| Error::closed())?; let data: Box = Box::new(this.buf.split().freeze()); let data = CopyData::new(data).map_err(Error::encode)?; this.sink_sender - .as_mut() - .start_send(FrontendMessage::CopyData(data)) + .start_send_unpin(FrontendMessage::CopyData(data)) .map_err(|_| Error::closed())?; } - - this.sink_sender.poll_flush(cx).map_err(|_| Error::closed()) + Poll::Ready(Ok(())) } fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { ready!(self.as_mut().poll_flush(cx))?; - let mut this = self.as_mut().project(); - this.sink_sender.disconnect(); + let this = self.as_mut().project(); + this.sink_sender.close(); Poll::Ready(Ok(())) } } @@ -356,14 +353,14 @@ where .await .map_err(|_| Error::closed())?; - match handles.stream_receiver.next().await.transpose()? { + match handles.stream_receiver.recv().await.transpose()? { Some(Message::CopyBothResponse(_)) => {} _ => return Err(Error::unexpected_message()), } Ok(CopyBothDuplex { stream_receiver: handles.stream_receiver, - sink_sender: handles.sink_sender, + sink_sender: PollSender::new(handles.sink_sender), buf: BytesMut::new(), _p: PhantomPinned, _p2: PhantomData,