Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions src/app/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
//! Layer to map HTTP service errors into appropriate `http::Response`s.

use futures::{Future, Poll};
use http::{header, Request, Response, StatusCode};

use svc;

type Error = Box<dyn std::error::Error + Send + Sync>;

/// Layer to map HTTP service errors into appropriate `http::Response`s.
pub fn layer() -> Layer {
Layer
}

#[derive(Clone, Debug)]
pub struct Layer;

#[derive(Clone, Debug)]
pub struct Stack<M> {
inner: M,
}

#[derive(Clone, Debug)]
pub struct Service<S>(S);

#[derive(Debug)]
pub struct ResponseFuture<F> {
inner: F,
}

impl<T, M> svc::Layer<T, T, M> for Layer
where
M: svc::Stack<T>,
{
type Value = <Stack<M> as svc::Stack<T>>::Value;
type Error = <Stack<M> as svc::Stack<T>>::Error;
type Stack = Stack<M>;

fn bind(&self, inner: M) -> Self::Stack {
Stack { inner }
}
}

impl<T, M> svc::Stack<T> for Stack<M>
where
M: svc::Stack<T>,
{
type Value = Service<M::Value>;
type Error = M::Error;

fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
self.inner.make(target).map(Service)
}
}

impl<S, B1, B2> svc::Service<Request<B1>> for Service<S>
where
S: svc::Service<Request<B1>, Response = Response<B2>>,
S::Error: Into<Error>,
B2: Default,
{
type Response = S::Response;
type Error = Error;
type Future = ResponseFuture<S::Future>;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.0.poll_ready().map_err(Into::into)
}

fn call(&mut self, req: Request<B1>) -> Self::Future {
let inner = self.0.call(req);
ResponseFuture { inner }
}
}

impl<F, B> Future for ResponseFuture<F>
where
F: Future<Item = Response<B>>,
F::Error: Into<Error>,
B: Default,
{
type Item = Response<B>;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner.poll() {
Ok(ok) => Ok(ok),
Err(err) => {
let response = Response::builder()
.status(map_err_to_5xx(err.into()))
.header(header::CONTENT_LENGTH, "0")
.body(B::default())
.expect("app::errors response is valid");

Ok(response.into())
}
}
}
}

fn map_err_to_5xx(e: Error) -> StatusCode {
use proxy::http::router::error as router;

if let Some(ref c) = e.downcast_ref::<router::NoCapacity>() {
warn!("router at capacity ({})", c.0);
http::StatusCode::SERVICE_UNAVAILABLE
} else if let Some(ref r) = e.downcast_ref::<router::MakeRoute>() {
error!("router error: {:?}", r);
http::StatusCode::BAD_GATEWAY
} else if let Some(_) = e.downcast_ref::<router::NotRecognized>() {
error!("could not recognize request");
http::StatusCode::BAD_GATEWAY
} else {
// we probably should have handled this before?
error!("unexpected error: {}", e);
http::StatusCode::BAD_GATEWAY
}
}
7 changes: 5 additions & 2 deletions src/app/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,9 @@ where
// Instantiates an HTTP service for each `Source` using the
// shared `addr_router`. The `Source` is stored in the request's
// extensions so that it can be used by the `addr_router`.
let server_stack = addr_router.push(insert_target::layer());
let server_stack = addr_router
.push(insert_target::layer())
.push(super::errors::layer());

// Instantiated for each TCP connection received from the local
// application (including HTTP connections).
Expand Down Expand Up @@ -730,7 +732,8 @@ where
.push(set_client_id_on_req::layer())
.push(strip_header::request::layer(super::L5D_CLIENT_ID))
.push(strip_header::response::layer(super::L5D_SERVER_ID))
.push(strip_header::request::layer(super::DST_OVERRIDE_HEADER));
.push(strip_header::request::layer(super::DST_OVERRIDE_HEADER))
.push(super::errors::layer());

// As the inbound proxy accepts connections, we don't do any
// special transport-level handling.
Expand Down
1 change: 1 addition & 0 deletions src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod classify;
pub mod config;
mod control;
mod dst;
mod errors;
mod identity;
mod inbound;
mod main;
Expand Down
45 changes: 11 additions & 34 deletions src/proxy/canonicalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@

use futures::{future, sync::mpsc, Async, Future, Poll, Stream};
use std::time::Duration;
use std::{error, fmt};
use tokio::executor::{DefaultExecutor, Executor};
use tokio_timer::{clock, Delay, Timeout};

use dns;
use svc;
use {Addr, NameAddr};

type Error = Box<dyn std::error::Error + Send + Sync>;

/// Duration to wait before polling DNS again after an error (or a NXDOMAIN
/// response with no TTL).
const DNS_ERROR_TTL: Duration = Duration::from_secs(3);
Expand Down Expand Up @@ -71,12 +72,6 @@ enum State {
ValidUntil(Delay),
}

#[derive(Debug)]
pub enum Error<M, S> {
Stack(M),
Service(S),
}

// === Layer ===

