Skip to content

Commit fea178c

Browse files
committed
add simple query versions of copy operations
Signed-off-by: Petros Angelatos <[email protected]>
1 parent 7c5b43f commit fea178c

File tree

3 files changed

+60
-16
lines changed

3 files changed

+60
-16
lines changed

tokio-postgres/src/client.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -424,6 +424,14 @@ impl Client {
424424
copy_in::copy_in(self.inner(), statement).await
425425
}
426426

427+
/// Executes a `COPY FROM STDIN` query, returning a sink used to write the copy data.
428+
pub async fn copy_in_simple<U>(&self, query: &str) -> Result<CopyInSink<U>, Error>
429+
where
430+
U: Buf + 'static + Send,
431+
{
432+
copy_in::copy_in_simple(self.inner(), query).await
433+
}
434+
427435
/// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
428436
///
429437
/// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
@@ -439,6 +447,11 @@ impl Client {
439447
copy_out::copy_out(self.inner(), statement).await
440448
}
441449

450+
/// Executes a `COPY TO STDOUT` query, returning a stream of the resulting data.
451+
pub async fn copy_out_simple(&self, query: &str) -> Result<CopyOutStream, Error> {
452+
copy_out::copy_out_simple(self.inner(), query).await
453+
}
454+
442455
/// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
443456
/// data.
444457
pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>

tokio-postgres/src/copy_in.rs

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::{query, slice_iter, Error, Statement};
5-
use bytes::{Buf, BufMut, BytesMut};
4+
use crate::{query, simple_query, slice_iter, Error, Statement};
5+
use bytes::{Buf, BufMut, Bytes, BytesMut};
66
use futures::channel::mpsc;
77
use futures::future;
88
use futures::{ready, Sink, SinkExt, Stream, StreamExt};
@@ -195,14 +195,10 @@ where
195195
}
196196
}
197197

198-
pub async fn copy_in<T>(client: &InnerClient, statement: Statement) -> Result<CopyInSink<T>, Error>
198+
async fn start<T>(client: &InnerClient, buf: Bytes, simple: bool) -> Result<CopyInSink<T>, Error>
199199
where
200200
T: Buf + 'static + Send,
201201
{
202-
debug!("executing copy in statement {}", statement.name());
203-
204-
let buf = query::encode(client, &statement, slice_iter(&[]))?;
205-
206202
let (mut sender, receiver) = mpsc::channel(1);
207203
let receiver = CopyInReceiver::new(receiver);
208204
let mut responses = client.send(RequestMessages::CopyIn(receiver))?;
@@ -212,9 +208,11 @@ where
212208
.await
213209
.map_err(|_| Error::closed())?;
214210

215-
match responses.next().await? {
216-
Message::BindComplete => {}
217-
_ => return Err(Error::unexpected_message()),
211+
if !simple {
212+
match responses.next().await? {
213+
Message::BindComplete => {}
214+
_ => return Err(Error::unexpected_message()),
215+
}
218216
}
219217

220218
match responses.next().await? {
@@ -231,3 +229,23 @@ where
231229
_p2: PhantomData,
232230
})
233231
}
232+
233+
pub async fn copy_in<T>(client: &InnerClient, statement: Statement) -> Result<CopyInSink<T>, Error>
234+
where
235+
T: Buf + 'static + Send,
236+
{
237+
debug!("executing copy in statement {}", statement.name());
238+
239+
let buf = query::encode(client, &statement, slice_iter(&[]))?;
240+
start(client, buf, false).await
241+
}
242+
243+
pub async fn copy_in_simple<T>(client: &InnerClient, query: &str) -> Result<CopyInSink<T>, Error>
244+
where
245+
T: Buf + 'static + Send,
246+
{
247+
debug!("executing copy in query {}", query);
248+
249+
let buf = simple_query::encode(client, query)?;
250+
start(client, buf, true).await
251+
}

tokio-postgres/src/copy_out.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::{query, slice_iter, Error, Statement};
4+
use crate::{query, simple_query, slice_iter, Error, Statement};
55
use bytes::Bytes;
66
use futures::{ready, Stream};
77
use log::debug;
@@ -11,23 +11,36 @@ use std::marker::PhantomPinned;
1111
use std::pin::Pin;
1212
use std::task::{Context, Poll};
1313

14+
pub async fn copy_out_simple(client: &InnerClient, query: &str) -> Result<CopyOutStream, Error> {
15+
debug!("executing copy out query {}", query);
16+
17+
let buf = simple_query::encode(client, query)?;
18+
let responses = start(client, buf, true).await?;
19+
Ok(CopyOutStream {
20+
responses,
21+
_p: PhantomPinned,
22+
})
23+
}
24+
1425
pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
1526
debug!("executing copy out statement {}", statement.name());
1627

1728
let buf = query::encode(client, &statement, slice_iter(&[]))?;
18-
let responses = start(client, buf).await?;
29+
let responses = start(client, buf, false).await?;
1930
Ok(CopyOutStream {
2031
responses,
2132
_p: PhantomPinned,
2233
})
2334
}
2435

25-
async fn start(client: &InnerClient, buf: Bytes) -> Result<Responses, Error> {
36+
async fn start(client: &InnerClient, buf: Bytes, simple: bool) -> Result<Responses, Error> {
2637
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
2738

28-
match responses.next().await? {
29-
Message::BindComplete => {}
30-
_ => return Err(Error::unexpected_message()),
39+
if !simple {
40+
match responses.next().await? {
41+
Message::BindComplete => {}
42+
_ => return Err(Error::unexpected_message()),
43+
}
3144
}
3245

3346
match responses.next().await? {

0 commit comments

Comments
 (0)