Skip to content

Commit 165a4a6

Browse files
committed
feat(http1): Add higher-level HTTP upgrade support to Client and Server
- Adds `Body::on_upgrade()` that returns an `OnUpgrade` future. - Adds `hyper::upgrade` module containing types for dealing with upgrades. - Adds `server::conn::Connection::with_upgrades()` method to enable these upgrades when using lower-level API (because of a missing `Send` bound on the transport generic). - Client connections are automatically enabled. - Optimizes request parsing, to make up for extra work to look for upgrade requests. - Returns a smaller `DecodedLength` type instead of the fatter `Decoder`, which should also allow a couple fewer branches. - Removes the `Decode::Ignore` wrapper enum, and instead ignoring 1xx responses is handled directly in the response parsing code.
1 parent 1c3fbfd commit 165a4a6

File tree

26 files changed

+1271
-574
lines changed

26 files changed

+1271
-574
lines changed

Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ iovec = "0.1"
2929
log = "0.4"
3030
net2 = { version = "0.2.32", optional = true }
3131
time = "0.1"
32-
tokio = { version = "0.1.5", optional = true }
32+
tokio = { version = "0.1.7", optional = true }
3333
tokio-executor = { version = "0.1.0", optional = true }
3434
tokio-io = "0.1"
3535
tokio-reactor = { version = "0.1", optional = true }
@@ -101,6 +101,12 @@ name = "send_file"
101101
path = "examples/send_file.rs"
102102
required-features = ["runtime"]
103103

104+
[[example]]
105+
name = "upgrades"
106+
path = "examples/upgrades.rs"
107+
required-features = ["runtime"]
108+
109+
104110
[[example]]
105111
name = "web_api"
106112
path = "examples/web_api.rs"

examples/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@ Run examples with `cargo run --example example_name`.
1616

1717
* [`send_file`](send_file.rs) - A server that sends back content of files using tokio_fs to read the files asynchronously.
1818

19+
* [`upgrades`](upgrades.rs) - A server and client demonstrating how to do HTTP upgrades (such as WebSockets or `CONNECT` tunneling).
20+
1921
* [`web_api`](web_api.rs) - A server consisting in a service that returns incoming POST request's content in the response in uppercase and a service that call that call the first service and includes the first service response in its own response.

