Skip to content

Commit b0d54fb

Browse files
committed
Add an app::errors::layer to translate errors
Signed-off-by: Sean McArthur <[email protected]>
1 parent cc86324 commit b0d54fb

File tree

6 files changed

+188
-97
lines changed

6 files changed

+188
-97
lines changed

src/app/errors.rs

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
//! Layer to map HTTP service errors into appropriate `http::Response`s.
2+
3+
use futures::{Future, Poll};
4+
use http::{header, Request, Response, StatusCode};
5+
6+
use svc;
7+
8+
type Error = Box<dyn std::error::Error + Send + Sync>;
9+
10+
/// Layer to map HTTP service errors into appropriate `http::Response`s.
11+
pub fn layer() -> Layer {
12+
Layer
13+
}
14+
15+
#[derive(Clone, Debug)]
16+
pub struct Layer;
17+
18+
#[derive(Clone, Debug)]
19+
pub struct Stack<M> {
20+
inner: M,
21+
}
22+
23+
#[derive(Clone, Debug)]
24+
pub struct Service<S>(S);
25+
26+
#[derive(Debug)]
27+
pub struct ResponseFuture<F> {
28+
inner: F,
29+
}
30+
31+
impl<T, M> svc::Layer<T, T, M> for Layer
32+
where
33+
M: svc::Stack<T>,
34+
{
35+
type Value = <Stack<M> as svc::Stack<T>>::Value;
36+
type Error = <Stack<M> as svc::Stack<T>>::Error;
37+
type Stack = Stack<M>;
38+
39+
fn bind(&self, inner: M) -> Self::Stack {
40+
Stack { inner }
41+
}
42+
}
43+
44+
impl<T, M> svc::Stack<T> for Stack<M>
45+
where
46+
M: svc::Stack<T>,
47+
{
48+
type Value = Service<M::Value>;
49+
type Error = M::Error;
50+
51+
fn make(&self, target: &T) -> Result<Self::Value, Self::Error> {
52+
self.inner.make(target).map(Service)
53+
}
54+
}
55+
56+
impl<S, B1, B2> svc::Service<Request<B1>> for Service<S>
57+
where
58+
S: svc::Service<Request<B1>, Response = Response<B2>>,
59+
S::Error: Into<Error>,
60+
B2: Default,
61+
{
62+
type Response = S::Response;
63+
type Error = Error;
64+
type Future = ResponseFuture<S::Future>;
65+
66+
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
67+
self.0.poll_ready().map_err(Into::into)
68+
}
69+
70+
fn call(&mut self, req: Request<B1>) -> Self::Future {
71+
let inner = self.0.call(req);
72+
ResponseFuture { inner }
73+
}
74+
}
75+
76+
impl<F, B> Future for ResponseFuture<F>
77+
where
78+
F: Future<Item = Response<B>>,
79+
F::Error: Into<Error>,
80+
B: Default,
81+
{
82+
type Item = Response<B>;
83+
type Error = Error;
84+
85+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
86+
match self.inner.poll() {
87+
Ok(ok) => Ok(ok),
88+
Err(err) => {
89+
let response = Response::builder()
90+
.status(map_err_to_5xx(err.into()))
91+
.header(header::CONTENT_LENGTH, "0")
92+
.body(B::default())
93+
.expect("app::errors response is valid");
94+
95+
Ok(response.into())
96+
}
97+
}
98+
}
99+
}
100+
101+
fn map_err_to_5xx(e: Error) -> StatusCode {
102+
use proxy::http::router::error as router;
103+
104+
if let Some(ref c) = e.downcast_ref::<router::NoCapacity>() {
105+
warn!("router at capacity ({})", c.0);
106+
http::StatusCode::SERVICE_UNAVAILABLE
107+
} else if let Some(ref r) = e.downcast_ref::<router::MakeRoute>() {
108+
error!("router error: {:?}", r);
109+
http::StatusCode::BAD_GATEWAY
110+
} else if let Some(_) = e.downcast_ref::<router::NotRecognized>() {
111+
error!("could not recognize request");
112+
http::StatusCode::BAD_GATEWAY
113+
} else {
114+
// we probably should have handled this before?
115+
error!("unexpected error: {}", e);
116+
http::StatusCode::BAD_GATEWAY
117+
}
118+
}

