Skip to content
This repository was archived by the owner on Jan 14, 2025. It is now read-only.

Commit 3491360

Browse files
conradludgateJulius de Bruijn
authored and
Julius de Bruijn
committed
Connection changes (sfackler#21)
* refactor query_raw_txt to use a pre-prepared statement * expose ready_status on RowStream
1 parent 1ada04a commit 3491360

File tree

8 files changed

+104
-105
lines changed

8 files changed

+104
-105
lines changed

.github/workflows/ci.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ jobs:
5656
- uses: actions/checkout@v3
5757
- uses: sfackler/actions/rustup@master
5858
with:
59-
version: 1.65.0
59+
version: 1.67.0
6060
- run: echo "::set-output name=version::$(rustc --version)"
6161
id: rust-version
6262
- run: rustup target add wasm32-unknown-unknown

tokio-postgres/src/client.rs

+10-4
Original file line numberDiff line numberDiff line change
@@ -372,13 +372,19 @@ impl Client {
372372

373373
/// Pass text directly to the Postgres backend to allow it to sort out typing itself and
374374
/// to save a roundtrip
375-
pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
375+
pub async fn query_raw_txt<'a, T, S, I>(
376+
&self,
377+
statement: &T,
378+
params: I,
379+
) -> Result<RowStream, Error>
376380
where
377-
S: AsRef<str> + Sync + Send,
381+
T: ?Sized + ToStatement,
382+
S: AsRef<str>,
378383
I: IntoIterator<Item = Option<S>>,
379-
I::IntoIter: ExactSizeIterator + Sync + Send,
384+
I::IntoIter: ExactSizeIterator,
380385
{
381-
query::query_txt(&self.inner, query, params).await
386+
let statement = statement.__convert().into_statement(self).await?;
387+
query::query_txt(&self.inner, statement, params).await
382388
}
383389

384390
/// Executes a statement, returning the number of rows modified.

tokio-postgres/src/generic_client.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,13 @@ pub trait GenericClient: private::Sealed {
5757
I::IntoIter: ExactSizeIterator;
5858

5959
/// Like `Client::query_raw_txt`.
60-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
60+
async fn query_raw_txt<'a, T, S, I>(
61+
&self,
62+
statement: &T,
63+
params: I,
64+
) -> Result<RowStream, Error>
6165
where
66+
T: ?Sized + ToStatement + Sync + Send,
6267
S: AsRef<str> + Sync + Send,
6368
I: IntoIterator<Item = Option<S>> + Sync + Send,
6469
I::IntoIter: ExactSizeIterator + Sync + Send;
@@ -143,13 +148,14 @@ impl GenericClient for Client {
143148
self.query_raw(statement, params).await
144149
}
145150

146-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
151+
async fn query_raw_txt<'a, T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
147152
where
153+
T: ?Sized + ToStatement + Sync + Send,
148154
S: AsRef<str> + Sync + Send,
149155
I: IntoIterator<Item = Option<S>> + Sync + Send,
150156
I::IntoIter: ExactSizeIterator + Sync + Send,
151157
{
152-
self.query_raw_txt(query, params).await
158+
self.query_raw_txt(statement, params).await
153159
}
154160

155161
async fn prepare(&self, query: &str) -> Result<Statement, Error> {
@@ -238,13 +244,14 @@ impl GenericClient for Transaction<'_> {
238244
self.query_raw(statement, params).await
239245
}
240246

241-
async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
247+
async fn query_raw_txt<'a, T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
242248
where
249+
T: ?Sized + ToStatement + Sync + Send,
243250
S: AsRef<str> + Sync + Send,
244251
I: IntoIterator<Item = Option<S>> + Sync + Send,
245252
I::IntoIter: ExactSizeIterator + Sync + Send,
246253
{
247-
self.query_raw_txt(query, params).await
254+
self.query_raw_txt(statement, params).await
248255
}
249256

250257
async fn prepare(&self, query: &str) -> Result<Statement, Error> {

tokio-postgres/src/query.rs

+36-62
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
use crate::client::{InnerClient, Responses};
22
use crate::codec::FrontendMessage;
33
use crate::connection::RequestMessages;
4-
use crate::prepare::get_type;
54
use crate::types::{BorrowToSql, IsNull};
6-
use crate::{Column, Error, Portal, Row, Statement};
5+
use crate::{Error, Portal, Row, Statement};
76
use bytes::{BufMut, Bytes, BytesMut};
8-
use fallible_iterator::FallibleIterator;
97
use futures_util::{ready, Stream};
108
use log::{debug, log_enabled, Level};
119
use pin_project_lite::pin_project;
1210
use postgres_protocol::message::backend::{CommandCompleteBody, Message};
1311
use postgres_protocol::message::frontend;
14-
use postgres_types::Type;
12+
use postgres_types::Format;
1513
use std::fmt;
1614
use std::marker::PhantomPinned;
1715
use std::pin::Pin;
@@ -58,30 +56,29 @@ where
5856
responses,
5957
rows_affected: None,
6058
command_tag: None,
59+
status: None,
60+
output_format: Format::Binary,
6161
_p: PhantomPinned,
6262
})
6363
}
6464

