Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1687,6 +1687,14 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-http-body-compat"
version = "0.1.0"
dependencies = [
"http",
"http-body",
]

[[package]]
name = "linkerd-http-box"
version = "0.1.0"
Expand Down Expand Up @@ -1813,6 +1821,7 @@ dependencies = [
"hyper",
"linkerd-error",
"linkerd-exp-backoff",
"linkerd-http-body-compat",
"linkerd-http-box",
"linkerd-metrics",
"linkerd-mock-http-body",
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"linkerd/error-respond",
"linkerd/exp-backoff",
"linkerd/http/access-log",
"linkerd/http/body-compat",
"linkerd/http/box",
"linkerd/http/classify",
"linkerd/http/executor",
Expand Down
11 changes: 11 additions & 0 deletions linkerd/http/body-compat/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "linkerd-http-body-compat"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false

[dependencies]
http = { workspace = true }
http-body = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::{
task::{Context, Poll},
};

pub(crate) use self::frame::Frame;
pub use self::frame::Frame;

mod frame;

#[derive(Debug)]
pub(crate) struct ForwardCompatibleBody<B> {
pub struct ForwardCompatibleBody<B> {
inner: B,
data_finished: bool,
trailers_finished: bool,
Expand All @@ -21,7 +21,7 @@ pub(crate) struct ForwardCompatibleBody<B> {
// === impl ForwardCompatibleBody ===

impl<B: Body> ForwardCompatibleBody<B> {
pub(crate) fn new(body: B) -> Self {
pub fn new(body: B) -> Self {
if body.is_end_stream() {
Self {
inner: body,
Expand All @@ -37,28 +37,28 @@ impl<B: Body> ForwardCompatibleBody<B> {
}
}

pub(crate) fn into_inner(self) -> B {
pub fn into_inner(self) -> B {
self.inner
}

/// Returns a future that resolves to the next frame.
pub(crate) fn frame(&mut self) -> combinators::Frame<'_, B> {
pub fn frame(&mut self) -> combinators::Frame<'_, B> {
combinators::Frame(self)
}

/// Returns `true` when the end of stream has been reached.
pub(crate) fn is_end_stream(&self) -> bool {
pub fn is_end_stream(&self) -> bool {
self.inner.is_end_stream()
}

/// Returns the bounds on the remaining length of the stream.
pub(crate) fn size_hint(&self) -> SizeHint {
pub fn size_hint(&self) -> SizeHint {
self.inner.size_hint()
}
}

impl<B: Body + Unpin> ForwardCompatibleBody<B> {
pub(crate) fn poll_frame(
pub fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<B::Data>, B::Error>>> {
Expand All @@ -78,14 +78,13 @@ impl<B: Body + Unpin> ForwardCompatibleBody<B> {
///
/// [frame]: https://docs.rs/http-body-util/0.1.2/http_body_util/combinators/struct.Frame.html
mod combinators {
use core::future::Future;
use core::pin::Pin;
use core::task;
use http_body::Body;
use std::ops::Not;
use std::task::ready;

use super::ForwardCompatibleBody;
use core::{future::Future, pin::Pin, task};
use http_body::Body;
use std::{
ops::Not,
task::{ready, Context, Poll},
};

#[must_use = "futures don't do anything unless polled"]
#[derive(Debug)]
Expand All @@ -95,7 +94,7 @@ mod combinators {
impl<T: Body + Unpin> Future for Frame<'_, T> {
type Output = Option<Result<super::Frame<T::Data>, T::Error>>;

fn poll(self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let Self(ForwardCompatibleBody {
inner,
data_finished,
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ thiserror = "2"
linkerd-http-box = { path = "../box" }
linkerd-error = { path = "../../error" }
linkerd-exp-backoff = { path = "../../exp-backoff" }
linkerd-http-body-compat = { path = "../body-compat" }
linkerd-metrics = { path = "../../metrics" }
linkerd-stack = { path = "../../stack" }

Expand Down
2 changes: 0 additions & 2 deletions linkerd/http/retry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
pub mod peek_trailers;
pub mod replay;

mod compat;

pub use self::{peek_trailers::PeekTrailersBody, replay::ReplayBody};
pub use tower::retry::budget::Budget;

Expand Down
6 changes: 3 additions & 3 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl<B: Body> PeekTrailersBody<B> {
{
// XXX(kate): for now, wrap this in a compatibility adapter that yields `Frame<T>`s.
// this can be removed when we upgrade to http-body 1.0.
use crate::compat::ForwardCompatibleBody;
use linkerd_http_body_compat::ForwardCompatibleBody;
let mut body = ForwardCompatibleBody::new(body);

// First, poll the body for its first frame.
Expand Down Expand Up @@ -220,9 +220,9 @@ impl<B: Body> PeekTrailersBody<B> {
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(
frame: crate::compat::Frame<B::Data>,
frame: linkerd_http_body_compat::Frame<B::Data>,
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
use {crate::compat::Frame, futures::future::Either};
use {futures::future::Either, linkerd_http_body_compat::Frame};
match frame.into_data().map_err(Frame::into_trailers) {
Ok(data) => Some(Either::Left(data)),
Err(Ok(trailers)) => Some(Either::Right(trailers)),
Expand Down
8 changes: 4 additions & 4 deletions linkerd/http/retry/src/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct SharedState<B> {
struct BodyState<B> {
replay: Replay,
trailers: Option<HeaderMap>,
rest: crate::compat::ForwardCompatibleBody<B>,
rest: linkerd_http_body_compat::ForwardCompatibleBody<B>,
is_completed: bool,

/// Maximum number of bytes to buffer.
Expand Down Expand Up @@ -104,7 +104,7 @@ impl<B: Body> ReplayBody<B> {
state: Some(BodyState {
replay: Default::default(),
trailers: None,
rest: crate::compat::ForwardCompatibleBody::new(body),
rest: linkerd_http_body_compat::ForwardCompatibleBody::new(body),
is_completed: false,
max_bytes: max_bytes + 1,
}),
Expand Down Expand Up @@ -368,9 +368,9 @@ impl<B: Body> ReplayBody<B> {
///
/// This is an internal helper to facilitate pattern matching in `read_body(..)`, above.
fn split_frame(
frame: crate::compat::Frame<B::Data>,
frame: linkerd_http_body_compat::Frame<B::Data>,
) -> Option<futures::future::Either<B::Data, HeaderMap>> {
use {crate::compat::Frame, futures::future::Either};
use {futures::future::Either, linkerd_http_body_compat::Frame};
match frame.into_data().map_err(Frame::into_trailers) {
Ok(data) => Some(Either::Left(data)),
Err(Ok(trailers)) => Some(Either::Right(trailers)),
Expand Down
30 changes: 15 additions & 15 deletions linkerd/http/retry/src/replay/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn replays_trailers() {
drop(tx);

let read_trailers = |body: ReplayBody<_>| async move {
let mut body = crate::compat::ForwardCompatibleBody::new(body);
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body);
let _ = body
.frame()
.await
Expand Down Expand Up @@ -126,8 +126,8 @@ async fn replays_trailers_only() {
replay,
_trace,
} = Test::new();
let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);

let mut tlrs = HeaderMap::new();
tlrs.insert("x-hello", HeaderValue::from_str("world").unwrap());
Expand Down Expand Up @@ -332,8 +332,8 @@ async fn eos_only_when_fully_replayed() {
.expect("body must not be too large");
let replay = initial.clone();

let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);

// Read the initial body, show that the replay does not consider itself to have reached the
// end-of-stream. Then drop the initial body, show that the replay is still not done.
Expand Down Expand Up @@ -374,7 +374,7 @@ async fn eos_only_when_fully_replayed() {
drop(replay);

// Read the second replay body.
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2);
replay2
.frame()
.await
Expand All @@ -396,8 +396,8 @@ async fn eos_only_when_fully_replayed_with_trailers() {
.expect("body must not be too large");
let replay = initial.clone();

let mut initial = crate::compat::ForwardCompatibleBody::new(initial);
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut initial = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);

// Read the initial body, show that the replay does not consider itself to have reached the
// end-of-stream. Then drop the initial body, show that the replay is still not done.
Expand Down Expand Up @@ -450,7 +450,7 @@ async fn eos_only_when_fully_replayed_with_trailers() {
drop(replay);

// Read the second replay body.
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2);
replay2
.frame()
.await
Expand Down Expand Up @@ -508,7 +508,7 @@ async fn caps_buffer() {

// The request's replay should error, since we discarded the buffer when
// we hit the cap.
let mut replay = crate::compat::ForwardCompatibleBody::new(replay);
let mut replay = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);
let err = replay
.frame()
.await
Expand Down Expand Up @@ -554,7 +554,7 @@ async fn caps_across_replays() {
drop(replay);

// The second replay will fail, though, because the buffer was discarded.
let mut replay2 = crate::compat::ForwardCompatibleBody::new(replay2);
let mut replay2 = linkerd_http_body_compat::ForwardCompatibleBody::new(replay2);
let err = replay2
.frame()
.await
Expand Down Expand Up @@ -638,7 +638,7 @@ async fn size_hint_is_correct_across_replays() {
assert_eq!(chunk(&mut initial).await.as_deref(), Some(BODY));
let initial = {
// TODO(kate): the initial body doesn't report ending until it has (not) yielded trailers.
let mut body = crate::compat::ForwardCompatibleBody::new(initial);
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(initial);
assert!(body.frame().await.is_none());
body.into_inner()
};
Expand All @@ -661,7 +661,7 @@ async fn size_hint_is_correct_across_replays() {
assert_eq!(chunk(&mut replay).await.as_deref(), Some(BODY));
// let replay = {
// // TODO(kate): the replay doesn't report ending until it has (not) yielded trailers.
// let mut body = crate::compat::ForwardCompatibleBody::new(replay);
// let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(replay);
// assert!(body.frame().await.is_none());
// body.into_inner()
// };
Expand Down Expand Up @@ -770,7 +770,7 @@ where
T: http_body::Body + Unpin,
{
tracing::trace!("waiting for a body chunk...");
let chunk = crate::compat::ForwardCompatibleBody::new(body)
let chunk = linkerd_http_body_compat::ForwardCompatibleBody::new(body)
.frame()
.await
.expect("yields a result")
Expand All @@ -788,7 +788,7 @@ where
B: http_body::Body + Unpin,
B::Error: std::fmt::Debug,
{
let mut body = crate::compat::ForwardCompatibleBody::new(body);
let mut body = linkerd_http_body_compat::ForwardCompatibleBody::new(body);
let mut data = String::new();
let mut trailers = None;

Expand Down
Loading