// FIXME the resolver should be abstracted to a trait so that this can be tested
Expand Down Expand Up @@ -255,13 +250,15 @@ impl Cache {

// === impl Service ===

impl<M, Req> svc::Service<Req> for Service<M>
impl<M, Req, Svc> svc::Service<Req> for Service<M>
where
M: svc::Stack<Addr>,
M::Value: svc::Service<Req>,
M: svc::Stack<Addr, Value = Svc>,
M::Error: Into<Error>,
Svc: svc::Service<Req>,
Svc::Error: Into<Error>,
{
type Response = <M::Value as svc::Service<Req>>::Response;
type Error = Error<M::Error, <M::Value as svc::Service<Req>>::Error>;
type Error = Error;
type Future = future::MapErr<
<M::Value as svc::Service<Req>>::Future,
fn(<M::Value as svc::Service<Req>>::Error) -> Self::Error,
Expand All @@ -270,12 +267,12 @@ where
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
while let Ok(Async::Ready(Some(addr))) = self.rx.poll() {
debug!("refined: {}", addr);
let svc = self.stack.make(&addr.into()).map_err(Error::Stack)?;
let svc = self.stack.make(&addr.into()).map_err(Into::into)?;
self.service = Some(svc);
}

match self.service.as_mut() {
Some(ref mut svc) => svc.poll_ready().map_err(Error::Service),
Some(ref mut svc) => svc.poll_ready().map_err(Into::into),
None => {
trace!("resolution has not completed");
Ok(Async::NotReady)
Expand All @@ -288,26 +285,6 @@ where
.as_mut()
.expect("poll_ready must be called first")
.call(req)
.map_err(Error::Service)
}
}

// === impl Error ===

impl<M: fmt::Display, S: fmt::Display> fmt::Display for Error<M, S> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::Stack(e) => e.fmt(f),
Error::Service(e) => e.fmt(f),
}
}
}

impl<M: error::Error, S: error::Error> error::Error for Error<M, S> {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
Error::Stack(e) => e.source(),
Error::Service(e) => e.source(),
}
.map_err(Into::into)
}
}
69 changes: 8 additions & 61 deletions src/proxy/http/router.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
use futures::{Future, Poll};
use h2;
use futures::Poll;
use http;
use http::header::CONTENT_LENGTH;
use std::fmt;
use std::marker::PhantomData;
use std::time::Duration;
Expand All @@ -11,8 +9,10 @@ use svc;

extern crate linkerd2_router;

pub use self::linkerd2_router::{Recognize, Router};
pub use self::linkerd2_router::{error, Recognize, Router};

// compiler doesn't notice this type is used in where bounds below...
#[allow(unused)]
type Error = Box<dyn std::error::Error + Send + Sync>;

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -49,11 +49,6 @@ where
inner: Router<Req, Rec, Stk>,
}

/// Catches errors from the inner future and maps them to 500 responses.
pub struct ResponseFuture<F> {
inner: F,
}

// === impl Config ===

impl Config {
Expand Down Expand Up @@ -132,27 +127,6 @@ where
}
}

fn route_err_to_5xx(e: Error) -> http::StatusCode {
use self::linkerd2_router::error;

if let Some(ref r) = e.downcast_ref::<error::MakeRoute>() {
error!("router error: {:?}", r);
http::StatusCode::INTERNAL_SERVER_ERROR
} else if let Some(_) = e.downcast_ref::<error::NotRecognized>() {
error!("could not recognize request");
http::StatusCode::INTERNAL_SERVER_ERROR
} else if let Some(ref c) = e.downcast_ref::<error::NoCapacity>() {
// TODO For H2 streams, we should probably signal a protocol-level
// capacity change.
error!("router at capacity ({})", c.0);
http::StatusCode::SERVICE_UNAVAILABLE
} else {
// Inner
error!("service error: {}", e);
http::StatusCode::INTERNAL_SERVER_ERROR
}
}

// === impl Service ===

impl<Req, Rec, Stk, B> svc::Service<Req> for Service<Req, Rec, Stk>
Expand All @@ -165,20 +139,16 @@ where
B: Default + Send + 'static,
{
type Response = <Router<Req, Rec, Stk> as svc::Service<Req>>::Response;
type Error = h2::Error;
type Future = ResponseFuture<<Router<Req, Rec, Stk> as svc::Service<Req>>::Future>;
type Error = <Router<Req, Rec, Stk> as svc::Service<Req>>::Error;
type Future = <Router<Req, Rec, Stk> as svc::Service<Req>>::Future;

fn poll_ready(&mut self) -> Poll<(), Self::Error> {
self.inner.poll_ready().map_err(|e| {
error!("router failed to become ready: {:?}", e);
h2::Reason::INTERNAL_ERROR.into()
})
self.inner.poll_ready()
}

fn call(&mut self, request: Req) -> Self::Future {
trace!("routing...");
let inner = self.inner.call(request);
ResponseFuture { inner }
self.inner.call(request)
}
}

Expand All @@ -195,26 +165,3 @@ where
}
}
}

// === impl ResponseFuture ===

impl<F, B> Future for ResponseFuture<F>
where
F: Future<Item = http::Response<B>, Error = Error>,
B: Default,
{
type Item = F::Item;
type Error = h2::Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll().or_else(|e| {
let response = http::Response::builder()
.status(route_err_to_5xx(e))
.header(CONTENT_LENGTH, "0")
.body(B::default())
.unwrap();

Ok(response.into())
})
}
}
Loading