src/app/main.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,9 @@ where
582582
// Instantiates an HTTP service for each `Source` using the
583583
// shared `addr_router`. The `Source` is stored in the request's
584584
// extensions so that it can be used by the `addr_router`.
585-
let server_stack = addr_router.push(insert_target::layer());
585+
let server_stack = addr_router
586+
.push(insert_target::layer())
587+
.push(super::errors::layer());
586588

587589
// Instantiated for each TCP connection received from the local
588590
// application (including HTTP connections).
@@ -730,7 +732,8 @@ where
730732
.push(set_client_id_on_req::layer())
731733
.push(strip_header::request::layer(super::L5D_CLIENT_ID))
732734
.push(strip_header::response::layer(super::L5D_SERVER_ID))
733-
.push(strip_header::request::layer(super::DST_OVERRIDE_HEADER));
735+
.push(strip_header::request::layer(super::DST_OVERRIDE_HEADER))
736+
.push(super::errors::layer());
734737

735738
// As the inbound proxy accepts connections, we don't do any
736739
// special transport-level handling.

src/app/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod classify;
77
pub mod config;
88
mod control;
99
mod dst;
10+
mod errors;
1011
mod identity;
1112
mod inbound;
1213
mod main;

src/proxy/canonicalize.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@
1111
1212
use futures::{future, sync::mpsc, Async, Future, Poll, Stream};
1313
use std::time::Duration;
14-
use std::{error, fmt};
1514
use tokio::executor::{DefaultExecutor, Executor};
1615
use tokio_timer::{clock, Delay, Timeout};
1716

1817
use dns;
1918
use svc;
2019
use {Addr, NameAddr};
2120

21+
type Error = Box<dyn std::error::Error + Send + Sync>;
22+
2223
/// Duration to wait before polling DNS again after an error (or a NXDOMAIN
2324
/// response with no TTL).
2425
const DNS_ERROR_TTL: Duration = Duration::from_secs(3);
@@ -71,12 +72,6 @@ enum State {
7172
ValidUntil(Delay),
7273
}
7374

74-
#[derive(Debug)]
75-
pub enum Error<M, S> {
76-
Stack(M),
77-
Service(S),
78-
}
79-
8075
// === Layer ===
8176

8277
// FIXME the resolver should be abstracted to a trait so that this can be tested
@@ -255,13 +250,15 @@ impl Cache {
255250

256251
// === impl Service ===
257252

258-
impl<M, Req> svc::Service<Req> for Service<M>
253+
impl<M, Req, Svc> svc::Service<Req> for Service<M>
259254
where
260-
M: svc::Stack<Addr>,
261-
M::Value: svc::Service<Req>,
255+
M: svc::Stack<Addr, Value = Svc>,
256+
M::Error: Into<Error>,
257+
Svc: svc::Service<Req>,
258+
Svc::Error: Into<Error>,
262259
{
263260
type Response = <M::Value as svc::Service<Req>>::Response;
264-
type Error = Error<M::Error, <M::Value as svc::Service<Req>>::Error>;
261+
type Error = Error;
265262
type Future = future::MapErr<
266263
<M::Value as svc::Service<Req>>::Future,
267264
fn(<M::Value as svc::Service<Req>>::Error) -> Self::Error,
@@ -270,12 +267,12 @@ where
270267
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
271268
while let Ok(Async::Ready(Some(addr))) = self.rx.poll() {
272269
debug!("refined: {}", addr);
273-
let svc = self.stack.make(&addr.into()).map_err(Error::Stack)?;
270+
let svc = self.stack.make(&addr.into()).map_err(Into::into)?;
274271
self.service = Some(svc);
275272
}
276273

277274
match self.service.as_mut() {
278-
Some(ref mut svc) => svc.poll_ready().map_err(Error::Service),
275+
Some(ref mut svc) => svc.poll_ready().map_err(Into::into),
279276
None => {
280277
trace!("resolution has not completed");
281278
Ok(Async::NotReady)
@@ -288,26 +285,6 @@ where
288285
.as_mut()
289286
.expect("poll_ready must be called first")
290287
.call(req)
291-
.map_err(Error::Service)
292-
}
293-
}
294-
295-
// === impl Error ===
296-
297-
impl<M: fmt::Display, S: fmt::Display> fmt::Display for Error<M, S> {
298-
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
299-
match self {
300-
Error::Stack(e) => e.fmt(f),
301-
Error::Service(e) => e.fmt(f),
302-
}
303-
}
304-
}
305-
306-
impl<M: error::Error, S: error::Error> error::Error for Error<M, S> {
307-
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
308-
match self {
309-
Error::Stack(e) => e.source(),
310-
Error::Service(e) => e.source(),
311-
}
288+
.map_err(Into::into)
312289
}
313290
}

