Skip to content

Update to hyper v0.12 #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 9, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "hyper-timeout"
version = "0.1.0"
version = "0.2.0"
authors = ["Herman J. Radtke III <[email protected]>"]
description = "A timeout aware connector to be used with the hyper Client."
description = "A connect, read and write timeout aware connector to be used with hyper Client."
license = "MIT/Apache-2.0"
documentation = "https://github.com/hjr3/hyper-timeout"
homepage = "https://github.com/hjr3/hyper-timeout"
@@ -13,12 +13,12 @@ travis-ci = { repository = "https://github.com/hjr3/hyper-timeout", branch = "ma

[dependencies]
futures = "0.1"
hyper = "0.11"
tokio-core = "0.1"
hyper = "0.12"
tokio = "0.1"
tokio-io = "0.1"
tokio-io-timeout = "0.3"
tokio-reactor = "0.1"
tokio-service = "0.1"
tokio-io-timeout = "0.1"

[dev-dependencies]
native-tls = "0.1"
hyper-tls = "0.1"
hyper-tls = "0.3"
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
@@ -3,8 +3,7 @@

# hyper-timeout

A timeout aware connector to be used with hyper `Client`.

A connect, read and write timeout aware connector to be used with hyper `Client`.

## Problem

@@ -14,15 +13,21 @@ At the time this crate was created, hyper does not support timeouts. There is a

There is a `TimeoutConnector` that implements the `hyper::Connect` trait. This connector wraps around `HttpConnector` or `HttpsConnector` values and provides timeouts.

**Note:** Because of the way `tokio_proto::ClientProto` works, a read or write timeout will return a _broken pipe_ error.
**Note:** In hyper 0.11, a read or write timeout will return a _broken pipe_ error because of the way `tokio_proto::ClientProto` works

## Usage

First, add this to your `Cargo.toml`:
Hyper version compatibility:

* The `0.1` release supports hyper 0.11.
* The `0.2` release supports hyper 0.12.
* The `master` branch will track on going developer for hyper 0.13.

First, (assuming you are using hyper 0.12) add this to your `Cargo.toml`:

```toml
[dependencies]
hyper-timeout = "0.1"
hyper-timeout = "0.2"
```

Next, add this to your crate:
55 changes: 31 additions & 24 deletions examples/client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
extern crate futures;
extern crate tokio_core;
extern crate hyper;
extern crate hyper_tls;
extern crate hyper_timeout;

use std::env;
use std::io::{self, Write};
use std::time::Duration;

use futures::Future;
use futures::stream::Stream;

use hyper::Client;
use hyper::{rt, Client};

//use hyper::client::HttpConnector;
use hyper_tls::HttpsConnector;
@@ -23,32 +23,39 @@ fn main() {
Some(url) => url,
None => {
println!("Usage: client <url>");
println!("Example: client https://example.com");
return;
}
};

let url = url.parse::<hyper::Uri>().unwrap();

let mut core = tokio_core::reactor::Core::new().unwrap();
let handle = core.handle();

// This example uses `HttpsConnector`, but you can also use the default hyper `HttpConnector`
//let connector = HttpConnector::new(4, &handle);
let connector = HttpsConnector::new(4, &handle).unwrap();
let mut tm = TimeoutConnector::new(connector, &handle);
tm.set_connect_timeout(Some(Duration::from_secs(5)));
tm.set_read_timeout(Some(Duration::from_secs(5)));
tm.set_write_timeout(Some(Duration::from_secs(5)));
let client = Client::configure().connector(tm).build(&handle);

let get = client.get(url).and_then(|res| {
println!("Response: {}", res.status());
println!("Headers: \n{}", res.headers());

res.body().concat2()
});

let got = core.run(get).unwrap();
let output = String::from_utf8_lossy(&got);
println!("{}", output);
rt::run(rt::lazy(|| {
// This example uses `HttpsConnector`, but you can also use hyper `HttpConnector`
//let connector = HttpConnector::new(1);
let https = HttpsConnector::new(1).unwrap();
let mut connector = TimeoutConnector::new(https);
connector.set_connect_timeout(Some(Duration::from_secs(5)));
connector.set_read_timeout(Some(Duration::from_secs(5)));
connector.set_write_timeout(Some(Duration::from_secs(5)));
let client = Client::builder().build::<_, hyper::Body>(connector);

client.get(url).and_then(|res| {
println!("Response: {}", res.status());

res
.into_body()
// Body is a stream, so as each chunk arrives...
.for_each(|chunk| {
io::stdout()
.write_all(&chunk)
.map_err(|e| {
panic!("example expects stdout is open, error={}", e)
})
})
})
.map_err(|err| {
println!("Error: {}", err);
})
}));
}
170 changes: 100 additions & 70 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
extern crate futures;
extern crate tokio_core;
extern crate tokio;
extern crate tokio_io;
extern crate tokio_service;
extern crate tokio_io_timeout;
@@ -8,22 +8,18 @@ extern crate hyper;
use std::time::Duration;
use std::io;

use futures::future::{Either, Future};
use futures::Future;

use tokio_core::reactor::{Handle, Timeout};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio_service::Service;
use tokio::timer::Timeout;
use tokio_io_timeout::TimeoutStream;

use hyper::client::Connect;
use hyper::client::connect::{Connect, Connected, Destination};

