Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit 1ada04a

Browse files
skyzhJulius de Bruijn
authored and
Julius de Bruijn
committed
add query_raw_txt for transaction (sfackler#20)
Signed-off-by: Alex Chi <[email protected]>
1 parent ace81c7 commit 1ada04a

File tree

4 files changed

+131
-83
lines changed

4 files changed

+131
-83
lines changed

tokio-postgres/src/client.rs

+4-81
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@ use crate::copy_both::CopyBothDuplex;
55
use crate::copy_out::CopyOutStream;
66
#[cfg(feature = "runtime")]
77
use crate::keepalive::KeepaliveConfig;
8-
use crate::prepare::get_type;
98
use crate::query::RowStream;
109
use crate::simple_query::SimpleQueryStream;
11-
use crate::statement::Column;
1210
#[cfg(feature = "runtime")]
1311
use crate::tls::MakeTlsConnect;
1412
use crate::tls::TlsConnect;
@@ -20,7 +18,7 @@ use crate::{
2018
CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
2119
TransactionBuilder,
2220
};
23-
use bytes::{Buf, BufMut, BytesMut};
21+
use bytes::{Buf, BytesMut};
2422
use fallible_iterator::FallibleIterator;
2523
use futures_channel::mpsc;
2624
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
@@ -376,86 +374,11 @@ impl Client {
376374
/// to save a roundtrip
377375
pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
378376
where
379-
S: AsRef<str>,
377+
S: AsRef<str> + Sync + Send,
380378
I: IntoIterator<Item = Option<S>>,
381-
I::IntoIter: ExactSizeIterator,
379+
I::IntoIter: ExactSizeIterator + Sync + Send,
382380
{
383-
let params = params.into_iter();
384-
let params_len = params.len();
385-
386-
let buf = self.inner.with_buf(|buf| {
387-
// Parse, anonymous portal
388-
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
389-
// Bind, pass params as text, retrieve as binary
390-
match frontend::bind(
391-
"", // empty string selects the unnamed portal
392-
"", // empty string selects the unnamed prepared statement
393-
std::iter::empty(), // all parameters use the default format (text)
394-
params,
395-
|param, buf| match param {
396-
Some(param) => {
397-
buf.put_slice(param.as_ref().as_bytes());
398-
Ok(postgres_protocol::IsNull::No)
399-
}
400-
None => Ok(postgres_protocol::IsNull::Yes),
401-
},
402-
Some(0), // all text
403-
buf,
404-
) {
405-
Ok(()) => Ok(()),
406-
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
407-
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
408-
}?;
409-
410-
// Describe portal to typecast results
411-
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
412-
// Execute
413-
frontend::execute("", 0, buf).map_err(Error::encode)?;
414-
// Sync
415-
frontend::sync(buf);
416-
417-
Ok(buf.split().freeze())
418-
})?;
419-
420-
let mut responses = self
421-
.inner
422-
.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
423-
424-
// now read the responses
425-
426-
match responses.next().await? {
427-
Message::ParseComplete => {}
428-
_ => return Err(Error::unexpected_message()),
429-
}
430-
match responses.next().await? {
431-
Message::BindComplete => {}
432-
_ => return Err(Error::unexpected_message()),
433-
}
434-
let row_description = match responses.next().await? {
435-
Message::RowDescription(body) => Some(body),
436-
Message::NoData => None,
437-
_ => return Err(Error::unexpected_message()),
438-
};
439-
440-
// construct statement object
441-
442-
let parameters = vec![Type::UNKNOWN; params_len];
443-
444-
let mut columns = vec![];
445-
if let Some(row_description) = row_description {
446-
let mut it = row_description.fields();
447-
while let Some(field) = it.next().map_err(Error::parse)? {
448-
// NB: for some types that function may send a query to the server. At least in
449-
// raw text mode we don't need that info and can skip this.
450-
let type_ = get_type(&self.inner, field.type_oid()).await?;
451-
let column = Column::new(field.name().to_string(), type_, field);
452-
columns.push(column);
453-
}
454-
}
455-
456-
let statement = Statement::new_text(&self.inner, "".to_owned(), parameters, columns);
457-
458-
Ok(RowStream::new(statement, responses))
381+
query::query_txt(&self.inner, query, params).await
459382
}
460383

461384
/// Executes a statement, returning the number of rows modified.

tokio-postgres/src/generic_client.rs

+25
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ pub trait GenericClient: private::Sealed {
5656
I: IntoIterator<Item = P> + Sync + Send,
5757
I::IntoIter: ExactSizeIterator;
5858

59+
/// Like `Client::query_raw_txt`.
60+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
61+
where
62+
S: AsRef<str> + Sync + Send,
63+
I: IntoIterator<Item = Option<S>> + Sync + Send,
64+
I::IntoIter: ExactSizeIterator + Sync + Send;
65+
5966
/// Like `Client::prepare`.
6067
async fn prepare(&self, query: &str) -> Result<Statement, Error>;
6168

@@ -136,6 +143,15 @@ impl GenericClient for Client {
136143
self.query_raw(statement, params).await
137144
}
138145

146+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
147+
where
148+
S: AsRef<str> + Sync + Send,
149+
I: IntoIterator<Item = Option<S>> + Sync + Send,
150+
I::IntoIter: ExactSizeIterator + Sync + Send,
151+
{
152+
self.query_raw_txt(query, params).await
153+
}
154+
139155
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
140156
self.prepare(query).await
141157
}
@@ -222,6 +238,15 @@ impl GenericClient for Transaction<'_> {
222238
self.query_raw(statement, params).await
223239
}
224240

241+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
242+
where
243+
S: AsRef<str> + Sync + Send,
244+
I: IntoIterator<Item = Option<S>> + Sync + Send,
245+
I::IntoIter: ExactSizeIterator + Sync + Send,
246+
{
247+
self.query_raw_txt(query, params).await
248+
}
249+
225250
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
226251
self.prepare(query).await
227252
}

