Skip to content

Commit f5e018f

Browse files
committed
add copy_both_simple method
Signed-off-by: Petros Angelatos <[email protected]>
1 parent 8423d6d commit f5e018f

File tree

7 files changed

+534
-2
lines changed

7 files changed

+534
-2
lines changed

postgres-protocol/src/message/backend.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub const DATA_ROW_TAG: u8 = b'D';
2222
pub const ERROR_RESPONSE_TAG: u8 = b'E';
2323
pub const COPY_IN_RESPONSE_TAG: u8 = b'G';
2424
pub const COPY_OUT_RESPONSE_TAG: u8 = b'H';
25+
pub const COPY_BOTH_RESPONSE_TAG: u8 = b'W';
2526
pub const EMPTY_QUERY_RESPONSE_TAG: u8 = b'I';
2627
pub const BACKEND_KEY_DATA_TAG: u8 = b'K';
2728
pub const NO_DATA_TAG: u8 = b'n';
@@ -93,6 +94,7 @@ pub enum Message {
9394
CopyDone,
9495
CopyInResponse(CopyInResponseBody),
9596
CopyOutResponse(CopyOutResponseBody),
97+
CopyBothResponse(CopyBothResponseBody),
9698
DataRow(DataRowBody),
9799
EmptyQueryResponse,
98100
ErrorResponse(ErrorResponseBody),
@@ -190,6 +192,16 @@ impl Message {
190192
storage,
191193
})
192194
}
195+
COPY_BOTH_RESPONSE_TAG => {
196+
let format = buf.read_u8()?;
197+
let len = buf.read_u16::<BigEndian>()?;
198+
let storage = buf.read_all();
199+
Message::CopyBothResponse(CopyBothResponseBody {
200+
format,
201+
len,
202+
storage,
203+
})
204+
}
193205
EMPTY_QUERY_RESPONSE_TAG => Message::EmptyQueryResponse,
194206
BACKEND_KEY_DATA_TAG => {
195207
let process_id = buf.read_i32::<BigEndian>()?;
@@ -524,6 +536,27 @@ impl CopyOutResponseBody {
524536
}
525537
}
526538

539+
pub struct CopyBothResponseBody {
540+
format: u8,
541+
len: u16,
542+
storage: Bytes,
543+
}
544+
545+
impl CopyBothResponseBody {
546+
#[inline]
547+
pub fn format(&self) -> u8 {
548+
self.format
549+
}
550+
551+
#[inline]
552+
pub fn column_formats(&self) -> ColumnFormats<'_> {
553+
ColumnFormats {
554+
remaining: self.len,
555+
buf: &self.storage,
556+
}
557+
}
558+
}
559+
527560
pub struct DataRowBody {
528561
storage: Bytes,
529562
len: u16,

tokio-postgres/src/client.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::codec::BackendMessages;
22
use crate::config::{Host, SslMode};
33
use crate::connection::{Request, RequestMessages};
4+
use crate::copy_both::CopyBothDuplex;
45
use crate::copy_out::CopyOutStream;
56
use crate::query::RowStream;
67
use crate::simple_query::SimpleQueryStream;
@@ -11,8 +12,9 @@ use crate::types::{Oid, ToSql, Type};
1112
#[cfg(feature = "runtime")]
1213
use crate::Socket;
1314
use crate::{
14-
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
15-
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
15+
copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
16+
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
17+
TransactionBuilder,
1618
};
1719
use bytes::{Buf, BytesMut};
1820
use fallible_iterator::FallibleIterator;
@@ -461,6 +463,15 @@ impl Client {
461463
copy_out::copy_out(self.inner(), statement).await
462464
}
463465

466+
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
467+
/// data.
468+
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
469+
where
470+
T: Buf + 'static + Send,
471+
{
472+
copy_both::copy_both_simple(self.inner(), query).await
473+
}
474+
464475
/// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
465476
///
466477
/// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that

tokio-postgres/src/connection.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec};
2+
use crate::copy_both::CopyBothReceiver;
23
use crate::copy_in::CopyInReceiver;
34
use crate::error::DbError;
45
use crate::maybe_tls_stream::MaybeTlsStream;
@@ -21,6 +22,7 @@ use tokio_util::codec::Framed;
2122
pub enum RequestMessages {
2223
Single(FrontendMessage),
2324
CopyIn(CopyInReceiver),
25+
CopyBoth(CopyBothReceiver),
2426
}
2527

2628
pub struct Request {
@@ -259,6 +261,24 @@ where
259261
.map_err(Error::io)?;
260262
self.pending_request = Some(RequestMessages::CopyIn(receiver));
261263
}
264+
RequestMessages::CopyBoth(mut receiver) => {
265+
let message = match receiver.poll_next_unpin(cx) {
266+
Poll::Ready(Some(message)) => message,
267+
Poll::Ready(None) => {
268+
trace!("poll_write: finished copy_both request");
269+
continue;
270+
}
271+
Poll::Pending => {
272+
trace!("poll_write: waiting on copy_both stream");
273+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
274+
return Ok(true);
275+
}
276+
};
277+
Pin::new(&mut self.stream)
278+
.start_send(message)
279+
.map_err(Error::io)?;
280+
self.pending_request = Some(RequestMessages::CopyBoth(receiver));
281+
}
262282
}
263283
}
264284
}

0 commit comments

Comments
 (0)