Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 50 additions & 8 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ struct TlsParams {

const DETECT_TIMEOUT: Duration = Duration::from_secs(1);

#[derive(Copy, Clone, Debug)]
struct Rescue;

// === impl Config ===

impl Config {
Expand Down Expand Up @@ -94,13 +97,7 @@ impl Config {
.push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, Permitted>())
.push_map_target(|(permit, http)| Permitted { permit, http })
.push(inbound::policy::NewAuthorizeHttp::layer(metrics.http_authz.clone()))
.push(errors::respond::layer(|error: Error| -> Result<_> {
if error.is::<inbound::policy::DeniedUnauthorized> () {
return Ok(errors::SyntheticHttpResponse::permission_denied(error));
}
tracing::warn!(%error, "Unexpected error");
Ok(errors::SyntheticHttpResponse::unexpected_error())
}))
.push(Rescue::layer())
.push_on_service(http::BoxResponse::layer())
.push(http::NewServeHttp::layer(Default::default(), drain.clone()))
.push_request_filter(
Expand Down Expand Up @@ -145,7 +142,7 @@ impl Config {
},
)
.push(svc::ArcNewService::layer())
.push(detect::NewDetectService::layer(detect::Config::<http::DetectHttp>::from_timeout(DETECT_TIMEOUT)))
.push(detect::NewDetectService::layer(svc::stack::CloneParam::from(detect::Config::<http::DetectHttp>::from_timeout(DETECT_TIMEOUT))))
.push(transport::metrics::NewServer::layer(metrics.proxy.transport))
.push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
Tcp {
Expand Down Expand Up @@ -258,3 +255,48 @@ impl<T> InsertParam<tls::ConditionalServerTls, T> for TlsParams {
(tls, target)
}
}

// === impl Rescue ===

impl Rescue {
/// Synthesizes responses for HTTP requests that encounter errors.
fn layer<N>(
) -> impl svc::layer::Layer<N, Service = errors::NewRespondService<Self, Self, N>> + Clone {
errors::respond::layer(Self)
}
}

impl<T> ExtractParam<Self, T> for Rescue {
#[inline]
fn extract_param(&self, _: &T) -> Self {
Self
}
}

impl<T: Param<tls::ConditionalServerTls>> ExtractParam<errors::respond::EmitHeaders, T> for Rescue {
#[inline]
fn extract_param(&self, t: &T) -> errors::respond::EmitHeaders {
// Only emit informational headers to meshed peers.
let emit = t
.param()
.value()
.map(|tls| match tls {
tls::ServerTls::Established { client_id, .. } => client_id.is_some(),
_ => false,
})
.unwrap_or(false);
errors::respond::EmitHeaders(emit)
}
}

impl errors::HttpRescue<Error> for Rescue {
fn rescue(&self, error: Error) -> Result<errors::SyntheticHttpResponse> {
let cause = errors::root_cause(&*error);
if cause.is::<inbound::policy::DeniedUnauthorized>() {
return Ok(errors::SyntheticHttpResponse::permission_denied(error));
}

tracing::warn!(%error, "Unexpected error");
Ok(errors::SyntheticHttpResponse::unexpected_error())
}
}
6 changes: 3 additions & 3 deletions linkerd/app/core/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub use crate::exp_backoff::ExponentialBackoff;
use crate::{
proxy::http::{self, h1, h2},
svc::Param,
svc::{stack::CloneParam, Param},
transport::{Keepalive, ListenAddr},
};
use std::time::Duration;
Expand Down Expand Up @@ -36,8 +36,8 @@ pub struct ProxyConfig {
// === impl ProxyConfig ===

impl ProxyConfig {
pub fn detect_http(&self) -> linkerd_detect::Config<http::DetectHttp> {
linkerd_detect::Config::from_timeout(self.detect_protocol_timeout)
pub fn detect_http(&self) -> CloneParam<linkerd_detect::Config<http::DetectHttp>> {
linkerd_detect::Config::from_timeout(self.detect_protocol_timeout).into()
}
}

Expand Down
73 changes: 50 additions & 23 deletions linkerd/app/core/src/errors/respond.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ pub struct SyntheticHttpResponse {
pub message: Cow<'static, str>,
}

#[derive(Copy, Clone, Debug)]
pub struct EmitHeaders(pub bool);

#[derive(Clone, Debug)]
pub struct ExtractRespond<P>(P);

#[derive(Copy, Clone, Debug)]
pub struct NewRespond<R> {
rescue: R,
emit_headers: bool,
}

#[derive(Clone, Debug)]
Expand All @@ -52,6 +56,7 @@ pub struct Respond<R> {
version: http::Version,
is_grpc: bool,
client: Option<ClientHandle>,
emit_headers: bool,
}

#[pin_project(project = ResponseBodyProj)]
Expand All @@ -62,6 +67,7 @@ pub enum ResponseBody<R, B> {
inner: B,
trailers: Option<http::HeaderMap>,
rescue: R,
emit_headers: bool,
},
}

Expand Down Expand Up @@ -158,19 +164,21 @@ impl SyntheticHttpResponse {
}

#[inline]
fn grpc_response<B: Default>(&self) -> http::Response<B> {
fn grpc_response<B: Default>(&self, emit_headers: bool) -> http::Response<B> {
debug!(code = %self.grpc_status, "Handling error on gRPC connection");
let mut rsp = http::Response::builder()
.version(http::Version::HTTP_2)
.header(http::header::CONTENT_LENGTH, "0")
.header(http::header::CONTENT_TYPE, GRPC_CONTENT_TYPE)
.header(GRPC_STATUS, code_header(self.grpc_status))
.header(GRPC_MESSAGE, self.message());
.header(GRPC_STATUS, code_header(self.grpc_status));

// TODO only set when client is trusted.
rsp = rsp.header(L5D_PROXY_ERROR, self.message());
if emit_headers {
rsp = rsp
.header(GRPC_MESSAGE, self.message())
.header(L5D_PROXY_ERROR, self.message());
}

if self.close_connection {
if self.close_connection && emit_headers {
// TODO only set when meshed.
rsp = rsp.header(L5D_PROXY_CONNECTION, "close");
}
Expand All @@ -180,26 +188,33 @@ impl SyntheticHttpResponse {
}

#[inline]
fn http_response<B: Default>(&self, version: http::Version) -> http::Response<B> {
fn http_response<B: Default>(
&self,
version: http::Version,
emit_headers: bool,
) -> http::Response<B> {
debug!(status = %self.http_status, ?version, close = %self.close_connection, "Handling error on HTTP connection");
let mut rsp = http::Response::builder()
.status(self.http_status)
.version(version)
.header(http::header::CONTENT_LENGTH, "0");

// TODO only set when client is trusted.
rsp = rsp.header(L5D_PROXY_ERROR, self.message());
if emit_headers {
rsp = rsp.header(L5D_PROXY_ERROR, self.message());
}

if self.close_connection {
if version == http::Version::HTTP_11 {
// Notify the (proxy or non-proxy) client that the connection will be closed.
rsp = rsp.header(http::header::CONNECTION, "close");
}

// Tell the remote outbound proxy that it should close the connection to its
// Tell the remote outbound proxy that it should close the proxied connection to its
// application, i.e. so the application can choose another replica.
// TODO only set when meshed.
rsp = rsp.header(L5D_PROXY_CONNECTION, "close");
if emit_headers {
// TODO only set when meshed.
rsp = rsp.header(L5D_PROXY_CONNECTION, "close");
}
}

rsp.body(B::default())
Expand All @@ -212,11 +227,14 @@ impl SyntheticHttpResponse {
impl<T, R, P> ExtractParam<NewRespond<R>, T> for ExtractRespond<P>
where
P: ExtractParam<R, T>,
P: ExtractParam<EmitHeaders, T>,
{
#[inline]
fn extract_param(&self, t: &T) -> NewRespond<R> {
let EmitHeaders(emit_headers) = self.0.extract_param(t);
NewRespond {
rescue: self.0.extract_param(t),
emit_headers,
}
}
}
Expand All @@ -234,6 +252,7 @@ where
debug_assert!(client.is_some(), "Missing client handle");

let rescue = self.rescue.clone();
let emit_headers = self.emit_headers;

match req.version() {
http::Version::HTTP_2 => {
Expand All @@ -247,13 +266,15 @@ where
rescue,
is_grpc,
version: http::Version::HTTP_2,
emit_headers,
}
}
version => Respond {
client,
rescue,
version,
is_grpc: false,
emit_headers,
},
}
}
Expand Down Expand Up @@ -287,11 +308,13 @@ where
Respond {
is_grpc: true,
rescue,
emit_headers,
..
} => ResponseBody::GrpcRescue {
inner: b,
trailers: None,
rescue: rescue.clone(),
emit_headers: *emit_headers,
},
_ => ResponseBody::Passthru(b),
}));
Expand All @@ -313,9 +336,9 @@ where
}

let rsp = if self.is_grpc {
rsp.grpc_response()
rsp.grpc_response(self.emit_headers)
} else {
rsp.http_response(self.version)
rsp.http_response(self.version, self.emit_headers)
};

Ok(rsp)
Expand Down Expand Up @@ -348,6 +371,7 @@ where
inner,
trailers,
rescue,
emit_headers,
} => {
// should not be calling poll_data if we have set trailers derived from an error
assert!(trailers.is_none());
Expand All @@ -358,7 +382,8 @@ where
message,
..
} = rescue.rescue(error)?;
*trailers = Some(Self::grpc_trailers(grpc_status, &*message));
let t = Self::grpc_trailers(grpc_status, &*message, *emit_headers);
*trailers = Some(t);
Poll::Ready(None)
}
data => data,
Expand Down Expand Up @@ -403,17 +428,19 @@ where
}

