Skip to content

Commit 0e1f1aa

Browse files
committed
feat(rt): replace IO traits with hyper::rt ones
1 parent 8552543 commit 0e1f1aa

39 files changed

+863
-200
lines changed

benches/support/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
mod tokiort;
2-
pub use tokiort::{TokioExecutor, TokioTimer};
2+
pub use tokiort::{TokioExecutor, TokioIo, TokioTimer};

benches/support/tokiort.rs

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,149 @@ impl Future for TokioSleep {
7979
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
8080

8181
impl Sleep for TokioSleep {}
82+
83+
pin_project! {
84+
#[derive(Debug)]
85+
pub struct TokioIo<T> {
86+
#[pin]
87+
inner: T,
88+
}
89+
}
90+
91+
impl<T> TokioIo<T> {
92+
pub fn new(inner: T) -> Self {
93+
Self { inner }
94+
}
95+
96+
pub fn inner(self) -> T {
97+
self.inner
98+
}
99+
}
100+
101+
impl<T> hyper::rt::AsyncRead for TokioIo<T>
102+
where
103+
T: tokio::io::AsyncRead,
104+
{
105+
fn poll_read(
106+
self: Pin<&mut Self>,
107+
cx: &mut Context<'_>,
108+
mut buf: hyper::rt::ReadBufCursor<'_>,
109+
) -> Poll<Result<(), std::io::Error>> {
110+
let n = unsafe {
111+
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
112+
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
113+
Poll::Ready(Ok(())) => tbuf.filled().len(),
114+
other => return other,
115+
}
116+
};
117+
118+
unsafe {
119+
buf.advance(n);
120+
}
121+
Poll::Ready(Ok(()))
122+
}
123+
}
124+
125+
impl<T> hyper::rt::AsyncWrite for TokioIo<T>
126+
where
127+
T: tokio::io::AsyncWrite,
128+
{
129+
fn poll_write(
130+
self: Pin<&mut Self>,
131+
cx: &mut Context<'_>,
132+
buf: &[u8],
133+
) -> Poll<Result<usize, std::io::Error>> {
134+
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
135+
}
136+
137+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
138+
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
139+
}
140+
141+
fn poll_shutdown(
142+
self: Pin<&mut Self>,
143+
cx: &mut Context<'_>,
144+
) -> Poll<Result<(), std::io::Error>> {
145+
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
146+
}
147+
148+
fn is_write_vectored(&self) -> bool {
149+
tokio::io::AsyncWrite::is_write_vectored(&self.inner)
150+
}
151+
152+
fn poll_write_vectored(
153+
self: Pin<&mut Self>,
154+
cx: &mut Context<'_>,
155+
bufs: &[std::io::IoSlice<'_>],
156+
) -> Poll<Result<usize, std::io::Error>> {
157+
tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
158+
}
159+
}
160+
161+
impl<T> tokio::io::AsyncRead for TokioIo<T>
162+
where
163+
T: hyper::rt::AsyncRead,
164+
{
165+
fn poll_read(
166+
self: Pin<&mut Self>,
167+
cx: &mut Context<'_>,
168+
tbuf: &mut tokio::io::ReadBuf<'_>,
169+
) -> Poll<Result<(), std::io::Error>> {
170+
//let init = tbuf.initialized().len();
171+
let filled = tbuf.filled().len();
172+
let sub_filled = unsafe {
173+
let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut());
174+
175+
match hyper::rt::AsyncRead::poll_read(self.project().inner, cx, buf.unfilled()) {
176+
Poll::Ready(Ok(())) => buf.filled().len(),
177+
other => return other,
178+
}
179+
};
180+
181+
let n_filled = filled + sub_filled;
182+
// At least sub_filled bytes had to have been initialized.
183+
let n_init = sub_filled;
184+
unsafe {
185+
tbuf.assume_init(n_init);
186+
tbuf.set_filled(n_filled);
187+
}
188+
189+
Poll::Ready(Ok(()))
190+
}
191+
}
192+
193+
impl<T> tokio::io::AsyncWrite for TokioIo<T>
194+
where
195+
T: hyper::rt::AsyncWrite,
196+
{
197+
fn poll_write(
198+
self: Pin<&mut Self>,
199+
cx: &mut Context<'_>,
200+
buf: &[u8],
201+
) -> Poll<Result<usize, std::io::Error>> {
202+
hyper::rt::AsyncWrite::poll_write(self.project().inner, cx, buf)
203+
}
204+
205+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
206+
hyper::rt::AsyncWrite::poll_flush(self.project().inner, cx)
207+
}
208+
209+
fn poll_shutdown(
210+
self: Pin<&mut Self>,
211+
cx: &mut Context<'_>,
212+
) -> Poll<Result<(), std::io::Error>> {
213+
hyper::rt::AsyncWrite::poll_shutdown(self.project().inner, cx)
214+
}
215+
216+
fn is_write_vectored(&self) -> bool {
217+
hyper::rt::AsyncWrite::is_write_vectored(&self.inner)
218+
}
219+
220+
fn poll_write_vectored(
221+
self: Pin<&mut Self>,
222+
cx: &mut Context<'_>,
223+
bufs: &[std::io::IoSlice<'_>],
224+
) -> Poll<Result<usize, std::io::Error>> {
225+
hyper::rt::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs)
226+
}
227+
}