/// A connector that enforces as connection timeout
#[derive(Debug)]
pub struct TimeoutConnector<T> {
/// A connector implementing the `Connect` trait
connector: T,
/// Handle to be used to set the timeout within tokio's core
handle: Handle,
/// Amount of time to wait connecting
connect_timeout: Option<Duration>,
/// Amount of time to wait reading response
@@ -34,16 +30,56 @@ pub struct TimeoutConnector<T> {

impl<T: Connect> TimeoutConnector<T> {
/// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait
pub fn new(connector: T, handle: &Handle) -> Self {
pub fn new(connector: T) -> Self {
TimeoutConnector {
connector: connector,
handle: handle.clone(),
connect_timeout: None,
read_timeout: None,
write_timeout: None,
}
}
}

impl<T: Connect> Connect for TimeoutConnector<T>
where
T: Connect<Error = io::Error> + 'static,
T::Future: 'static,
{
type Transport = TimeoutStream<T::Transport>;
type Error = T::Error;
type Future = Box<Future<Item = (Self::Transport, Connected), Error = Self::Error> + Send>;

fn connect(&self, dst: Destination) -> Self::Future {

let read_timeout = self.read_timeout.clone();
let write_timeout = self.write_timeout.clone();
let connecting = self.connector.connect(dst);

if self.connect_timeout.is_none() {
return Box::new(connecting.map(move |(io, c)| {
let mut tm = TimeoutStream::new(io);
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
(tm, c)
}));
}

let connect_timeout = self.connect_timeout.expect("Connect timeout should be set");
let timeout = Timeout::new(connecting, connect_timeout);

Box::new(timeout.then(move |res| match res {
Ok((io, c)) => {
let mut tm = TimeoutStream::new(io);
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
Ok((tm, c))
}
Err(e) => Err(io::Error::new(io::ErrorKind::TimedOut, e)),
}))
}
}

impl<T> TimeoutConnector<T> {
/// Set the timeout for connecting to a URL.
///
/// Default is no timeout.
@@ -69,76 +105,70 @@ impl<T: Connect> TimeoutConnector<T> {
}
}

impl<T> Service for TimeoutConnector<T>
where
T: Service<Error = io::Error> + 'static,
T::Response: AsyncRead + AsyncWrite,
T::Future: Future<Error = io::Error>,
{
type Request = T::Request;
type Response = TimeoutStream<T::Response>;
type Error = T::Error;
type Future = Box<Future<Item = Self::Response, Error = Self::Error>>;

fn call(&self, req: Self::Request) -> Self::Future {
let handle = self.handle.clone();
let read_timeout = self.read_timeout.clone();
let write_timeout = self.write_timeout.clone();
let connecting = self.connector.call(req);

if self.connect_timeout.is_none() {
return Box::new(connecting.map(move |io| {
let mut tm = TimeoutStream::new(io, &handle);
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
tm
}));
}

let connect_timeout = self.connect_timeout.expect("Connect timeout should be set");
let timeout = Timeout::new(connect_timeout, &self.handle).unwrap();

Box::new(connecting.select2(timeout).then(move |res| match res {
Ok(Either::A((io, _))) => {
let mut tm = TimeoutStream::new(io, &handle);
tm.set_read_timeout(read_timeout);
tm.set_write_timeout(write_timeout);
Ok(tm)
}
Ok(Either::B((_, _))) => {
Err(io::Error::new(
io::ErrorKind::TimedOut,
"Client timed out while connecting",
))
}
Err(Either::A((e, _))) => Err(e),
Err(Either::B((e, _))) => Err(e),
}))
}
}

#[cfg(test)]
mod tests {
use std::error::Error;
use std::io;
use std::time::Duration;
use tokio_core::reactor::Core;
use hyper::client::{Connect, HttpConnector};
use futures::future;
use tokio::runtime::current_thread::Runtime;
use hyper::Client;
use hyper::client::HttpConnector;
use super::TimeoutConnector;

#[test]
fn test_timeout_connector() {
let mut core = Core::new().unwrap();
// 10.255.255.1 is a not a routable IP address
let url = "http://10.255.255.1".parse().unwrap();
let mut connector =
TimeoutConnector::new(HttpConnector::new(1, &core.handle()), &core.handle());
connector.set_connect_timeout(Some(Duration::from_millis(1)));

match core.run(connector.connect(url)) {
let mut rt = Runtime::new().unwrap();
let res = rt.block_on(future::lazy(|| {
// 10.255.255.1 is a not a routable IP address
let url = "http://10.255.255.1".parse().unwrap();

let http = HttpConnector::new(1);
let mut connector = TimeoutConnector::new(http);
connector.set_connect_timeout(Some(Duration::from_millis(1)));

let client = Client::builder().build::<_, hyper::Body>(connector);

client.get(url)
}));

match res {
Ok(_) => panic!("Expected a timeout"),
Err(e) => {
if let Some(io_e) = e.source().unwrap().downcast_ref::<io::Error>() {
assert_eq!(io_e.kind(), io::ErrorKind::TimedOut);
} else {
panic!("Expected timeout error");
}
}
}
}

#[test]
fn test_read_timeout() {
let mut rt = Runtime::new().unwrap();
let res = rt.block_on(future::lazy(|| {
let url = "http://example.com".parse().unwrap();

let http = HttpConnector::new(1);
let mut connector = TimeoutConnector::new(http);
// A 1 ms read timeout should be so short that we trigger a timeout error
connector.set_read_timeout(Some(Duration::from_millis(1)));

let client = Client::builder().build::<_, hyper::Body>(connector);

client.get(url)
}));

match res {
Ok(_) => panic!("Expected a timeout"),
Err(e) => {
assert_eq!(e.kind(), io::ErrorKind::TimedOut);
if let Some(io_e) = e.source().unwrap().downcast_ref::<io::Error>() {
assert_eq!(io_e.kind(), io::ErrorKind::TimedOut);
} else {
panic!("Expected timeout error");
}
}
_ => panic!("Expected timeout error"),
}
}
}