impl<R, B> ResponseBody<R, B> {
fn grpc_trailers(code: tonic::Code, message: &str) -> http::HeaderMap {
fn grpc_trailers(code: tonic::Code, message: &str, emit_headers: bool) -> http::HeaderMap {
debug!(grpc.status = ?code, "Synthesizing gRPC trailers");
let mut t = http::HeaderMap::new();
t.insert(GRPC_STATUS, code_header(code));
t.insert(
GRPC_MESSAGE,
HeaderValue::from_str(message).unwrap_or_else(|error| {
warn!(%error, "Failed to encode error header");
HeaderValue::from_static("Unexpected error")
}),
);
if emit_headers {
t.insert(
GRPC_MESSAGE,
HeaderValue::from_str(message).unwrap_or_else(|error| {
warn!(%error, "Failed to encode error header");
HeaderValue::from_static("Unexpected error")
}),
);
}
t
}
}
Expand Down
25 changes: 24 additions & 1 deletion linkerd/app/inbound/src/http/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use linkerd_app_core::{
classify, dst, errors, http_tracing, io, metrics,
profiles::{self, DiscoveryRejected},
proxy::{http, tap},
svc::{self, Param},
svc::{self, ExtractParam, Param},
tls,
transport::{self, ClientAddr, Remote, ServerAddr},
Error, Infallible, NameAddr, Result,
Expand Down Expand Up @@ -428,6 +428,29 @@ impl ClientRescue {
}
}

impl<T> ExtractParam<Self, T> for ClientRescue {
#[inline]
fn extract_param(&self, _: &T) -> Self {
*self
}
}

impl ExtractParam<errors::respond::EmitHeaders, Logical> for ClientRescue {
#[inline]
fn extract_param(&self, t: &Logical) -> errors::respond::EmitHeaders {
// Only emit informational headers to meshed peers.
let emit = t
.tls
.value()
.map(|tls| match tls {
tls::ServerTls::Established { client_id, .. } => client_id.is_some(),
_ => false,
})
.unwrap_or(false);
errors::respond::EmitHeaders(emit)
}
}

impl errors::HttpRescue<Error> for ClientRescue {
fn rescue(&self, error: Error) -> Result<errors::SyntheticHttpResponse> {
let cause = errors::root_cause(&*error);
Expand Down
Loading