Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 17 additions & 27 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ async fn http1_bad_gateway_meshed_response_error_header() {
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await;

// Send a request and assert that it is a BAD_GATEWAY with the expected
// header message.
Expand All @@ -209,7 +209,7 @@ async fn http1_bad_gateway_meshed_response_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand Down Expand Up @@ -380,17 +380,15 @@ async fn h2_response_meshed_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the change we're applying to these inbound proxy tests.

let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is SERVICE_UNAVAILABLE with the
// expected header message.
Expand All @@ -400,7 +398,7 @@ async fn h2_response_meshed_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand All @@ -422,17 +420,15 @@ async fn h2_response_unmeshed_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2);
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is SERVICE_UNAVAILABLE with the
// expected header message.
Expand All @@ -442,7 +438,7 @@ async fn h2_response_unmeshed_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand All @@ -466,17 +462,15 @@ async fn grpc_meshed_response_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is OK with the expected header
// message.
Expand All @@ -487,7 +481,7 @@ async fn grpc_meshed_response_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand All @@ -509,17 +503,15 @@ async fn grpc_unmeshed_response_error_header() {
let connect = support::connect().endpoint_fn_boxed(Target::addr(), connect_error());

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
profile_tx.send(profile::Profile::default()).unwrap();
let cfg = default_config();
let (rt, _shutdown) = runtime();
let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2);
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is OK with the expected header
// message.
Expand All @@ -530,7 +522,7 @@ async fn grpc_unmeshed_response_error_header() {
.body(Body::default())
.unwrap();
let rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand Down Expand Up @@ -560,9 +552,7 @@ async fn grpc_response_class() {
};

// Build a client using the connect that always errors.
#[allow(deprecated)] // linkerd/linkerd2#8733
let mut client = hyper::client::conn::Builder::new();
client.http2_only(true);
let mut client = hyper::client::conn::http2::Builder::new(TracingExecutor);
let profiles = profile::resolver();
let profile_tx =
profiles.profile_tx(NameAddr::from_str_and_port("foo.svc.cluster.local", 5550).unwrap());
Expand All @@ -575,7 +565,7 @@ async fn grpc_response_class() {
.http_endpoint
.into_report(time::Duration::from_secs(3600));
let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2());
let (client, bg) = http_util::connect_and_accept(&mut client, server).await;
let (mut client, bg) = http_util::connect_and_accept_http2(&mut client, server).await;

// Send a request and assert that it is OK with the expected header
// message.
Expand All @@ -587,7 +577,7 @@ async fn grpc_response_class() {
.unwrap();

let mut rsp = client
.oneshot(req)
.send_request(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
Expand Down
64 changes: 57 additions & 7 deletions linkerd/app/test/src/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,75 @@
app_core::{svc, Error},
io, ContextError,
};
use hyper::{body::HttpBody, Body};
use http_body::Body;
use tokio::task::JoinSet;
use tower::ServiceExt;
use tracing::Instrument;

#[allow(deprecated)] // linkerd/linkerd2#8733
use hyper::client::conn::{Builder as ClientBuilder, SendRequest};

type BoxServer = svc::BoxTcp<io::DuplexStream>;

/// Connects a client and server, running a proxy between them.
///
/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and

Check warning on line 14 in linkerd/app/test/src/http_util.rs

View workflow job for this annotation

GitHub Actions / rust

warning: unresolved link to `SendRequest` --> linkerd/app/test/src/http_util.rs:14:40 | 14 | /// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and | ^^^^^^^^^^^ no item named `SendRequest` in scope | = help: to escape `[` and `]` characters, add '\' before them like `\[` or `\]` = note: `#[warn(rustdoc::broken_intra_doc_links)]` on by default
/// await a response, and (2) a [`JoinSet<T>`] running background tasks.
#[allow(deprecated)] // linkerd/linkerd2#8733
pub async fn connect_and_accept(
client_settings: &mut ClientBuilder,
client_settings: &mut hyper::client::conn::Builder,
server: BoxServer,
) -> (SendRequest<Body>, JoinSet<Result<(), Error>>) {
) -> (
hyper::client::conn::SendRequest<hyper::Body>,
JoinSet<Result<(), Error>>,
) {
tracing::info!(settings = ?client_settings, "connecting client with");
let (client_io, server_io) = io::duplex(4096);

let (client, conn) = client_settings
.handshake(client_io)
.await
.expect("Client must connect");

let mut bg = tokio::task::JoinSet::new();
bg.spawn(
async move {
server
.oneshot(server_io)
.await
.map_err(ContextError::ctx("proxy background task failed"))?;
tracing::info!("proxy serve task complete");
Ok(())
}
.instrument(tracing::info_span!("proxy")),
);
bg.spawn(
async move {
conn.await
.map_err(ContextError::ctx("client background task failed"))
.map_err(Error::from)?;
tracing::info!("client background complete");
Ok(())
}
.instrument(tracing::info_span!("client_bg")),
);

(client, bg)
}

/// Connects a client and server, running a proxy between them.
///
/// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and

Check warning on line 60 in linkerd/app/test/src/http_util.rs

View workflow job for this annotation

GitHub Actions / rust

warning: unresolved link to `SendRequest` --> linkerd/app/test/src/http_util.rs:60:40 | 60 | /// Returns a tuple containing (1) a [`SendRequest`] that can be used to transmit a request and | ^^^^^^^^^^^ no item named `SendRequest` in scope | = help: to escape `[` and `]` characters, add '\' before them like `\[` or `\]`
/// await a response, and (2) a [`JoinSet<T>`] running background tasks.
pub async fn connect_and_accept_http2<B>(
client_settings: &mut hyper::client::conn::http2::Builder,
server: BoxServer,
) -> (
hyper::client::conn::http2::SendRequest<B>,
JoinSet<Result<(), Error>>,
)
Comment on lines +62 to +68
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the diff presents this unfortunately, but the body of connect_and_accept is not changed, this is the function added in this branch.

where
B: Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
tracing::info!(settings = ?client_settings, "connecting client with");
let (client_io, server_io) = io::duplex(4096);

Expand Down Expand Up @@ -58,7 +108,7 @@
/// Collects a request or response body, returning it as a [`String`].
pub async fn body_to_string<T>(body: T) -> Result<String, Error>
where
T: HttpBody,
T: Body,
T::Error: Into<Error>,
{
let bytes = body
Expand Down
Loading