Skip to content

Commit bc89955

Browse files
committed
Implement IDENTIFY_SYSTEM command.
1 parent 64eb4a3 commit bc89955

File tree

2 files changed

+110
-43
lines changed

2 files changed

+110
-43
lines changed

tokio-postgres/src/replication_client.rs

Lines changed: 109 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -7,70 +7,136 @@ use bytes::Bytes;
77
use fallible_iterator::FallibleIterator;
88
use futures::{ready, Stream};
99
use pin_project_lite::pin_project;
10-
use postgres_protocol::message::backend::{Field, Message, ReplicationMessage};
10+
use postgres_protocol::message::backend::{Message, ReplicationMessage};
1111
use std::io;
1212
use std::marker::PhantomPinned;
13-
use std::ops::Range;
1413
use std::pin::Pin;
14+
use std::str::from_utf8;
1515
use std::task::{Context, Poll};
1616

17+
#[derive(Debug)]
18+
pub struct IdentifySystem {
19+
systemid: String,
20+
timeline: u32,
21+
xlogpos: Lsn,
22+
dbname: String,
23+
}
24+
25+
impl IdentifySystem {
26+
pub fn systemid(&self) -> &str {
27+
&self.systemid
28+
}
29+
30+
pub fn timeline(&self) -> u32 {
31+
self.timeline
32+
}
33+
34+
pub fn xlogpos(&self) -> Lsn {
35+
self.xlogpos
36+
}
37+
38+
pub fn dbname(&self) -> &str {
39+
&self.dbname
40+
}
41+
}
42+
1743
/// Replication client connection.
1844
///
1945
/// A replication client is used to issue replication commands, begin
2046
/// streaming, and send status updates to the server.
2147
pub struct ReplicationClient(pub(crate) Client);
2248

2349
impl ReplicationClient {
50+
/// IDENTIFY_SYSTEM message
51+
pub async fn identify_system(&self) -> Result<IdentifySystem, Error> {
52+
let iclient = self.0.inner();
53+
let command = format!("IDENTIFY_SYSTEM");
54+
let buf = simple_query::encode(iclient, &command)?;
55+
let mut responses = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
56+
57+
let rowdesc = match responses.next().await? {
58+
Message::RowDescription(m) => m,
59+
_ => return Err(Error::unexpected_message()),
60+
};
61+
let datarow = match responses.next().await? {
62+
Message::DataRow(m) => m,
63+
_ => return Err(Error::unexpected_message()),
64+
};
65+
match responses.next().await? {
66+
Message::CommandComplete(_) => (),
67+
_ => return Err(Error::unexpected_message()),
68+
};
69+
match responses.next().await? {
70+
Message::ReadyForQuery(_) => (),
71+
_ => return Err(Error::unexpected_message()),
72+
};
73+
74+
let fields = rowdesc.fields().collect::<Vec<_>>().map_err(Error::parse)?;
75+
let ranges = datarow.ranges().collect::<Vec<_>>().map_err(Error::parse)?;
76+
77+
if fields.len() != 4
78+
|| fields[0].type_oid() != Type::TEXT.oid()
79+
|| fields[1].type_oid() != Type::INT4.oid()
80+
|| fields[2].type_oid() != Type::TEXT.oid()
81+
|| fields[3].type_oid() != Type::TEXT.oid()
82+
|| ranges.len() != 4
83+
{
84+
return Err(Error::parse(io::Error::new(
85+
io::ErrorKind::InvalidInput,
86+
"expected (text, int4, text, text) result",
87+
)));
88+
};
89+
90+
let str_values = ranges
91+
.iter()
92+
.map(|r| from_utf8(&datarow.buffer()[r.to_owned().unwrap()]).unwrap())
93+
.collect::<Vec<_>>();
94+
95+
Ok(IdentifySystem {
96+
systemid: String::from(str_values[0]),
97+
timeline: str_values[1].parse::<u32>().unwrap(),
98+
xlogpos: Lsn::from(str_values[2]),
99+
dbname: String::from(str_values[3]),
100+
})
101+
}
102+
24103
/// show the value of the given setting
25-
pub async fn show(&self, name: &str) -> Result<Vec<u8>, Error> {
104+
pub async fn show(&self, name: &str) -> Result<String, Error> {
26105
let iclient = self.0.inner();
27106
let command = format!("SHOW \"{}\"", name);
28107
let buf = simple_query::encode(iclient, &command)?;
29108
let mut responses = iclient.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
109+
110+
let rowdesc = match responses.next().await? {
111+
Message::RowDescription(m) => m,
112+
_ => return Err(Error::unexpected_message()),
113+
};
114+
let datarow = match responses.next().await? {
115+
Message::DataRow(m) => m,
116+
_ => return Err(Error::unexpected_message()),
117+
};
30118
match responses.next().await? {
31-
Message::RowDescription(rowdesc) => {
32-
let fields: Vec<Field<'_>> = rowdesc.fields().collect().map_err(Error::parse)?;
33-
if fields.len() != 1 {
34-
return Err(Error::parse(io::Error::new(
35-
io::ErrorKind::InvalidInput,
36-
"expected single column in response",
37-
)))
38-
}
39-
if fields[0].type_oid() != Type::TEXT.oid() {
40-
return Err(Error::parse(io::Error::new(
41-
io::ErrorKind::InvalidInput,
42-
"expected single text column in response",
43-
)))
44-
}
45-
}
119+
Message::CommandComplete(_) => (),
46120
_ => return Err(Error::unexpected_message()),
47121
};
48-
49122
match responses.next().await? {
50-
Message::DataRow(d) => {
51-
let ranges: Vec<Option<Range<usize>>> = d.ranges().collect().map_err(Error::parse)?;
52-
if ranges.len() != 1 {
53-
return Err(Error::parse(io::Error::new(
54-
io::ErrorKind::InvalidInput,
55-
"expected single column in response",
56-
)))
57-
}
58-
// fetch only column
59-
match &ranges[0] {
60-
Some(r) => Ok(Vec::from(&d.buffer()[r.to_owned()])),
61-
None => {
62-
Err(Error::parse(io::Error::new(
63-
io::ErrorKind::InvalidInput,
64-
"unexpected NULL setting in response",
65-
)))
66-
}
67-
}
68-
}
69-
m => {
70-
dbg!(m);
71-
Err(Error::unexpected_message())
72-
}
73-
}
123+
Message::ReadyForQuery(_) => (),
124+
_ => return Err(Error::unexpected_message()),
125+
};
126+
127+
let fields = rowdesc.fields().collect::<Vec<_>>().map_err(Error::parse)?;
128+
let ranges = datarow.ranges().collect::<Vec<_>>().map_err(Error::parse)?;
129+
130+
if fields.len() != 1 || fields[0].type_oid() != Type::TEXT.oid() || ranges.len() != 1 {
131+
return Err(Error::parse(io::Error::new(
132+
io::ErrorKind::InvalidInput,
133+
"expected single text column in response",
134+
)));
135+
};
136+
137+
let val = from_utf8(&datarow.buffer()[ranges[0].to_owned().unwrap()]).unwrap();
138+
139+
Ok(String::from(val))
74140
}
75141

76142
/// Begin physical replication, consuming the replication client and producing a replication stream.

tokio-postgres/src/types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
pub use postgres_types::*;
77

88
/// Log Sequence Number for PostgreSQL Write-Ahead Log (transaction log).
9+
#[derive(Clone, Copy, Debug)]
910
pub struct Lsn(u64);
1011

1112
impl From<&str> for Lsn {

0 commit comments

Comments
 (0)