From f8a602ee83ed0876d4ece7e73cbb859600066790 Mon Sep 17 00:00:00 2001 From: Ran Benita Date: Sun, 13 May 2018 22:39:00 +0300 Subject: [PATCH] refactor(client): replace futures-cpupool with tokio's thread pool The tokio thread pool is already transitively used by the tokio runtime, so by using it for the DNS as well, the futures-cpupool dependency is removed without adding a new one. The code uses tokio-threadpool's `blocking` function which makes the code very straightforward. Tokio's thread pool is supposedly faster than futures-cpupool's one. According to its README, "Why not futures-cpupool? It's 10x slower.". The user of the library can configure the maximum number of threads for executing blocking tasks as part of configuring tokio's runtime, which also holds for any other blocking code besides hyper (e.g. blocking file operations using tokio-fs ). Hyper no longer needs to concern itself with that. BREAKING CHANGE: The `threads` argument is removed from `hyper::client::HttpConnector::new()` and `new_with_handle()`. To set the maximum amount of threads to use for DNS resolving, configure the `max_blocking` parameter of the tokio runtime instead. The function `hyper::client::HttpConnector::new_with_executor()` is removed. The thread pool of the tokio runtime is now always used. Use `new()` or `new_with_handle()` instead. --- Cargo.toml | 4 +- benches/end_to_end.rs | 4 +- src/client/connect.rs | 90 ++++++++----------------------------------- src/client/dns.rs | 11 ++---- src/client/mod.rs | 2 +- src/lib.rs | 2 +- tests/client.rs | 18 ++++----- tests/support/mod.rs | 2 +- 8 files changed, 36 insertions(+), 97 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index cb16c36c05..38b6695107 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,6 @@ include = [ [dependencies] bytes = "0.4.4" futures = "0.1.21" -futures-cpupool = { version = "0.1.6", optional = true } http = "0.1.5" httparse = "1.0" h2 = "0.1.5" @@ -37,6 +36,7 @@ tokio-io = "0.1" tokio-reactor = { version = "0.1", optional = true } tokio-tcp = { version = "0.1", optional = true } tokio-timer = { version = "0.2", optional = true } +tokio-threadpool = { version = "0.1.3", optional = true } want = "0.0.4" [dev-dependencies] @@ -53,13 +53,13 @@ default = [ "runtime", ] runtime = [ - "futures-cpupool", "net2", "tokio", "tokio-executor", "tokio-reactor", "tokio-tcp", "tokio-timer", + "tokio-threadpool", ] nightly = [] __internal_flaky_tests = [] diff --git a/benches/end_to_end.rs b/benches/end_to_end.rs index f62ce86a70..ee3930339b 100644 --- a/benches/end_to_end.rs +++ b/benches/end_to_end.rs @@ -22,7 +22,7 @@ fn get_one_at_a_time(b: &mut test::Bencher) { let mut rt = Runtime::new().unwrap(); let addr = spawn_hello(&mut rt); - let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); + let connector = HttpConnector::new_with_handle(rt.reactor().clone()); let client = hyper::Client::builder() .executor(rt.executor()) .build::<_, Body>(connector); @@ -46,7 +46,7 @@ fn post_one_at_a_time(b: &mut test::Bencher) { let mut rt = Runtime::new().unwrap(); let addr = spawn_hello(&mut rt); - let connector = HttpConnector::new_with_handle(1, rt.reactor().clone()); + let connector = HttpConnector::new_with_handle(rt.reactor().clone()); let client = hyper::Client::builder() .executor(rt.executor()) .build::<_, Body>(connector); diff --git a/src/client/connect.rs b/src/client/connect.rs index 1e7b8cf3e5..628c985aff 100644 --- a/src/client/connect.rs +++ b/src/client/connect.rs @@ -130,22 +130,17 @@ mod http { use std::io; use std::mem; use std::net::{IpAddr, SocketAddr}; - use std::sync::Arc; use std::time::Duration; use futures::{Async, Poll}; - use futures::future::{Executor, ExecuteError}; - use futures::sync::oneshot; - use futures_cpupool::{Builder as CpuPoolBuilder}; use http::uri::Scheme; use net2::TcpBuilder; use tokio_reactor::Handle; use tokio_tcp::{TcpStream, ConnectFuture}; + use tokio_threadpool::blocking; use super::super::dns; - use self::http_connector::HttpConnectorBlockingTask; - fn connect(addr: &SocketAddr, local_addr: &Option, handle: &Option) -> io::Result { let builder = match addr { @@ -183,7 +178,6 @@ mod http { /// Performs DNS resolution in a thread pool, and then connects over TCP. #[derive(Clone)] pub struct HttpConnector { - executor: HttpConnectExecutor, enforce_http: bool, handle: Option, keep_alive_timeout: Option, @@ -193,34 +187,18 @@ mod http { impl HttpConnector { /// Construct a new HttpConnector. - /// - /// Takes number of DNS worker threads. #[inline] - pub fn new(threads: usize) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, None) + pub fn new() -> HttpConnector { + HttpConnector::new_with_handle_opt(None) } /// Construct a new HttpConnector with a specific Tokio handle. - pub fn new_with_handle(threads: usize, handle: Handle) -> HttpConnector { - HttpConnector::new_with_handle_opt(threads, Some(handle)) - } - - fn new_with_handle_opt(threads: usize, handle: Option) -> HttpConnector { - let pool = CpuPoolBuilder::new() - .name_prefix("hyper-dns") - .pool_size(threads) - .create(); - HttpConnector::new_with_executor(pool, handle) + pub fn new_with_handle(handle: Handle) -> HttpConnector { + HttpConnector::new_with_handle_opt(Some(handle)) } - /// Construct a new HttpConnector. - /// - /// Takes an executor to run blocking tasks on. - pub fn new_with_executor(executor: E, handle: Option) -> HttpConnector - where E: Executor + Send + Sync - { + fn new_with_handle_opt(handle: Option) -> HttpConnector { HttpConnector { - executor: HttpConnectExecutor(Arc::new(executor)), enforce_http: true, handle, keep_alive_timeout: None, @@ -305,7 +283,7 @@ mod http { }; HttpConnecting { - state: State::Lazy(self.executor.clone(), host.into(), port, self.local_address), + state: State::Lazy(host.into(), port, self.local_address), handle: self.handle.clone(), keep_alive_timeout: self.keep_alive_timeout, nodelay: self.nodelay, @@ -355,8 +333,8 @@ mod http { } enum State { - Lazy(HttpConnectExecutor, String, u16, Option), - Resolving(oneshot::SpawnHandle, Option), + Lazy(String, u16, Option), + Resolving(dns::Work, Option), Connecting(ConnectingTcp), Error(Option), } @@ -369,7 +347,7 @@ mod http { loop { let state; match self.state { - State::Lazy(ref executor, ref mut host, port, local_addr) => { + State::Lazy(ref mut host, port, local_addr) => { // If the host is already an IP addr (v4 or v6), // skip resolving the dns and start connecting right away. if let Some(addrs) = dns::IpAddrs::try_parse(host, port) { @@ -381,15 +359,15 @@ mod http { } else { let host = mem::replace(host, String::new()); let work = dns::Work::new(host, port); - state = State::Resolving(oneshot::spawn(work, executor), local_addr); + state = State::Resolving(work, local_addr); } }, - State::Resolving(ref mut future, local_addr) => { - match try!(future.poll()) { + State::Resolving(ref work, local_addr) => { + match blocking(|| work.resolve()).expect("thread pool has shut down") { Async::NotReady => return Ok(Async::NotReady), Async::Ready(addrs) => { state = State::Connecting(ConnectingTcp { - addrs: addrs, + addrs: addrs?, local_addr: local_addr, current: None, }) @@ -455,40 +433,6 @@ mod http { } } - // Make this Future unnameable outside of this crate. - mod http_connector { - use super::*; - // Blocking task to be executed on a thread pool. - pub struct HttpConnectorBlockingTask { - pub(super) work: oneshot::Execute - } - - impl fmt::Debug for HttpConnectorBlockingTask { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.pad("HttpConnectorBlockingTask") - } - } - - impl Future for HttpConnectorBlockingTask { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - self.work.poll() - } - } - } - - #[derive(Clone)] - struct HttpConnectExecutor(Arc + Send + Sync>); - - impl Executor> for HttpConnectExecutor { - fn execute(&self, future: oneshot::Execute) -> Result<(), ExecuteError>> { - self.0.execute(HttpConnectorBlockingTask { work: future }) - .map_err(|err| ExecuteError::new(err.kind(), err.into_future().work)) - } - } - #[cfg(test)] mod tests { use std::io; @@ -501,7 +445,7 @@ mod http { let dst = Destination { uri, }; - let connector = HttpConnector::new(1); + let connector = HttpConnector::new(); assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); } @@ -512,7 +456,7 @@ mod http { let dst = Destination { uri, }; - let connector = HttpConnector::new(1); + let connector = HttpConnector::new(); assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); } @@ -524,7 +468,7 @@ mod http { let dst = Destination { uri, }; - let connector = HttpConnector::new(1); + let connector = HttpConnector::new(); assert_eq!(connector.connect(dst).wait().unwrap_err().kind(), io::ErrorKind::InvalidInput); } diff --git a/src/client/dns.rs b/src/client/dns.rs index 182481d343..4593912520 100644 --- a/src/client/dns.rs +++ b/src/client/dns.rs @@ -6,8 +6,6 @@ use std::net::{ }; use std::vec; -use ::futures::{Async, Future, Poll}; - pub struct Work { host: String, port: u16 @@ -19,14 +17,11 @@ impl Work { } } -impl Future for Work { - type Item = IpAddrs; - type Error = io::Error; - - fn poll(&mut self) -> Poll { +impl Work { + pub fn resolve(&self) -> Result { debug!("resolving host={:?}, port={:?}", self.host, self.port); (&*self.host, self.port).to_socket_addrs() - .map(|i| Async::Ready(IpAddrs { iter: i })) + .map(|i| IpAddrs { iter: i }) } } diff --git a/src/client/mod.rs b/src/client/mod.rs index 8f8ee03bf5..f24276df7a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -752,7 +752,7 @@ impl Builder { B: Payload + Send, B::Data: Send, { - let mut connector = HttpConnector::new(4); + let mut connector = HttpConnector::new(); if self.keep_alive { connector.set_keepalive(self.keep_alive_timeout); } diff --git a/src/lib.rs b/src/lib.rs index 6ab96494ba..92604259cc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,7 +18,6 @@ extern crate bytes; #[macro_use] extern crate futures; -#[cfg(feature = "runtime")] extern crate futures_cpupool; extern crate h2; extern crate http; extern crate httparse; @@ -32,6 +31,7 @@ extern crate time; #[cfg(feature = "runtime")] extern crate tokio_reactor; #[cfg(feature = "runtime")] extern crate tokio_tcp; #[cfg(feature = "runtime")] extern crate tokio_timer; +#[cfg(feature = "runtime")] extern crate tokio_threadpool; extern crate want; #[cfg(all(test, feature = "nightly"))] diff --git a/tests/client.rs b/tests/client.rs index 3436ca7eee..0f0dffb25c 100644 --- a/tests/client.rs +++ b/tests/client.rs @@ -231,7 +231,7 @@ macro_rules! test { let addr = server.local_addr().expect("local_addr"); let runtime = $runtime; - let connector = ::hyper::client::HttpConnector::new_with_handle(1, runtime.reactor().clone()); + let connector = ::hyper::client::HttpConnector::new_with_handle(runtime.reactor().clone()); let client = Client::builder() .set_host($set_host) .http1_title_case_headers($title_case_headers) @@ -737,7 +737,7 @@ mod dispatch_impl { let (closes_tx, closes) = mpsc::channel(10); let client = Client::builder() .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, runtime.reactor().clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(runtime.reactor().clone()), closes_tx)); let (tx1, rx1) = oneshot::channel(); @@ -796,7 +796,7 @@ mod dispatch_impl { let res = { let client = Client::builder() .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -849,7 +849,7 @@ mod dispatch_impl { let client = Client::builder() .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -913,7 +913,7 @@ mod dispatch_impl { let res = { let client = Client::builder() .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -964,7 +964,7 @@ mod dispatch_impl { let res = { let client = Client::builder() .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1015,7 +1015,7 @@ mod dispatch_impl { let client = Client::builder() .keep_alive(false) .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1063,7 +1063,7 @@ mod dispatch_impl { let client = Client::builder() .executor(runtime.executor()) - .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(1, handle.clone()), closes_tx)); + .build(DebugConnector::with_http_and_closes(HttpConnector::new_with_handle(handle.clone()), closes_tx)); let req = Request::builder() .uri(&*format!("http://{}/a", addr)) @@ -1287,7 +1287,7 @@ mod dispatch_impl { impl DebugConnector { fn new(handle: &Handle) -> DebugConnector { - let http = HttpConnector::new_with_handle(1, handle.clone()); + let http = HttpConnector::new_with_handle(handle.clone()); let (tx, _) = mpsc::channel(10); DebugConnector::with_http_and_closes(http, tx) } diff --git a/tests/support/mod.rs b/tests/support/mod.rs index 683299e6b3..0865955fec 100644 --- a/tests/support/mod.rs +++ b/tests/support/mod.rs @@ -205,7 +205,7 @@ pub fn __run_test(cfg: __TestConfig) { Version::HTTP_11 }; - let connector = HttpConnector::new_with_handle(1, handle.clone()); + let connector = HttpConnector::new_with_handle(handle.clone()); let client = Client::builder() .keep_alive_timeout(Duration::from_secs(10)) .http2_only(cfg.client_version == 2)