examples/client.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ use hyper::Request;
88
use tokio::io::{self, AsyncWriteExt as _};
99
use tokio::net::TcpStream;
1010

11+
#[path = "../benches/support/mod.rs"]
12+
mod support;
13+
use support::TokioIo;
14+
1115
// A simple type alias so as to DRY.
1216
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
1317

@@ -40,8 +44,9 @@ async fn fetch_url(url: hyper::Uri) -> Result<()> {
4044
let port = url.port_u16().unwrap_or(80);
4145
let addr = format!("{}:{}", host, port);
4246
let stream = TcpStream::connect(addr).await?;
47+
let io = TokioIo::new(stream);
4348

44-
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
49+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
4550
tokio::task::spawn(async move {
4651
if let Err(err) = conn.await {
4752
println!("Connection failed: {:?}", err);

examples/client_json.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use hyper::{body::Buf, Request};
77
use serde::Deserialize;
88
use tokio::net::TcpStream;
99

10+
#[path = "../benches/support/mod.rs"]
11+
mod support;
12+
use support::TokioIo;
13+
1014
// A simple type alias so as to DRY.
1115
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
1216

@@ -29,8 +33,9 @@ async fn fetch_json(url: hyper::Uri) -> Result<Vec<User>> {
2933
let addr = format!("{}:{}", host, port);
3034

3135
let stream = TcpStream::connect(addr).await?;
36+
let io = TokioIo::new(stream);
3237

33-
let (mut sender, conn) = hyper::client::conn::http1::handshake(stream).await?;
38+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
3439
tokio::task::spawn(async move {
3540
if let Err(err) = conn.await {
3641
println!("Connection failed: {:?}", err);

examples/echo.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ use hyper::service::service_fn;
1010
use hyper::{body::Body, Method, Request, Response, StatusCode};
1111
use tokio::net::TcpListener;
1212

13+
#[path = "../benches/support/mod.rs"]
14+
mod support;
15+
use support::TokioIo;
16+
1317
/// This is our service handler. It receives a Request, routes on its
1418
/// path, and returns a Future of a Response.
1519
async fn echo(
@@ -92,10 +96,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
9296
println!("Listening on http://{}", addr);
9397
loop {
9498
let (stream, _) = listener.accept().await?;
99+
let io = TokioIo::new(stream);
95100

96101
tokio::task::spawn(async move {
97102
if let Err(err) = http1::Builder::new()
98-
.serve_connection(stream, service_fn(echo))
103+
.serve_connection(io, service_fn(echo))
99104
.await
100105
{
101106
println!("Error serving connection: {:?}", err);

examples/gateway.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ use hyper::{server::conn::http1, service::service_fn};
44
use std::net::SocketAddr;
55
use tokio::net::{TcpListener, TcpStream};
66

7+
#[path = "../benches/support/mod.rs"]
8+
mod support;
9+
use support::TokioIo;
10+
711
#[tokio::main]
812
async fn main() -> Result<(), Box<dyn std::error::Error>> {
913
pretty_env_logger::init();
@@ -20,6 +24,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2024

2125
loop {
2226
let (stream, _) = listener.accept().await?;
27+
let io = TokioIo::new(stream);
2328

2429
// This is the `Service` that will handle the connection.
2530
// `service_fn` is a helper to convert a function that
@@ -42,9 +47,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
4247

4348
async move {
4449
let client_stream = TcpStream::connect(addr).await.unwrap();
50+
let io = TokioIo::new(client_stream);
4551

46-
let (mut sender, conn) =
47-
hyper::client::conn::http1::handshake(client_stream).await?;
52+
let (mut sender, conn) = hyper::client::conn::http1::handshake(io).await?;
4853
tokio::task::spawn(async move {
4954
if let Err(err) = conn.await {
5055
println!("Connection failed: {:?}", err);
@@ -56,10 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
5661
});
5762

5863
tokio::task::spawn(async move {
59-
if let Err(err) = http1::Builder::new()
60-
.serve_connection(stream, service)
61-
.await
62-
{
64+
if let Err(err) = http1::Builder::new().serve_connection(io, service).await {
6365
println!("Failed to serve the connection: {:?}", err);
6466
}
6567
});

examples/hello.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ use hyper::service::service_fn;
1010
use hyper::{Request, Response};
1111
use tokio::net::TcpListener;
1212

13+
#[path = "../benches/support/mod.rs"]
14+
mod support;
15+
use support::TokioIo;
16+
1317
// An async function that consumes a request, does nothing with it and returns a
1418
// response.
1519
async fn hello(_: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
@@ -35,7 +39,10 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3539
// has work to do. In this case, a connection arrives on the port we are listening on and
3640
// the task is woken up, at which point the task is then put back on a thread, and is
3741
// driven forward by the runtime, eventually yielding a TCP stream.
38-
let (stream, _) = listener.accept().await?;
42+
let (tcp, _) = listener.accept().await?;
43+
// Use an adapter to access something implementing `tokio::io` traits as if they implement
44+
// `hyper::rt` IO traits.
45+
let io = TokioIo::new(tcp);
3946

4047
// Spin up a new task in Tokio so we can continue to listen for new TCP connection on the
4148
// current task without waiting for the processing of the HTTP1 connection we just received
@@ -44,7 +51,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4451
// Handle the connection from the client using HTTP1 and pass any
4552
// HTTP requests received on that connection to the `hello` function
4653
if let Err(err) = http1::Builder::new()
47-
.serve_connection(stream, service_fn(hello))
54+
.serve_connection(io, service_fn(hello))
4855
.await
4956
{
5057
println!("Error serving connection: {:?}", err);

examples/http_proxy.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ use hyper::{Method, Request, Response};
1212

1313
use tokio::net::{TcpListener, TcpStream};
1414

15+
#[path = "../benches/support/mod.rs"]
16+
mod support;
17+
use support::TokioIo;
18+
1519
// To try this example:
1620
// 1. cargo run --example http_proxy
1721
// 2. config http_proxy in command line
@@ -28,12 +32,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
2832

2933
loop {
3034
let (stream, _) = listener.accept().await?;
35+
let io = TokioIo::new(stream);
3136

3237
tokio::task::spawn(async move {
3338
if let Err(err) = http1::Builder::new()
3439
.preserve_header_case(true)
3540
.title_case_headers(true)
36-
.serve_connection(stream, service_fn(proxy))
41+
.serve_connection(io, service_fn(proxy))
3742
.with_upgrades()
3843
.await
3944
{
@@ -88,11 +93,12 @@ async fn proxy(
8893
let addr = format!("{}:{}", host, port);
8994

9095
let stream = TcpStream::connect(addr).await.unwrap();
96+
let io = TokioIo::new(stream);
9197

9298
let (mut sender, conn) = Builder::new()
9399
.preserve_header_case(true)
94100
.title_case_headers(true)
95-
.handshake(stream)
101+
.handshake(io)
96102
.await?;
97103
tokio::task::spawn(async move {
98104
if let Err(err) = conn.await {
@@ -123,9 +129,10 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
123129

124130
// Create a TCP connection to host:port, build a tunnel between the connection and
125131
// the upgraded connection
126-
async fn tunnel(mut upgraded: Upgraded, addr: String) -> std::io::Result<()> {
132+
async fn tunnel(upgraded: Upgraded, addr: String) -> std::io::Result<()> {
127133
// Connect to remote server
128134
let mut server = TcpStream::connect(addr).await?;
135+
let mut upgraded = TokioIo::new(upgraded);
129136

130137
// Proxying data
131138
let (from_client, from_server) =

examples/multi_server.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ use hyper::service::service_fn;
1111
use hyper::{Request, Response};
1212
use tokio::net::TcpListener;
1313

14+
#[path = "../benches/support/mod.rs"]
15+
mod support;
16+
use support::TokioIo;
17+
1418
static INDEX1: &[u8] = b"The 1st service!";
1519
static INDEX2: &[u8] = b"The 2nd service!";
1620

@@ -33,10 +37,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
3337
let listener = TcpListener::bind(addr1).await.unwrap();
3438
loop {
3539
let (stream, _) = listener.accept().await.unwrap();
40+
let io = TokioIo::new(stream);
3641

3742
tokio::task::spawn(async move {
3843
if let Err(err) = http1::Builder::new()
39-
.serve_connection(stream, service_fn(index1))
44+
.serve_connection(io, service_fn(index1))
4045
.await
4146
{
4247
println!("Error serving connection: {:?}", err);
@@ -49,10 +54,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
4954
let listener = TcpListener::bind(addr2).await.unwrap();
5055
loop {
5156
let (stream, _) = listener.accept().await.unwrap();
57+
let io = TokioIo::new(stream);
5258

5359
tokio::task::spawn(async move {
5460
if let Err(err) = http1::Builder::new()
55-
.serve_connection(stream, service_fn(index2))
61+
.serve_connection(io, service_fn(index2))
5662
.await
5763
{
5864
println!("Error serving connection: {:?}", err);

0 commit comments

Comments
 (0)