tokio-postgres/src/query.rs

+92-2
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4+
use crate::prepare::get_type;
45
use crate::types::{BorrowToSql, IsNull};
5-
use crate::{Error, Portal, Row, Statement};
6-
use bytes::{Bytes, BytesMut};
6+
use crate::{Column, Error, Portal, Row, Statement};
7+
use bytes::{BufMut, Bytes, BytesMut};
8+
use fallible_iterator::FallibleIterator;
79
use futures_util::{ready, Stream};
810
use log::{debug, log_enabled, Level};
911
use pin_project_lite::pin_project;
1012
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
1113
use postgres_protocol::message::frontend;
14+
use postgres_types::Type;
1215
use std::fmt;
1316
use std::marker::PhantomPinned;
1417
use std::pin::Pin;
18+
use std::sync::Arc;
1519
use std::task::{Context, Poll};
1620

1721
struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);
@@ -58,6 +62,92 @@ where
5862
})
5963
}
6064

65+
pub async fn query_txt<S, I>(
66+
client: &Arc<InnerClient>,
67+
query: S,
68+
params: I,
69+
) -> Result<RowStream, Error>
70+
where
71+
S: AsRef<str> + Sync + Send,
72+
I: IntoIterator<Item = Option<S>>,
73+
I::IntoIter: ExactSizeIterator,
74+
{
75+
let params = params.into_iter();
76+
let params_len = params.len();
77+
78+
let buf = client.with_buf(|buf| {
79+
// Parse, anonymous portal
80+
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
81+
// Bind, pass params as text, retrieve as binary
82+
match frontend::bind(
83+
"", // empty string selects the unnamed portal
84+
"", // empty string selects the unnamed prepared statement
85+
std::iter::empty(), // all parameters use the default format (text)
86+
params,
87+
|param, buf| match param {
88+
Some(param) => {
89+
buf.put_slice(param.as_ref().as_bytes());
90+
Ok(postgres_protocol::IsNull::No)
91+
}
92+
None => Ok(postgres_protocol::IsNull::Yes),
93+
},
94+
Some(0), // all text
95+
buf,
96+
) {
97+
Ok(()) => Ok(()),
98+
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
99+
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
100+
}?;
101+
102+
// Describe portal to typecast results
103+
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
104+
// Execute
105+
frontend::execute("", 0, buf).map_err(Error::encode)?;
106+
// Sync
107+
frontend::sync(buf);
108+
109+
Ok(buf.split().freeze())
110+
})?;
111+
112+
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
113+
114+
// now read the responses
115+
116+
match responses.next().await? {
117+
Message::ParseComplete => {}
118+
_ => return Err(Error::unexpected_message()),
119+
}
120+
match responses.next().await? {
121+
Message::BindComplete => {}
122+
_ => return Err(Error::unexpected_message()),
123+
}
124+
let row_description = match responses.next().await? {
125+
Message::RowDescription(body) => Some(body),
126+
Message::NoData => None,
127+
_ => return Err(Error::unexpected_message()),
128+
};
129+
130+
// construct statement object
131+
132+
let parameters = vec![Type::UNKNOWN; params_len];
133+
134+
let mut columns = vec![];
135+
if let Some(row_description) = row_description {
136+
let mut it = row_description.fields();
137+
while let Some(field) = it.next().map_err(Error::parse)? {
138+
// NB: for some types that function may send a query to the server. At least in
139+
// raw text mode we don't need that info and can skip this.
140+
let type_ = get_type(client, field.type_oid()).await?;
141+
let column = Column::new(field.name().to_string(), type_, field);
142+
columns.push(column);
143+
}
144+
}
145+
146+
let statement = Statement::new_text(client, "".to_owned(), parameters, columns);
147+
148+
Ok(RowStream::new(statement, responses))
149+
}
150+
61151
pub async fn query_portal(
62152
client: &InnerClient,
63153
portal: &Portal,

tokio-postgres/src/transaction.rs

+10
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,16 @@ impl<'a> Transaction<'a> {
149149
self.client.query_raw(statement, params).await
150150
}
151151

152+
/// Like `Client::query_raw_txt`.
153+
pub async fn query_raw_txt<S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
154+
where
155+
S: AsRef<str> + Sync + Send,
156+
I: IntoIterator<Item = Option<S>>,
157+
I::IntoIter: ExactSizeIterator + Sync + Send,
158+
{
159+
self.client.query_raw_txt(query, params).await
160+
}
161+
152162
/// Like `Client::execute`.
153163
pub async fn execute<T>(
154164
&self,

0 commit comments

Comments
 (0)