6565
pub async fn query_txt<S, I>(
6666
client: &Arc<InnerClient>,
67-
query: S,
67+
statement: Statement,
6868
params: I,
6969
) -> Result<RowStream, Error>
7070
where
71-
S: AsRef<str> + Sync + Send,
71+
S: AsRef<str>,
7272
I: IntoIterator<Item = Option<S>>,
7373
I::IntoIter: ExactSizeIterator,
7474
{
7575
let params = params.into_iter();
76-
let params_len = params.len();
7776

7877
let buf = client.with_buf(|buf| {
79-
// Parse, anonymous portal
80-
frontend::parse("", query.as_ref(), std::iter::empty(), buf).map_err(Error::encode)?;
8178
// Bind, pass params as text, retrieve as binary
8279
match frontend::bind(
8380
"", // empty string selects the unnamed portal
84-
"", // empty string selects the unnamed prepared statement
81+
statement.name(), // named prepared statement
8582
std::iter::empty(), // all parameters use the default format (text)
8683
params,
8784
|param, buf| match param {
@@ -99,8 +96,6 @@ where
9996
Err(frontend::BindError::Serialization(e)) => Err(Error::encode(e)),
10097
}?;
10198

102-
// Describe portal to typecast results
103-
frontend::describe(b'P', "", buf).map_err(Error::encode)?;
10499
// Execute
105100
frontend::execute("", 0, buf).map_err(Error::encode)?;
106101
// Sync
@@ -109,43 +104,16 @@ where
109104
Ok(buf.split().freeze())
110105
})?;
111106

112-
let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
113-
114107
// now read the responses
115-
116-
match responses.next().await? {
117-
Message::ParseComplete => {}
118-
_ => return Err(Error::unexpected_message()),
119-
}
120-
match responses.next().await? {
121-
Message::BindComplete => {}
122-
_ => return Err(Error::unexpected_message()),
123-
}
124-
let row_description = match responses.next().await? {
125-
Message::RowDescription(body) => Some(body),
126-
Message::NoData => None,
127-
_ => return Err(Error::unexpected_message()),
128-
};
129-
130-
// construct statement object
131-
132-
let parameters = vec![Type::UNKNOWN; params_len];
133-
134-
let mut columns = vec![];
135-
if let Some(row_description) = row_description {
136-
let mut it = row_description.fields();
137-
while let Some(field) = it.next().map_err(Error::parse)? {
138-
// NB: for some types that function may send a query to the server. At least in
139-
// raw text mode we don't need that info and can skip this.
140-
let type_ = get_type(client, field.type_oid()).await?;
141-
let column = Column::new(field.name().to_string(), type_, field);
142-
columns.push(column);
143-
}
144-
}
145-
146-
let statement = Statement::new_text(client, "".to_owned(), parameters, columns);
147-
148-
Ok(RowStream::new(statement, responses))
108+
let responses = start(client, buf).await?;
109+
Ok(RowStream {
110+
statement,
111+
responses,
112+
command_tag: None,
113+
status: None,
114+
output_format: Format::Text,
115+
_p: PhantomPinned,
116+
})
149117
}
150118

151119
pub async fn query_portal(
@@ -166,6 +134,8 @@ pub async fn query_portal(
166134
responses,
167135
rows_affected: None,
168136
command_tag: None,
137+
status: None,
138+
output_format: Format::Binary,
169139
_p: PhantomPinned,
170140
})
171141
}
@@ -301,23 +271,13 @@ pin_project! {
301271
responses: Responses,
302272
rows_affected: Option<u64>,
303273
command_tag: Option<String>,
274+
output_format: Format,
275+
status: Option<u8>,
304276
#[pin]
305277
_p: PhantomPinned,
306278
}
307279
}
308280