examples/upgrades.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Note: `hyper::upgrade` docs link to this upgrade.
2+
extern crate futures;
3+
extern crate hyper;
4+
extern crate tokio;
5+
6+
use std::str;
7+
8+
use futures::sync::oneshot;
9+
10+
use hyper::{Body, Client, Request, Response, Server, StatusCode};
11+
use hyper::header::{UPGRADE, HeaderValue};
12+
use hyper::rt::{self, Future};
13+
use hyper::service::service_fn_ok;
14+
15+
/// Our server HTTP handler to initiate HTTP upgrades.
16+
fn server_upgrade(req: Request<Body>) -> Response<Body> {
17+
let mut res = Response::new(Body::empty());
18+
19+
// Send a 400 to any request that doesn't have
20+
// an `Upgrade` header.
21+
if !req.headers().contains_key(UPGRADE) {
22+
*res.status_mut() = StatusCode::BAD_REQUEST;
23+
return res;
24+
}
25+
26+
// Setup a future that will eventually receive the upgraded
27+
// connection and talk a new protocol, and spawn the future
28+
// into the runtime.
29+
//
30+
// Note: This can't possibly be fulfilled until the 101 response
31+
// is returned below, so it's better to spawn this future instead
32+
// waiting for it to complete to then return a response.
33+
let on_upgrade = req
34+
.into_body()
35+
.on_upgrade()
36+
.map_err(|err| eprintln!("upgrade error: {}", err))
37+
.and_then(|upgraded| {
38+
// We have an upgraded connection that we can read and
39+
// write on directly.
40+
//
41+
// Since we completely control this example, we know exactly
42+
// how many bytes the client will write, so just read exact...
43+
tokio::io::read_exact(upgraded, vec![0; 7])
44+
.and_then(|(upgraded, vec)| {
45+
println!("server[foobar] recv: {:?}", str::from_utf8(&vec));
46+
47+
// And now write back the server 'foobar' protocol's
48+
// response...
49+
tokio::io::write_all(upgraded, b"bar=foo")
50+
})
51+
.map(|_| println!("server[foobar] sent"))
52+
.map_err(|e| eprintln!("server foobar io error: {}", e))
53+
});
54+
55+
rt::spawn(on_upgrade);
56+
57+
58+
// Now return a 101 Response saying we agree to the upgrade to some
59+
// made-up 'foobar' protocol.
60+
*res.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
61+
res.headers_mut().insert(UPGRADE, HeaderValue::from_static("foobar"));
62+
res
63+
}
64+
65+
fn main() {
66+
// For this example, we just make a server and our own client to talk to
67+
// it, so the exact port isn't important. Instead, let the OS give us an
68+
// unused port.
69+
let addr = ([127, 0, 0, 1], 0).into();
70+
71+
let server = Server::bind(&addr)
72+
.serve(|| service_fn_ok(server_upgrade));
73+
74+
// We need the assigned address for the client to send it messages.
75+
let addr = server.local_addr();
76+
77+
78+
// For this example, a oneshot is used to signal that after 1 request,
79+
// the server should be shutdown.
80+
let (tx, rx) = oneshot::channel();
81+
82+
let server = server
83+
.map_err(|e| eprintln!("server error: {}", e))
84+
.select2(rx)
85+
.then(|_| Ok(()));
86+
87+
rt::run(rt::lazy(move || {
88+
rt::spawn(server);
89+
90+
let req = Request::builder()
91+
.uri(format!("http://{}/", addr))
92+
.header(UPGRADE, "foobar")
93+
.body(Body::empty())
94+
.unwrap();
95+
96+
Client::new()
97+
.request(req)
98+
.and_then(|res| {
99+
if res.status() != StatusCode::SWITCHING_PROTOCOLS {
100+
panic!("Our server didn't upgrade: {}", res.status());
101+
}
102+
103+
res
104+
.into_body()
105+
.on_upgrade()
106+
})
107+
.map_err(|e| eprintln!("client error: {}", e))
108+
.and_then(|upgraded| {
109+
// We've gotten an upgraded connection that we can read
110+
// and write directly on. Let's start out 'foobar' protocol.
111+
tokio::io::write_all(upgraded, b"foo=bar")
112+
.and_then(|(upgraded, _)| {
113+
println!("client[foobar] sent");
114+
tokio::io::read_to_end(upgraded, Vec::new())
115+
})
116+
.map(|(_upgraded, vec)| {
117+
println!("client[foobar] recv: {:?}", str::from_utf8(&vec));
118+
119+
120+
// Complete the oneshot so that the server stops
121+
// listening and the process can close down.
122+
let _ = tx.send(());
123+
})
124+
.map_err(|e| eprintln!("client foobar io error: {}", e))
125+
})
126+
}));
127+
}

src/body/body.rs

Lines changed: 56 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use http::HeaderMap;
1010
use common::Never;
1111
use super::{Chunk, Payload};
1212
use super::internal::{FullDataArg, FullDataRet};
13+
use upgrade::OnUpgrade;
1314

1415
type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
1516

@@ -21,15 +22,9 @@ type BodySender = mpsc::Sender<Result<Chunk, ::Error>>;
2122
#[must_use = "streams do nothing unless polled"]
2223
pub struct Body {
2324
kind: Kind,
24-
/// Allow the client to pass a future to delay the `Body` from returning
25-
/// EOF. This allows the `Client` to try to put the idle connection
26-
/// back into the pool before the body is "finished".
27-
///
28-
/// The reason for this is so that creating a new request after finishing
29-
/// streaming the body of a response could sometimes result in creating
30-
/// a brand new connection, since the pool didn't know about the idle
31-
/// connection yet.
32-
delayed_eof: Option<DelayEof>,
25+
/// Keep the extra bits in an `Option<Box<Extra>>`, so that
26+
/// Body stays small in the common case (no extras needed).
27+
extra: Option<Box<Extra>>,
3328
}
3429

