Skip to content

Commit be8c650

Browse files
committed
Add a new exec_simple_query method
This adds a new `exec_simple_query` method so we can make 'out of band' queries to servers that don't interfere with pools at all. In order to reuse startup code for making these simple queries, we need to set the stats (`Reporter`) optional, so using these simple queries wont interfere with stats.
1 parent 9a668e5 commit be8c650

File tree

1 file changed

+113
-5
lines changed

1 file changed

+113
-5
lines changed

src/server.rs

Lines changed: 113 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub struct Server {
6262
connected_at: chrono::naive::NaiveDateTime,
6363

6464
/// Reports various metrics, e.g. data sent & received.
65-
stats: Reporter,
65+
stats: Option<Reporter>,
6666

6767
/// Application name using the server at the moment.
6868
application_name: String,
@@ -82,7 +82,7 @@ impl Server {
8282
user: &User,
8383
database: &str,
8484
client_server_map: ClientServerMap,
85-
stats: Reporter,
85+
stats: Option<Reporter>,
8686
) -> Result<Server, Error> {
8787
let mut stream =
8888
match TcpStream::connect(&format!("{}:{}", &address.host, address.port)).await {
@@ -396,7 +396,10 @@ impl Server {
396396
/// Send messages to the server from the client.
397397
pub async fn send(&mut self, messages: &BytesMut) -> Result<(), Error> {
398398
self.mirror_send(messages);
399-
self.stats.data_sent(messages.len(), self.server_id);
399+
400+
if let Some(stats) = &self.stats {
401+
stats.data_sent(messages.len(), self.server_id);
402+
}
400403

401404
match write_all_half(&mut self.write, messages).await {
402405
Ok(_) => {
@@ -545,7 +548,9 @@ impl Server {
545548
let bytes = self.buffer.clone();
546549

547550
// Keep track of how much data we got from the server for stats.
548-
self.stats.data_received(bytes.len(), self.server_id);
551+
if let Some(stats) = &self.stats {
552+
stats.data_received(bytes.len(), self.server_id);
553+
}
549554

550555
// Clear the buffer for next query.
551556
self.buffer.clear();
@@ -700,6 +705,106 @@ impl Server {
700705
None => (),
701706
}
702707
}
708+
709+
// This is so we can execute out of band queries to the server.
710+
// The connection will be opened, the query executed and closed.
711+
pub async fn exec_simple_query(
712+
address: &Address,
713+
user: &User,
714+
query: &str,
715+
) -> Result<Vec<String>, Error> {
716+
let client_server_map: ClientServerMap = Arc::new(Mutex::new(HashMap::new()));
717+
718+
debug!("Connecting to server to obtain auth hashes.");
719+
let mut server = Server::startup(
720+
0,
721+
address,
722+
user,
723+
&address.database,
724+
client_server_map,
725+
None,
726+
Arc::new(RwLock::new(None)),
727+
)
728+
.await?;
729+
debug!("Connected!, sending query.");
730+
server.send(&simple_query(query)).await?;
731+
let mut message = server.recv().await?;
732+
733+
Ok(parse_query_message(&mut message).await?)
734+
}
735+
}
736+
737+
async fn parse_query_message(message: &mut BytesMut) -> Result<Vec<String>, Error> {
738+
let mut pair = Vec::<String>::new();
739+
match message::backend::Message::parse(message) {
740+
Ok(Some(message::backend::Message::RowDescription(_description))) => {}
741+
Ok(Some(message::backend::Message::ErrorResponse(err))) => {
742+
return Err(Error::ProtocolSyncError(format!(
743+
"Protocol error parsing response. Err: {:?}",
744+
err.fields()
745+
.iterator()
746+
.fold(String::default(), |acc, element| acc
747+
+ element.unwrap().value())
748+
)))
749+
}
750+
Ok(_) => {
751+
return Err(Error::ProtocolSyncError(
752+
"Protocol error, expected Row Description.".to_string(),
753+
))
754+
}
755+
Err(err) => {
756+
return Err(Error::ProtocolSyncError(format!(
757+
"Protocol error parsing response. Err: {:?}",
758+
err
759+
)))
760+
}
761+
}
762+
763+
while !message.is_empty() {
764+
match message::backend::Message::parse(message) {
765+
Ok(postgres_message) => {
766+
match postgres_message {
767+
Some(message::backend::Message::DataRow(data)) => {
768+
let buf = data.buffer();
769+
trace!("Data: {:?}", buf);
770+
771+
for item in data.ranges().iterator() {
772+
match item.as_ref() {
773+
Ok(range) => match range {
774+
Some(range) => {
775+
pair.push(String::from_utf8_lossy(&buf[range.clone()]).to_string());
776+
}
777+
None => return Err(Error::ProtocolSyncError(String::from(
778+
"Data expected while receiving query auth data, found nothing.",
779+
))),
780+
},
781+
Err(err) => {
782+
return Err(Error::ProtocolSyncError(format!(
783+
"Data error, err: {:?}",
784+
err
785+
)))
786+
}
787+
}
788+
}
789+
}
790+
Some(message::backend::Message::CommandComplete(_)) => {}
791+
Some(message::backend::Message::ReadyForQuery(_)) => {}
792+
_ => {
793+
return Err(Error::ProtocolSyncError(
794+
"Unexpected message while receiving auth query data.".to_string(),
795+
))
796+
}
797+
}
798+
}
799+
Err(err) => {
800+
return Err(Error::ProtocolSyncError(format!(
801+
"Parse error, err: {:?}",
802+
err
803+
)))
804+
}
805+
};
806+
}
807+
Ok(pair)
703808
}
704809

705810
impl Drop for Server {
@@ -708,7 +813,10 @@ impl Drop for Server {
708813
/// for a write.
709814
fn drop(&mut self) {
710815
self.mirror_disconnect();
711-
self.stats.server_disconnecting(self.server_id);
816+
817+
if let Some(stats) = &self.stats {
818+
stats.server_disconnecting(self.server_id);
819+
}
712820

713821
let mut bytes = BytesMut::with_capacity(4);
714822
bytes.put_u8(b'X');

0 commit comments

Comments
 (0)