From b0d54fb206aa27908e34eaf7904310720a88a457 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Mon, 25 Mar 2019 14:26:35 -0700 Subject: [PATCH] Add an app::errors::layer to translate errors Signed-off-by: Sean McArthur --- src/app/errors.rs | 118 ++++++++++++++++++++++++++++++++++++++ src/app/main.rs | 7 ++- src/app/mod.rs | 1 + src/proxy/canonicalize.rs | 45 ++++----------- src/proxy/http/router.rs | 69 +++------------------- tests/discovery.rs | 45 +++++++++++++++ 6 files changed, 188 insertions(+), 97 deletions(-) create mode 100644 src/app/errors.rs diff --git a/src/app/errors.rs b/src/app/errors.rs new file mode 100644 index 0000000000..f20729675f --- /dev/null +++ b/src/app/errors.rs @@ -0,0 +1,118 @@ +//! Layer to map HTTP service errors into appropriate `http::Response`s. + +use futures::{Future, Poll}; +use http::{header, Request, Response, StatusCode}; + +use svc; + +type Error = Box; + +/// Layer to map HTTP service errors into appropriate `http::Response`s. +pub fn layer() -> Layer { + Layer +} + +#[derive(Clone, Debug)] +pub struct Layer; + +#[derive(Clone, Debug)] +pub struct Stack { + inner: M, +} + +#[derive(Clone, Debug)] +pub struct Service(S); + +#[derive(Debug)] +pub struct ResponseFuture { + inner: F, +} + +impl svc::Layer for Layer +where + M: svc::Stack, +{ + type Value = as svc::Stack>::Value; + type Error = as svc::Stack>::Error; + type Stack = Stack; + + fn bind(&self, inner: M) -> Self::Stack { + Stack { inner } + } +} + +impl svc::Stack for Stack +where + M: svc::Stack, +{ + type Value = Service; + type Error = M::Error; + + fn make(&self, target: &T) -> Result { + self.inner.make(target).map(Service) + } +} + +impl svc::Service> for Service +where + S: svc::Service, Response = Response>, + S::Error: Into, + B2: Default, +{ + type Response = S::Response; + type Error = Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self) -> Poll<(), Self::Error> { + self.0.poll_ready().map_err(Into::into) + } + + fn call(&mut self, req: Request) -> Self::Future { + let inner = self.0.call(req); + ResponseFuture { inner } + } +} + +impl Future for ResponseFuture +where + F: Future>, + F::Error: Into, + B: Default, +{ + type Item = Response; + type Error = Error; + + fn poll(&mut self) -> Poll { + match self.inner.poll() { + Ok(ok) => Ok(ok), + Err(err) => { + let response = Response::builder() + .status(map_err_to_5xx(err.into())) + .header(header::CONTENT_LENGTH, "0") + .body(B::default()) + .expect("app::errors response is valid"); + + Ok(response.into()) + } + } + } +} + +fn map_err_to_5xx(e: Error) -> StatusCode { + use proxy::http::router::error as router; + + if let Some(ref c) = e.downcast_ref::() { + warn!("router at capacity ({})", c.0); + http::StatusCode::SERVICE_UNAVAILABLE + } else if let Some(ref r) = e.downcast_ref::() { + error!("router error: {:?}", r); + http::StatusCode::BAD_GATEWAY + } else if let Some(_) = e.downcast_ref::() { + error!("could not recognize request"); + http::StatusCode::BAD_GATEWAY + } else { + // we probably should have handled this before? + error!("unexpected error: {}", e); + http::StatusCode::BAD_GATEWAY + } +} diff --git a/src/app/main.rs b/src/app/main.rs index 1afc166471..6c9fe4bf80 100644 --- a/src/app/main.rs +++ b/src/app/main.rs @@ -582,7 +582,9 @@ where // Instantiates an HTTP service for each `Source` using the // shared `addr_router`. The `Source` is stored in the request's // extensions so that it can be used by the `addr_router`. - let server_stack = addr_router.push(insert_target::layer()); + let server_stack = addr_router + .push(insert_target::layer()) + .push(super::errors::layer()); // Instantiated for each TCP connection received from the local // application (including HTTP connections). @@ -730,7 +732,8 @@ where .push(set_client_id_on_req::layer()) .push(strip_header::request::layer(super::L5D_CLIENT_ID)) .push(strip_header::response::layer(super::L5D_SERVER_ID)) - .push(strip_header::request::layer(super::DST_OVERRIDE_HEADER)); + .push(strip_header::request::layer(super::DST_OVERRIDE_HEADER)) + .push(super::errors::layer()); // As the inbound proxy accepts connections, we don't do any // special transport-level handling. diff --git a/src/app/mod.rs b/src/app/mod.rs index b0fb66bd5f..4f99afd250 100644 --- a/src/app/mod.rs +++ b/src/app/mod.rs @@ -7,6 +7,7 @@ mod classify; pub mod config; mod control; mod dst; +mod errors; mod identity; mod inbound; mod main; diff --git a/src/proxy/canonicalize.rs b/src/proxy/canonicalize.rs index 8abbc177fa..b1a3fcfb9b 100644 --- a/src/proxy/canonicalize.rs +++ b/src/proxy/canonicalize.rs @@ -11,7 +11,6 @@ use futures::{future, sync::mpsc, Async, Future, Poll, Stream}; use std::time::Duration; -use std::{error, fmt}; use tokio::executor::{DefaultExecutor, Executor}; use tokio_timer::{clock, Delay, Timeout}; @@ -19,6 +18,8 @@ use dns; use svc; use {Addr, NameAddr}; +type Error = Box; + /// Duration to wait before polling DNS again after an error (or a NXDOMAIN /// response with no TTL). const DNS_ERROR_TTL: Duration = Duration::from_secs(3); @@ -71,12 +72,6 @@ enum State { ValidUntil(Delay), } -#[derive(Debug)] -pub enum Error { - Stack(M), - Service(S), -} - // === Layer === // FIXME the resolver should be abstracted to a trait so that this can be tested @@ -255,13 +250,15 @@ impl Cache { // === impl Service === -impl svc::Service for Service +impl svc::Service for Service where - M: svc::Stack, - M::Value: svc::Service, + M: svc::Stack, + M::Error: Into, + Svc: svc::Service, + Svc::Error: Into, { type Response = >::Response; - type Error = Error>::Error>; + type Error = Error; type Future = future::MapErr< >::Future, fn(>::Error) -> Self::Error, @@ -270,12 +267,12 @@ where fn poll_ready(&mut self) -> Poll<(), Self::Error> { while let Ok(Async::Ready(Some(addr))) = self.rx.poll() { debug!("refined: {}", addr); - let svc = self.stack.make(&addr.into()).map_err(Error::Stack)?; + let svc = self.stack.make(&addr.into()).map_err(Into::into)?; self.service = Some(svc); } match self.service.as_mut() { - Some(ref mut svc) => svc.poll_ready().map_err(Error::Service), + Some(ref mut svc) => svc.poll_ready().map_err(Into::into), None => { trace!("resolution has not completed"); Ok(Async::NotReady) @@ -288,26 +285,6 @@ where .as_mut() .expect("poll_ready must be called first") .call(req) - .map_err(Error::Service) - } -} - -// === impl Error === - -impl fmt::Display for Error { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Error::Stack(e) => e.fmt(f), - Error::Service(e) => e.fmt(f), - } - } -} - -impl error::Error for Error { - fn source(&self) -> Option<&(dyn error::Error + 'static)> { - match self { - Error::Stack(e) => e.source(), - Error::Service(e) => e.source(), - } + .map_err(Into::into) } } diff --git a/src/proxy/http/router.rs b/src/proxy/http/router.rs index f95dc3fa74..1f6a92eb45 100644 --- a/src/proxy/http/router.rs +++ b/src/proxy/http/router.rs @@ -1,7 +1,5 @@ -use futures::{Future, Poll}; -use h2; +use futures::Poll; use http; -use http::header::CONTENT_LENGTH; use std::fmt; use std::marker::PhantomData; use std::time::Duration; @@ -11,8 +9,10 @@ use svc; extern crate linkerd2_router; -pub use self::linkerd2_router::{Recognize, Router}; +pub use self::linkerd2_router::{error, Recognize, Router}; +// compiler doesn't notice this type is used in where bounds below... +#[allow(unused)] type Error = Box; #[derive(Clone, Debug)] @@ -49,11 +49,6 @@ where inner: Router, } -/// Catches errors from the inner future and maps them to 500 responses. -pub struct ResponseFuture { - inner: F, -} - // === impl Config === impl Config { @@ -132,27 +127,6 @@ where } } -fn route_err_to_5xx(e: Error) -> http::StatusCode { - use self::linkerd2_router::error; - - if let Some(ref r) = e.downcast_ref::() { - error!("router error: {:?}", r); - http::StatusCode::INTERNAL_SERVER_ERROR - } else if let Some(_) = e.downcast_ref::() { - error!("could not recognize request"); - http::StatusCode::INTERNAL_SERVER_ERROR - } else if let Some(ref c) = e.downcast_ref::() { - // TODO For H2 streams, we should probably signal a protocol-level - // capacity change. - error!("router at capacity ({})", c.0); - http::StatusCode::SERVICE_UNAVAILABLE - } else { - // Inner - error!("service error: {}", e); - http::StatusCode::INTERNAL_SERVER_ERROR - } -} - // === impl Service === impl svc::Service for Service @@ -165,20 +139,16 @@ where B: Default + Send + 'static, { type Response = as svc::Service>::Response; - type Error = h2::Error; - type Future = ResponseFuture< as svc::Service>::Future>; + type Error = as svc::Service>::Error; + type Future = as svc::Service>::Future; fn poll_ready(&mut self) -> Poll<(), Self::Error> { - self.inner.poll_ready().map_err(|e| { - error!("router failed to become ready: {:?}", e); - h2::Reason::INTERNAL_ERROR.into() - }) + self.inner.poll_ready() } fn call(&mut self, request: Req) -> Self::Future { trace!("routing..."); - let inner = self.inner.call(request); - ResponseFuture { inner } + self.inner.call(request) } } @@ -195,26 +165,3 @@ where } } } - -// === impl ResponseFuture === - -impl Future for ResponseFuture -where - F: Future, Error = Error>, - B: Default, -{ - type Item = F::Item; - type Error = h2::Error; - - fn poll(&mut self) -> Poll { - self.inner.poll().or_else(|e| { - let response = http::Response::builder() - .status(route_err_to_5xx(e)) - .header(CONTENT_LENGTH, "0") - .body(B::default()) - .unwrap(); - - Ok(response.into()) - }) - } -} diff --git a/tests/discovery.rs b/tests/discovery.rs index ecb441644f..0ab7d0024b 100644 --- a/tests/discovery.rs +++ b/tests/discovery.rs @@ -166,6 +166,51 @@ macro_rules! generate_tests { assert_eq!(client.get("/"), "hello"); } + #[test] + fn outbound_router_capacity() { + let _ = env_logger_init(); + let srv = $make_server().route("/", "hello").run(); + let srv_addr = srv.addr; + + let mut env = app::config::TestEnv::new(); + + // Testing what happens if we go over the router capacity... + let router_cap = 2; + env.put(app::config::ENV_OUTBOUND_ROUTER_CAPACITY, router_cap.to_string()); + + let ctrl = controller::new(); + let _txs = (0..=router_cap).map(|n| { + let disco_n = format!("disco{}.test.svc.cluster.local", n); + let tx = ctrl.destination_tx(&disco_n); + tx.send_addr(srv_addr); + tx // This will go into a vec, to keep the stream open. + }).collect::>(); + + let proxy = proxy::new() + .controller(ctrl.run()) + .outbound(srv) + .run_with_test_env(env); + + // Make requests that go through service discovery, to reach the + // router capacity. + for n in 0..router_cap { + let route = format!("disco{}.test.svc.cluster.local", n); + let client = $make_client(proxy.outbound, route); + println!("trying disco{}...", n); + assert_eq!(client.get("/"), "hello"); + } + + // The next request will fail, because we have reached the + // router capacity. + let nth_host = format!("disco{}.test.svc.cluster.local", router_cap); + let client = $make_client(proxy.outbound, &*nth_host); + println!("disco{} should fail...", router_cap); + let rsp = client.request(&mut client.request_builder("/")); + + // We should have gotten an HTTP response, not an error. + assert_eq!(rsp.status(), http::StatusCode::SERVICE_UNAVAILABLE); + } + #[test] fn outbound_reconnects_if_controller_stream_ends() { let _ = env_logger_init();