309-
impl RowStream {
310-
/// Creates a new `RowStream`.
311-
pub fn new(statement: Statement, responses: Responses) -> Self {
312-
RowStream {
313-
statement,
314-
responses,
315-
command_tag: None,
316-
_p: PhantomPinned,
317-
}
318-
}
319-
}
320-
321281
impl Stream for RowStream {
322282
type Item = Result<Row, Error>;
323283

@@ -326,7 +286,11 @@ impl Stream for RowStream {
326286
loop {
327287
match ready!(this.responses.poll_next(cx)?) {
328288
Message::DataRow(body) => {
329-
return Poll::Ready(Some(Ok(Row::new(this.statement.clone(), body)?)))
289+
return Poll::Ready(Some(Ok(Row::new(
290+
this.statement.clone(),
291+
body,
292+
*this.output_format,
293+
)?)))
330294
}
331295
Message::CommandComplete(body) => {
332296
*this.rows_affected = Some(extract_row_affected(&body)?);
@@ -336,7 +300,10 @@ impl Stream for RowStream {
336300
}
337301
}
338302
Message::EmptyQueryResponse | Message::PortalSuspended => {}
339-
Message::ReadyForQuery(_) => return Poll::Ready(None),
303+
Message::ReadyForQuery(status) => {
304+
*this.status = Some(status.status());
305+
return Poll::Ready(None);
306+
}
340307
_ => return Poll::Ready(Some(Err(Error::unexpected_message()))),
341308
}
342309
}
@@ -357,4 +324,11 @@ impl RowStream {
357324
pub fn command_tag(&self) -> Option<String> {
358325
self.command_tag.clone()
359326
}
327+
328+
/// Returns if the connection is ready for querying, with the status of the connection.
329+
///
330+
/// This might be available only after the stream has been exhausted.
331+
pub fn ready_status(&self) -> Option<u8> {
332+
self.status
333+
}
360334
}

tokio-postgres/src/row.rs

+8-2
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ where
9898
/// A row of data returned from the database by a query.
9999
pub struct Row {
100100
statement: Statement,
101+
output_format: Format,
101102
body: DataRowBody,
102103
ranges: Vec<Option<Range<usize>>>,
103104
}
@@ -111,12 +112,17 @@ impl fmt::Debug for Row {
111112
}
112113

113114
impl Row {
114-
pub(crate) fn new(statement: Statement, body: DataRowBody) -> Result<Row, Error> {
115+
pub(crate) fn new(
116+
statement: Statement,
117+
body: DataRowBody,
118+
output_format: Format,
119+
) -> Result<Row, Error> {
115120
let ranges = body.ranges().collect().map_err(Error::parse)?;
116121
Ok(Row {
117122
statement,
118123
body,
119124
ranges,
125+
output_format,
120126
})
121127
}
122128

@@ -193,7 +199,7 @@ impl Row {
193199
///
194200
/// Useful when using query_raw_txt() which sets text transfer mode
195201
pub fn as_text(&self, idx: usize) -> Result<Option<&str>, Error> {
196-
if self.statement.output_format() == Format::Text {
202+
if self.output_format == Format::Text {
197203
match self.col_buffer(idx) {
198204
Some(raw) => {
199205
FromSql::from_sql(&Type::TEXT, raw).map_err(|e| Error::from_sql(e, idx))

tokio-postgres/src/statement.rs

-23
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use postgres_protocol::{
66
message::{backend::Field, frontend},
77
Oid,
88
};
9-
use postgres_types::Format;
109
use std::{
1110
fmt,
1211
sync::{Arc, Weak},
@@ -17,7 +16,6 @@ struct StatementInner {
1716
name: String,
1817
params: Vec<Type>,
1918
columns: Vec<Column>,
20-
output_format: Format,
2119
}
2220

2321
impl Drop for StatementInner {
@@ -51,22 +49,6 @@ impl Statement {
5149
name,
5250
params,
5351
columns,
54-
output_format: Format::Binary,
55-
}))
56-
}
57-
58-
pub(crate) fn new_text(
59-
inner: &Arc<InnerClient>,
60-
name: String,
61-
params: Vec<Type>,
62-
columns: Vec<Column>,
63-
) -> Statement {
64-
Statement(Arc::new(StatementInner {
65-
client: Arc::downgrade(inner),
66-
name,
67-
params,
68-
columns,
69-
output_format: Format::Text,
7052
}))
7153
}
7254

@@ -83,11 +65,6 @@ impl Statement {
8365
pub fn columns(&self) -> &[Column] {
8466
&self.0.columns
8567
}
86-
87-
/// Returns output format for the statement.
88-
pub fn output_format(&self) -> Format {
89-
self.0.output_format
90-
}
9168
}
9269

9370
/// Information about a column of a query.

tokio-postgres/src/transaction.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,14 @@ impl<'a> Transaction<'a> {
150150
}
151151

152152
/// Like `Client::query_raw_txt`.
153-
pub async fn query_raw_txt<S, I>(&self, query: S, params: I) -> Result<RowStream, Error>
153+
pub async fn query_raw_txt<T, S, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
154154
where
155-
S: AsRef<str> + Sync + Send,
155+
T: ?Sized + ToStatement,
156+
S: AsRef<str>,
156157
I: IntoIterator<Item = Option<S>>,
157-
I::IntoIter: ExactSizeIterator + Sync + Send,
158+
I::IntoIter: ExactSizeIterator,
158159
{
159-
self.client.query_raw_txt(query, params).await
160+
self.client.query_raw_txt(statement, params).await
160161
}
161162

162163
/// Like `Client::execute`.

0 commit comments

Comments
 (0)