diff --git a/Cargo.lock b/Cargo.lock index ae99ec9214..7e897e1749 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1687,6 +1687,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-http-body-compat" +version = "0.1.0" +dependencies = [ + "http", + "http-body", +] + [[package]] name = "linkerd-http-box" version = "0.1.0" @@ -1813,6 +1821,7 @@ dependencies = [ "hyper", "linkerd-error", "linkerd-exp-backoff", + "linkerd-http-body-compat", "linkerd-http-box", "linkerd-metrics", "linkerd-mock-http-body", diff --git a/Cargo.toml b/Cargo.toml index 925307856a..71c722a772 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ "linkerd/error-respond", "linkerd/exp-backoff", "linkerd/http/access-log", + "linkerd/http/body-compat", "linkerd/http/box", "linkerd/http/classify", "linkerd/http/executor", diff --git a/linkerd/http/body-compat/Cargo.toml b/linkerd/http/body-compat/Cargo.toml new file mode 100644 index 0000000000..b4ca7f6e4a --- /dev/null +++ b/linkerd/http/body-compat/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "linkerd-http-body-compat" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false + +[dependencies] +http = { workspace = true } +http-body = { workspace = true } diff --git a/linkerd/http/retry/src/compat/frame.rs b/linkerd/http/body-compat/src/frame.rs similarity index 100% rename from linkerd/http/retry/src/compat/frame.rs rename to linkerd/http/body-compat/src/frame.rs diff --git a/linkerd/http/retry/src/compat.rs b/linkerd/http/body-compat/src/lib.rs similarity index 87% rename from linkerd/http/retry/src/compat.rs rename to linkerd/http/body-compat/src/lib.rs index a4e452d6be..2704823154 100644 --- a/linkerd/http/retry/src/compat.rs +++ b/linkerd/http/body-compat/src/lib.rs @@ -7,12 +7,12 @@ use std::{ task::{Context, Poll}, }; -pub(crate) use self::frame::Frame; +pub use self::frame::Frame; mod frame; #[derive(Debug)] -pub(crate) struct ForwardCompatibleBody { +pub struct ForwardCompatibleBody { inner: B, data_finished: bool, trailers_finished: bool, @@ -21,7 +21,7 @@ pub(crate) struct ForwardCompatibleBody { // === impl ForwardCompatibleBody === impl ForwardCompatibleBody { - pub(crate) fn new(body: B) -> Self { + pub fn new(body: B) -> Self { if body.is_end_stream() { Self { inner: body, @@ -37,28 +37,28 @@ impl ForwardCompatibleBody { } } - pub(crate) fn into_inner(self) -> B { + pub fn into_inner(self) -> B { self.inner } /// Returns a future that resolves to the next frame. - pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> { + pub fn frame(&mut self) -> combinators::Frame<'_, B> { combinators::Frame(self) } /// Returns `true` when the end of stream has been reached. - pub(crate) fn is_end_stream(&self) -> bool { + pub fn is_end_stream(&self) -> bool { self.inner.is_end_stream() } /// Returns the bounds on the remaining length of the stream. - pub(crate) fn size_hint(&self) -> SizeHint { + pub fn size_hint(&self) -> SizeHint { self.inner.size_hint() } } impl ForwardCompatibleBody { - pub(crate) fn poll_frame( + pub fn poll_frame( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, B::Error>>> { @@ -78,14 +78,13 @@ impl ForwardCompatibleBody { /// /// [frame]: https://docs.rs/http-body-util/0.1.2/http_body_util/combinators/struct.Frame.html mod combinators { - use core::future::Future; - use core::pin::Pin; - use core::task; - use http_body::Body; - use std::ops::Not; - use std::task::ready; - use super::ForwardCompatibleBody; + use core::{future::Future, pin::Pin, task}; + use http_body::Body; + use std::{ + ops::Not, + task::{ready, Context, Poll}, + }; #[must_use = "futures don't do anything unless polled"] #[derive(Debug)] @@ -95,7 +94,7 @@ mod combinators { impl Future for Frame<'_, T> { type Output = Option, T::Error>>; - fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll { + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { let Self(ForwardCompatibleBody { inner, data_finished, diff --git a/linkerd/http/retry/Cargo.toml b/linkerd/http/retry/Cargo.toml index a768f00648..dfbc05b50a 100644 --- a/linkerd/http/retry/Cargo.toml +++ b/linkerd/http/retry/Cargo.toml @@ -21,6 +21,7 @@ thiserror = "2" linkerd-http-box = { path = "../box" } linkerd-error = { path = "../../error" } linkerd-exp-backoff = { path = "../../exp-backoff" } +linkerd-http-body-compat = { path = "../body-compat" } linkerd-metrics = { path = "../../metrics" } linkerd-stack = { path = "../../stack" } diff --git a/linkerd/http/retry/src/lib.rs b/linkerd/http/retry/src/lib.rs index 4b2b2a9ce1..01375b65b1 100644 --- a/linkerd/http/retry/src/lib.rs +++ b/linkerd/http/retry/src/lib.rs @@ -4,8 +4,6 @@ pub mod peek_trailers; pub mod replay; -mod compat; - pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody}; pub use tower::retry::budget::Budget; diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index fc0c2bf299..dfed1d89ca 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -123,7 +123,7 @@ impl PeekTrailersBody { { // XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame`s. // this can be removed when we upgrade to http-body 1.0. - use crate::compat::ForwardCompatibleBody; + use linkerd_http_body_compat::ForwardCompatibleBody; let mut body = ForwardCompatibleBody::new(body); // First, poll the body for its first frame. @@ -220,9 +220,9 @@ impl PeekTrailersBody { /// /// This is an internal helper to facilitate pattern matching in `read_body(..)`, above. fn split_frame( - frame: crate::compat::Frame, + frame: linkerd_http_body_compat::Frame, ) -> Option> { - use {crate::compat::Frame, futures::future::Either}; + use {futures::future::Either, linkerd_http_body_compat::Frame}; match frame.into_data().map_err(Frame::into_trailers) { Ok(data) => Some(Either::Left(data)), Err(Ok(trailers)) => Some(Either::Right(trailers)), diff --git a/linkerd/http/retry/src/replay.rs b/linkerd/http/retry/src/replay.rs index 03804feab0..91f2a52618 100644 --- a/linkerd/http/retry/src/replay.rs +++ b/linkerd/http/retry/src/replay.rs @@ -68,7 +68,7 @@ struct SharedState { struct BodyState { replay: Replay, trailers: Option, - rest: crate::compat::ForwardCompatibleBody, + rest: linkerd_http_body_compat::ForwardCompatibleBody, is_completed: bool, /// Maximum number of bytes to buffer. @@ -104,7 +104,7 @@ impl ReplayBody { state: Some(BodyState { replay: Default::default(), trailers: None, - rest: crate::compat::ForwardCompatibleBody::new(body), + rest: linkerd_http_body_compat::ForwardCompatibleBody::new(body), is_completed: false, max_bytes: max_bytes + 1, }), @@ -368,9 +368,9 @@ impl ReplayBody { /// /// This is an internal helper to facilitate pattern matching in `read_body(..)`, above. fn split_frame( - frame: crate::compat::Frame, + frame: linkerd_http_body_compat::Frame, ) -> Option> { - use {crate::compat::Frame, futures::future::Either}; + use {futures::future::Either, linkerd_http_body_compat::Frame}; match frame.into_data().map_err(Frame::into_trailers) { Ok(data) => Some(Either::Left(data)), Err(Ok(trailers)) => Some(Either::Right(trailers)), diff --git a/linkerd/http/retry/src/replay/tests.rs b/linkerd/http/retry/src/replay/tests.rs index 67640ac23f..7eb2c218eb 100644 --- a/linkerd/http/retry/src/replay/tests.rs +++ b/linkerd/http/retry/src/replay/tests.rs @@ -89,7 +89,7 @@ async fn replays_trailers() { drop(tx); let read_trailers = |body: ReplayBody<_>| async move { - let mut body = crate::compat::ForwardCompatibleBody::new(body); + let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body); let _ = body .frame() .await @@ -126,8 +126,8 @@ async fn replays_trailers_only() { replay, _trace, } = Test::new(); - let mut initial = crate::compat::ForwardCompatibleBody::new(initial); - let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial); + let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay); let mut tlrs = HeaderMap::new(); tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap()); @@ -332,8 +332,8 @@ async fn eos_only_when_fully_replayed() { .expect("body must not be too large"); let replay = initial.clone(); - let mut initial = crate::compat::ForwardCompatibleBody::new(initial); - let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial); + let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay); // Read the initial body, show that the replay does not consider itself to have reached the // end-of-stream. Then drop the initial body, show that the replay is still not done. @@ -374,7 +374,7 @@ async fn eos_only_when_fully_replayed() { drop(replay); // Read the second replay body. - let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2); replay2 .frame() .await @@ -396,8 +396,8 @@ async fn eos_only_when_fully_replayed_with_trailers() { .expect("body must not be too large"); let replay = initial.clone(); - let mut initial = crate::compat::ForwardCompatibleBody::new(initial); - let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial); + let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay); // Read the initial body, show that the replay does not consider itself to have reached the // end-of-stream. Then drop the initial body, show that the replay is still not done. @@ -450,7 +450,7 @@ async fn eos_only_when_fully_replayed_with_trailers() { drop(replay); // Read the second replay body. - let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2); replay2 .frame() .await @@ -508,7 +508,7 @@ async fn caps_buffer() { // The request's replay should error, since we discarded the buffer when // we hit the cap. - let mut replay = crate::compat::ForwardCompatibleBody::new(replay); + let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay); let err = replay .frame() .await @@ -554,7 +554,7 @@ async fn caps_across_replays() { drop(replay); // The second replay will fail, though, because the buffer was discarded. - let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2); + let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2); let err = replay2 .frame() .await @@ -638,7 +638,7 @@ async fn size_hint_is_correct_across_replays() { assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY)); let initial = { // TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers. - let mut body = crate::compat::ForwardCompatibleBody::new(initial); + let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(initial); assert!(body.frame().await.is_none()); body.into_inner() }; @@ -661,7 +661,7 @@ async fn size_hint_is_correct_across_replays() { assert_eq!(chunk(&mut replay).await.as_deref(), Some(BODY)); // let replay = { // // TODO(kate): the replay doesn't report ending until it has (not) yielded trailers. - // let mut body = crate::compat::ForwardCompatibleBody::new(replay); + // let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(replay); // assert!(body.frame().await.is_none()); // body.into_inner() // }; @@ -770,7 +770,7 @@ where T: http_body::Body + Unpin, { tracing::trace!("waiting for a body chunk..."); - let chunk = crate::compat::ForwardCompatibleBody::new(body) + let chunk = linkerd_http_body_compat::ForwardCompatibleBody::new(body) .frame() .await .expect("yields a result") @@ -788,7 +788,7 @@ where B: http_body::Body + Unpin, B::Error: std::fmt::Debug, { - let mut body = crate::compat::ForwardCompatibleBody::new(body); + let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body); let mut data = String::new(); let mut trailers = None;