src/proxy/http/router.rs

Lines changed: 8 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
use futures::{Future, Poll};
2-
use h2;
1+
use futures::Poll;
32
use http;
4-
use http::header::CONTENT_LENGTH;
53
use std::fmt;
64
use std::marker::PhantomData;
75
use std::time::Duration;
@@ -11,8 +9,10 @@ use svc;
119

1210
extern crate linkerd2_router;
1311

14-
pub use self::linkerd2_router::{Recognize, Router};
12+
pub use self::linkerd2_router::{error, Recognize, Router};
1513

14+
// compiler doesn't notice this type is used in where bounds below...
15+
#[allow(unused)]
1616
type Error = Box<dyn std::error::Error + Send + Sync>;
1717

1818
#[derive(Clone, Debug)]
@@ -49,11 +49,6 @@ where
4949
inner: Router<Req, Rec, Stk>,
5050
}
5151

52-
/// Catches errors from the inner future and maps them to 500 responses.
53-
pub struct ResponseFuture<F> {
54-
inner: F,
55-
}
56-
5752
// === impl Config ===
5853

5954
impl Config {
@@ -132,27 +127,6 @@ where
132127
}
133128
}
134129

135-
fn route_err_to_5xx(e: Error) -> http::StatusCode {
136-
use self::linkerd2_router::error;
137-
138-
if let Some(ref r) = e.downcast_ref::<error::MakeRoute>() {
139-
error!("router error: {:?}", r);
140-
http::StatusCode::INTERNAL_SERVER_ERROR
141-
} else if let Some(_) = e.downcast_ref::<error::NotRecognized>() {
142-
error!("could not recognize request");
143-
http::StatusCode::INTERNAL_SERVER_ERROR
144-
} else if let Some(ref c) = e.downcast_ref::<error::NoCapacity>() {
145-
// TODO For H2 streams, we should probably signal a protocol-level
146-
// capacity change.
147-
error!("router at capacity ({})", c.0);
148-
http::StatusCode::SERVICE_UNAVAILABLE
149-
} else {
150-
// Inner
151-
error!("service error: {}", e);
152-
http::StatusCode::INTERNAL_SERVER_ERROR
153-
}
154-
}
155-
156130
// === impl Service ===
157131

158132
impl<Req, Rec, Stk, B> svc::Service<Req> for Service<Req, Rec, Stk>
@@ -165,20 +139,16 @@ where
165139
B: Default + Send + 'static,
166140
{
167141
type Response = <Router<Req, Rec, Stk> as svc::Service<Req>>::Response;
168-
type Error = h2::Error;
169-
type Future = ResponseFuture<<Router<Req, Rec, Stk> as svc::Service<Req>>::Future>;
142+
type Error = <Router<Req, Rec, Stk> as svc::Service<Req>>::Error;
143+
type Future = <Router<Req, Rec, Stk> as svc::Service<Req>>::Future;
170144

171145
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
172-
self.inner.poll_ready().map_err(|e| {
173-
error!("router failed to become ready: {:?}", e);
174-
h2::Reason::INTERNAL_ERROR.into()
175-
})
146+
self.inner.poll_ready()
176147
}
177148

178149
fn call(&mut self, request: Req) -> Self::Future {
179150
trace!("routing...");
180-
let inner = self.inner.call(request);
181-
ResponseFuture { inner }
151+
self.inner.call(request)
182152
}
183153
}
184154

@@ -195,26 +165,3 @@ where
195165
}
196166
}
197167
}
198-
199-
// === impl ResponseFuture ===
200-
201-
impl<F, B> Future for ResponseFuture<F>
202-
where
203-
F: Future<Item = http::Response<B>, Error = Error>,
204-
B: Default,
205-
{
206-
type Item = F::Item;
207-
type Error = h2::Error;
208-
209-
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
210-
self.inner.poll().or_else(|e| {
211-
let response = http::Response::builder()
212-
.status(route_err_to_5xx(e))
213-
.header(CONTENT_LENGTH, "0")
214-
.body(B::default())
215-
.unwrap();
216-
217-
Ok(response.into())
218-
})
219-
}
220-
}

0 commit comments

Comments
 (0)