Skip to content

refactor(client): replace futures-cpupool with tokio's thread pool #1514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ include = [
[dependencies]
bytes = "0.4.4"
futures = "0.1.21"
futures-cpupool = { version = "0.1.6", optional = true }
http = "0.1.5"
httparse = "1.0"
h2 = "0.1.5"
Expand All @@ -37,6 +36,7 @@ tokio-io = "0.1"
tokio-reactor = { version = "0.1", optional = true }
tokio-tcp = { version = "0.1", optional = true }
tokio-timer = { version = "0.2", optional = true }
tokio-threadpool = { version = "0.1.3", optional = true }
want = "0.0.4"

[dev-dependencies]
Expand All @@ -53,13 +53,13 @@ default = [
"runtime",
]
runtime = [
"futures-cpupool",
"net2",
"tokio",
"tokio-executor",
"tokio-reactor",
"tokio-tcp",
"tokio-timer",
"tokio-threadpool",
]
nightly = []
__internal_flaky_tests = []
Expand Down
4 changes: 2 additions & 2 deletions benches/end_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn get_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let connector = HttpConnector::new_with_handle(rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);
Expand All @@ -46,7 +46,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) {
let mut rt = Runtime::new().unwrap();
let addr = spawn_hello(&mut rt);

let connector = HttpConnector::new_with_handle(1, rt.reactor().clone());
let connector = HttpConnector::new_with_handle(rt.reactor().clone());
let client = hyper::Client::builder()
.executor(rt.executor())
.build::<_, Body>(connector);
Expand Down
90 changes: 17 additions & 73 deletions src/client/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,17 @@ mod http {
use std::io;
use std::mem;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

use futures::{Async, Poll};
use futures::future::{Executor, ExecuteError};
use futures::sync::oneshot;
use futures_cpupool::{Builder as CpuPoolBuilder};
use http::uri::Scheme;
use net2::TcpBuilder;
use tokio_reactor::Handle;
use tokio_tcp::{TcpStream, ConnectFuture};
use tokio_threadpool::blocking;

use super::super::dns;

use self::http_connector::HttpConnectorBlockingTask;


fn connect(addr: &SocketAddr, local_addr: &Option<IpAddr>, handle: &Option<Handle>) -> io::Result<ConnectFuture> {
let builder = match addr {
Expand Down Expand Up @@ -183,7 +178,6 @@ mod http {
/// Performs DNS resolution in a thread pool, and then connects over TCP.
#[derive(Clone)]
pub struct HttpConnector {
executor: HttpConnectExecutor,
enforce_http: bool,
handle: Option<Handle>,
keep_alive_timeout: Option<Duration>,
Expand All @@ -193,34 +187,18 @@ mod http {

impl HttpConnector {
/// Construct a new HttpConnector.
///
/// Takes number of DNS worker threads.
#[inline]
pub fn new(threads: usize) -> HttpConnector {
HttpConnector::new_with_handle_opt(threads, None)
pub fn new() -> HttpConnector {
HttpConnector::new_with_handle_opt(None)
}

/// Construct a new HttpConnector with a specific Tokio handle.
pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector {
HttpConnector::new_with_handle_opt(threads, Some(handle))
}

fn new_with_handle_opt(threads: usize, handle: Option<Handle>) -> HttpConnector {
let pool = CpuPoolBuilder::new()
.name_prefix("hyper-dns")
.pool_size(threads)
.create();
HttpConnector::new_with_executor(pool, handle)
pub fn new_with_handle(handle: Handle) -> HttpConnector {
HttpConnector::new_with_handle_opt(Some(handle))
}

/// Construct a new HttpConnector.
///
/// Takes an executor to run blocking tasks on.
pub fn new_with_executor<E: 'static>(executor: E, handle: Option<Handle>) -> HttpConnector
where E: Executor<HttpConnectorBlockingTask> + Send + Sync
{
fn new_with_handle_opt(handle: Option<Handle>) -> HttpConnector {
HttpConnector {
executor: HttpConnectExecutor(Arc::new(executor)),
enforce_http: true,
handle,
keep_alive_timeout: None,
Expand Down Expand Up @@ -305,7 +283,7 @@ mod http {
};

HttpConnecting {
state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address),
state: State::Lazy(host.into(), port, self.local_address),
handle: self.handle.clone(),
keep_alive_timeout: self.keep_alive_timeout,
nodelay: self.nodelay,
Expand Down Expand Up @@ -355,8 +333,8 @@ mod http {
}

enum State {
Lazy(HttpConnectExecutor, String, u16, Option<IpAddr>),
Resolving(oneshot::SpawnHandle<dns::IpAddrs, io::Error>, Option<IpAddr>),
Lazy(String, u16, Option<IpAddr>),
Resolving(dns::Work, Option<IpAddr>),
Connecting(ConnectingTcp),
Error(Option<io::Error>),
}
Expand All @@ -369,7 +347,7 @@ mod http {
loop {
let state;
match self.state {
State::Lazy(ref executor, ref mut host, port, local_addr) => {
State::Lazy(ref mut host, port, local_addr) => {
// If the host is already an IP addr (v4 or v6),
// skip resolving the dns and start connecting right away.
if let Some(addrs) = dns::IpAddrs::try_parse(host, port) {
Expand All @@ -381,15 +359,15 @@ mod http {
} else {
let host = mem::replace(host, String::new());
let work = dns::Work::new(host, port);
state = State::Resolving(oneshot::spawn(work, executor), local_addr);
state = State::Resolving(work, local_addr);
}
},
State::Resolving(ref mut future, local_addr) => {
match try!(future.poll()) {
State::Resolving(ref work, local_addr) => {
match blocking(|| work.resolve()).expect("thread pool has shut down") {
Async::NotReady => return Ok(Async::NotReady),
Async::Ready(addrs) => {
state = State::Connecting(ConnectingTcp {
addrs: addrs,
addrs: addrs?,
local_addr: local_addr,
current: None,
})
Expand Down Expand Up @@ -455,40 +433,6 @@ mod http {
}
}

// Make this Future unnameable outside of this crate.
mod http_connector {
use super::*;
// Blocking task to be executed on a thread pool.
pub struct HttpConnectorBlockingTask {
pub(super) work: oneshot::Execute<dns::Work>
}

impl fmt::Debug for HttpConnectorBlockingTask {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("HttpConnectorBlockingTask")
}
}

impl Future for HttpConnectorBlockingTask {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
self.work.poll()
}
}
}

#[derive(Clone)]
struct HttpConnectExecutor(Arc<Executor<HttpConnectorBlockingTask> + Send + Sync>);

impl Executor<oneshot::Execute<dns::Work>> for HttpConnectExecutor {
fn execute(&self, future: oneshot::Execute<dns::Work>) -> Result<(), ExecuteError<oneshot::Execute<dns::Work>>> {
self.0.execute(HttpConnectorBlockingTask { work: future })
.map_err(|err| ExecuteError::new(err.kind(), err.into_future().work))
}
}

#[cfg(test)]
mod tests {
use std::io;
Expand All @@ -501,7 +445,7 @@ mod http {
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1);
let connector = HttpConnector::new();

assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
Expand All @@ -512,7 +456,7 @@ mod http {
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1);
let connector = HttpConnector::new();

assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
Expand All @@ -524,7 +468,7 @@ mod http {
let dst = Destination {
uri,
};
let connector = HttpConnector::new(1);
let connector = HttpConnector::new();

assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput);
}
Expand Down
11 changes: 3 additions & 8 deletions src/client/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use std::net::{
};
use std::vec;

use ::futures::{Async, Future, Poll};

pub struct Work {
host: String,
port: u16
Expand All @@ -19,14 +17,11 @@ impl Work {
}
}

impl Future for Work {
type Item = IpAddrs;
type Error = io::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
impl Work {
pub fn resolve(&self) -> Result<IpAddrs, io::Error> {
debug!("resolving host={:?}, port={:?}", self.host, self.port);
(&*self.host, self.port).to_socket_addrs()
.map(|i| Async::Ready(IpAddrs { iter: i }))
.map(|i| IpAddrs { iter: i })
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ impl Builder {
B: Payload + Send,
B::Data: Send,
{
let mut connector = HttpConnector::new(4);
let mut connector = HttpConnector::new();
if self.keep_alive {
connector.set_keepalive(self.keep_alive_timeout);
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

extern crate bytes;
#[macro_use] extern crate futures;
#[cfg(feature = "runtime")] extern crate futures_cpupool;
extern crate h2;
extern crate http;
extern crate httparse;
Expand All @@ -32,6 +31,7 @@ extern crate time;
#[cfg(feature = "runtime")] extern crate tokio_reactor;
#[cfg(feature = "runtime")] extern crate tokio_tcp;
#[cfg(feature = "runtime")] extern crate tokio_timer;
#[cfg(feature = "runtime")] extern crate tokio_threadpool;
extern crate want;

#[cfg(all(test, feature = "nightly"))]
Expand Down
18 changes: 9 additions & 9 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ macro_rules! test {
let addr = server.local_addr().expect("local_addr");
let runtime = $runtime;

let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone());
let connector = ::hyper::client::HttpConnector::new_with_handle(runtime.reactor().clone());
let client = Client::builder()
.set_host($set_host)
.http1_title_case_headers($title_case_headers)
Expand Down Expand Up @@ -737,7 +737,7 @@ mod dispatch_impl {
let (closes_tx, closes) = mpsc::channel(10);
let client = Client::builder()
.executor(runtime.executor())
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, runtime.reactor().clone()), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(runtime.reactor().clone()), closes_tx));

let (tx1, rx1) = oneshot::channel();

Expand Down Expand Up @@ -796,7 +796,7 @@ mod dispatch_impl {
let res = {
let client = Client::builder()
.executor(runtime.executor())
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -849,7 +849,7 @@ mod dispatch_impl {

let client = Client::builder()
.executor(runtime.executor())
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -913,7 +913,7 @@ mod dispatch_impl {
let res = {
let client = Client::builder()
.executor(runtime.executor())
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -964,7 +964,7 @@ mod dispatch_impl {
let res = {
let client = Client::builder()
.executor(runtime.executor())
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -1015,7 +1015,7 @@ mod dispatch_impl {
let client = Client::builder()
.keep_alive(false)
.executor(runtime.executor())
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -1063,7 +1063,7 @@ mod dispatch_impl {

let client = Client::builder()
.executor(runtime.executor())
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx));
.build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
Expand Down Expand Up @@ -1287,7 +1287,7 @@ mod dispatch_impl {

impl DebugConnector {
fn new(handle: &Handle) -> DebugConnector {
let http = HttpConnector::new_with_handle(1, handle.clone());
let http = HttpConnector::new_with_handle(handle.clone());
let (tx, _) = mpsc::channel(10);
DebugConnector::with_http_and_closes(http, tx)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/support/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ pub fn __run_test(cfg: __TestConfig) {
Version::HTTP_11
};

let connector = HttpConnector::new_with_handle(1, handle.clone());
let connector = HttpConnector::new_with_handle(handle.clone());
let client = Client::builder()
.keep_alive_timeout(Duration::from_secs(10))
.http2_only(cfg.client_version == 2)
Expand Down