Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit 954e9eb

Browse files
skyzhJakub Wieczorek
authored and
Jakub Wieczorek
committed
add query_raw_txt for transaction (sfackler#20)
Signed-off-by: Alex Chi <[email protected]>
1 parent b4c6c19 commit 954e9eb

File tree

4 files changed

+131
-83
lines changed

4 files changed

+131
-83
lines changed

tokio-postgres/src/client.rs

+4-81
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@ use crate::connection::{Request, RequestMessages};
44
use crate::copy_out::CopyOutStream;
55
#[cfg(feature = "runtime")]
66
use crate::keepalive::KeepaliveConfig;
7-
use crate::prepare::get_type;
87
use crate::query::RowStream;
98
use crate::simple_query::SimpleQueryStream;
10-
use crate::statement::Column;
119
#[cfg(feature = "runtime")]
1210
use crate::tls::MakeTlsConnect;
1311
use crate::tls::TlsConnect;
@@ -18,7 +16,7 @@ use crate::{
1816
copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken, CopyInSink, Error,
1917
Row, SimpleQueryMessage, Statement, ToStatement, Transaction, TransactionBuilder,
2018
};
21-
use bytes::{Buf, BufMut, BytesMut};
19+
use bytes::{Buf, BytesMut};
2220
use fallible_iterator::FallibleIterator;
2321
use futures_channel::mpsc;
2422
use futures_util::{future, pin_mut, ready, StreamExt, TryStreamExt};
@@ -374,86 +372,11 @@ impl Client {
374372
/// to save a roundtrip
375373
pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
376374
where
377-
S: AsRef<str>,
375+
S: AsRef<str> + Sync + Send,
378376
I: IntoIterator<Item = Option<S>>,
379-
I::IntoIter: ExactSizeIterator,
377+
I::IntoIter: ExactSizeIterator + Sync + Send,
380378
{
381-
let params = params.into_iter();
382-
let params_len = params.len();
383-
384-
let buf = self.inner.with_buf(|buf| {
385-
// Parse, anonymous portal
386-
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
387-
// Bind, pass params as text, retrieve as binary
388-
match frontend::bind(
389-
"", // empty string selects the unnamed portal
390-
"", // empty string selects the unnamed prepared statement
391-
std::iter::empty(), // all parameters use the default format (text)
392-
params,
393-
|param, buf| match param {
394-
Some(param) => {
395-
buf.put_slice(param.as_ref().as_bytes());
396-
Ok(postgres_protocol::IsNull::No)
397-
}
398-
None => Ok(postgres_protocol::IsNull::Yes),
399-
},
400-
Some(0), // all text
401-
buf,
402-
) {
403-
Ok(()) => Ok(()),
404-
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
405-
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
406-
}?;
407-
408-
// Describe portal to typecast results
409-
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
410-
// Execute
411-
frontend::execute("", 0, buf).map_err(Error::encode)?;
412-
// Sync
413-
frontend::sync(buf);
414-
415-
Ok(buf.split().freeze())
416-
})?;
417-
418-
let mut responses = self
419-
.inner
420-
.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
421-
422-
// now read the responses
423-
424-
match responses.next().await? {
425-
Message::ParseComplete => {}
426-
_ => return Err(Error::unexpected_message()),
427-
}
428-
match responses.next().await? {
429-
Message::BindComplete => {}
430-
_ => return Err(Error::unexpected_message()),
431-
}
432-
let row_description = match responses.next().await? {
433-
Message::RowDescription(body) => Some(body),
434-
Message::NoData => None,
435-
_ => return Err(Error::unexpected_message()),
436-
};
437-
438-
// construct statement object
439-
440-
let parameters = vec![Type::UNKNOWN; params_len];
441-
442-
let mut columns = vec![];
443-
if let Some(row_description) = row_description {
444-
let mut it = row_description.fields();
445-
while let Some(field) = it.next().map_err(Error::parse)? {
446-
// NB: for some types that function may send a query to the server. At least in
447-
// raw text mode we don't need that info and can skip this.
448-
let type_ = get_type(&self.inner, field.type_oid()).await?;
449-
let column = Column::new(field.name().to_string(), type_, field);
450-
columns.push(column);
451-
}
452-
}
453-
454-
let statement = Statement::new_text(&self.inner, "".to_owned(), parameters, columns);
455-
456-
Ok(RowStream::new(statement, responses))
379+
query::query_txt(&self.inner, query, params).await
457380
}
458381

459382
/// Executes a statement, returning the number of rows modified.

tokio-postgres/src/generic_client.rs

+25
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ pub trait GenericClient: private::Sealed {
5656
I: IntoIterator<Item = P> + Sync + Send,
5757
I::IntoIter: ExactSizeIterator;
5858

59+
/// Like `Client::query_raw_txt`.
60+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
61+
where
62+
S: AsRef<str> + Sync + Send,
63+
I: IntoIterator<Item = Option<S>> + Sync + Send,
64+
I::IntoIter: ExactSizeIterator + Sync + Send;
65+
5966
/// Like `Client::prepare`.
6067
async fn prepare(&self, query: &str) -> Result<Statement, Error>;
6168

@@ -136,6 +143,15 @@ impl GenericClient for Client {
136143
self.query_raw(statement, params).await
137144
}
138145

146+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
147+
where
148+
S: AsRef<str> + Sync + Send,
149+
I: IntoIterator<Item = Option<S>> + Sync + Send,
150+
I::IntoIter: ExactSizeIterator + Sync + Send,
151+
{
152+
self.query_raw_txt(query, params).await
153+
}
154+
139155
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
140156
self.prepare(query).await
141157
}
@@ -222,6 +238,15 @@ impl GenericClient for Transaction<'_> {
222238
self.query_raw(statement, params).await
223239
}
224240

241+
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
242+
where
243+
S: AsRef<str> + Sync + Send,
244+
I: IntoIterator<Item = Option<S>> + Sync + Send,
245+
I::IntoIter: ExactSizeIterator + Sync + Send,
246+
{
247+
self.query_raw_txt(query, params).await
248+
}
249+
225250
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
226251
self.prepare(query).await
227252
}