3530
enum Kind {
@@ -43,6 +38,19 @@ enum Kind {
4338
Wrapped(Box<Stream<Item=Chunk, Error=Box<::std::error::Error + Send + Sync>> + Send>),
4439
}
4540

41+
struct Extra {
42+
/// Allow the client to pass a future to delay the `Body` from returning
43+
/// EOF. This allows the `Client` to try to put the idle connection
44+
/// back into the pool before the body is "finished".
45+
///
46+
/// The reason for this is so that creating a new request after finishing
47+
/// streaming the body of a response could sometimes result in creating
48+
/// a brand new connection, since the pool didn't know about the idle
49+
/// connection yet.
50+
delayed_eof: Option<DelayEof>,
51+
on_upgrade: OnUpgrade,
52+
}
53+
4654
type DelayEofUntil = oneshot::Receiver<Never>;
4755

4856
enum DelayEof {
@@ -89,7 +97,6 @@ impl Body {
8997
Self::new_channel(None)
9098
}
9199

92-
#[inline]
93100
pub(crate) fn new_channel(content_length: Option<u64>) -> (Sender, Body) {
94101
let (tx, rx) = mpsc::channel(0);
95102
let (abort_tx, abort_rx) = oneshot::channel();
@@ -139,34 +146,67 @@ impl Body {
139146
Body::new(Kind::Wrapped(Box::new(mapped)))
140147
}
141148

149+
/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
150+
///
151+
/// See [the `upgrade` module](::upgrade) for more.
152+
pub fn on_upgrade(self) -> OnUpgrade {
153+
self
154+
.extra
155+
.map(|ex| ex.on_upgrade)
156+
.unwrap_or_else(OnUpgrade::none)
157+
}
158+
142159
fn new(kind: Kind) -> Body {
143160
Body {
144161
kind: kind,
145-
delayed_eof: None,
162+
extra: None,
146163
}
147164
}
148165

149166
pub(crate) fn h2(recv: h2::RecvStream) -> Self {
150167
Body::new(Kind::H2(recv))
151168
}
152169

170+
pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
171+
debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
172+
let extra = self.extra_mut();
173+
debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
174+
extra.on_upgrade = upgrade;
175+
}
176+
153177
pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
154-
self.delayed_eof = Some(DelayEof::NotEof(fut));
178+
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
179+
}
180+
181+
fn take_delayed_eof(&mut self) -> Option<DelayEof> {
182+
self
183+
.extra
184+
.as_mut()
185+
.and_then(|extra| extra.delayed_eof.take())
186+
}
187+
188+
fn extra_mut(&mut self) -> &mut Extra {
189+
self
190+
.extra
191+
.get_or_insert_with(|| Box::new(Extra {
192+
delayed_eof: None,
193+
on_upgrade: OnUpgrade::none(),
194+
}))
155195
}
156196

157197
fn poll_eof(&mut self) -> Poll<Option<Chunk>, ::Error> {
158-
match self.delayed_eof.take() {
198+
match self.take_delayed_eof() {
159199
Some(DelayEof::NotEof(mut delay)) => {
160200
match self.poll_inner() {
161201
ok @ Ok(Async::Ready(Some(..))) |
162202
ok @ Ok(Async::NotReady) => {
163-
self.delayed_eof = Some(DelayEof::NotEof(delay));
203+
self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
164204
ok
165205
},
166206
Ok(Async::Ready(None)) => match delay.poll() {
167207
Ok(Async::Ready(never)) => match never {},
168208
Ok(Async::NotReady) => {
169-
self.delayed_eof = Some(DelayEof::Eof(delay));
209+
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
170210
Ok(Async::NotReady)
171211
},
172212
Err(_done) => {
@@ -180,7 +220,7 @@ impl Body {
180220
match delay.poll() {
181221
Ok(Async::Ready(never)) => match never {},
182222
Ok(Async::NotReady) => {
183-
self.delayed_eof = Some(DelayEof::Eof(delay));
223+
self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
184224
Ok(Async::NotReady)
185225
},
186226
Err(_done) => {

0 commit comments

Comments
 (0)