tokio-postgres/src/query.rs

+92-2
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4+
use crate::prepare::get_type;
45
use crate::types::{BorrowToSql, IsNull};
5-
use crate::{Error, Portal, Row, Statement};
6-
use bytes::{Bytes, BytesMut};
6+
use crate::{Column, Error, Portal, Row, Statement};
7+
use bytes::{BufMut, Bytes, BytesMut};
8+
use fallible_iterator::FallibleIterator;
79
use futures_util::{ready, Stream};
810
use log::{debug, log_enabled, Level};
911
use pin_project_lite::pin_project;
1012
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
1113
use postgres_protocol::message::frontend;
14+
use postgres_types::Type;
1215
use std::fmt;
1316
use std::marker::PhantomPinned;
1417
use std::pin::Pin;
18+
use std::sync::Arc;
1519
use std::task::{Context, Poll};
1620

1721
struct BorrowToSqlParamsDebug<'a, T>(&'a [T]);
@@ -58,6 +62,92 @@ where
5862
})
5963
}
6064

65+
pub async fn query_txt<S, I>(
66+
client: &Arc<InnerClient>,
67+
query: S,
68+
params: I,
69+
) -> Result<RowStream, Error>
70+
where
71+
S: AsRef<str> + Sync + Send,
72+
I: IntoIterator<Item = Option<S>>,
73+
I::IntoIter: ExactSizeIterator,
74+
{
75+
let params = params.into_iter();
76+
let params_len = params.len();
77+
78+
let buf = client.with_buf(|buf| {
79+
// Parse, anonymous portal
80+
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
81+
// Bind, pass params as text, retrieve as binary
82+
match frontend::bind(
83+
"", // empty string selects the unnamed portal
84+
"", // empty string selects the unnamed prepared statement
85+
std::iter::empty(), // all parameters use the default format (text)
86+
params,
87+
|param, buf| match param {
88+
Some(param) => {
89+
buf.put_slice(param.as_ref().as_bytes());
90+
Ok(postgres_protocol::IsNull::No)
91+
}
92+
None => Ok(postgres_protocol::IsNull::Yes),
93+
},
94+
Some(0), // all text
95+
buf,
96+
) {
97+
Ok(()) => Ok(()),
98+
Err(frontend::BindError::Conversion(e)) => Err(Error::to_sql(e, 0)),
99+
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
100+
}?;
101+
102+
// Describe portal to typecast results
103+
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
104+
// Execute
105+
frontend::execute("", 0, buf).map_err(Error::encode)?;
106+
// Sync
107+
frontend::sync(buf);
108+
109+
Ok(buf.split().freeze())
110+
})?;
111+
112+
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
113+
114+
// now read the responses
115+
116+
match responses.next().await? {
117+
Message::ParseComplete => {}
118+
_ => return Err(Error::unexpected_message()),
119+
}
120+
match responses.next().await? {
121+
Message::BindComplete => {}
122+
_ => return Err(Error::unexpected_message()),
123+
}
124+
let row_description = match responses.next().await? {
125+
Message::RowDescription(body) => Some(body),
126+
Message::NoData => None,
127+
_ => return Err(Error::unexpected_message()),
128+
};
129+
130+
// construct statement object
131+
132+
let parameters = vec![Type::UNKNOWN; params_len];
133+
134+
let mut columns = vec![];
135+
if let Some(row_description) = row_description {
136+
let mut it = row_description.fields();
137+
while let Some(field) = it.next().map_err(Error::parse)? {
138+
// NB: for some types that function may send a query to the server. At least in
139+
// raw text mode we don't need that info and can skip this.
140+
let type_ = get_type(client, field.type_oid()).await?;
141+
let column = Column::new(field.name().to_string(), type_, field);
142+
columns.push(column);
143+
}
144+
}
145+
146+
let statement = Statement::new_text(client, "".to_owned(), parameters, columns);
147+
148+
Ok(RowStream::new(statement, responses))
149+
}
150+
61151
pub async fn query_portal(
62152
client: &InnerClient,
63153
portal: &Portal,

tokio-postgres/src/transaction.rs

+10
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,16 @@ impl<'a> Transaction<'a> {
149149
self.client.query_raw(statement, params).await
150150
}
151151

152+
/// Like `Client::query_raw_txt`.
153+
pub async fn query_raw_txt<S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
154+
where
155+
S: AsRef<str> + Sync + Send,
156+
I: IntoIterator<Item = Option<S>>,
157+
I::IntoIter: ExactSizeIterator + Sync + Send,
158+
{
159+
self.client.query_raw_txt(query, params).await
160+
}
161+
152162
/// Like `Client::execute`.
153163
pub async fn execute<T>(
154164
&self,

0 commit comments

Comments
 (0)