From c0ad90b6c4fc2f893acb599fd80a5fe91a040301 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Wed, 12 Feb 2025 00:00:00 +0000 Subject: [PATCH] refactor(mock/http-body): outline `MockBody` test body `MockBody` is a type that we use to implement tests for our `peek_trailers::PeekTrailersBody` body middleware. this is a useful tool for mocking the polling outcomes of the inner body we wrap, which would be useful for testing other `http_body::Body` middleware. this commit moves `MockBody` out of `linkerd-http-retry`, and into a new `linkerd-mock-http-body` crate. this is added as a test dependency for the retry crate, and can now be used (rather than vendored) by other bodies. Signed-off-by: katelyn martin --- Cargo.lock | 12 +++ Cargo.toml | 1 + linkerd/http/retry/Cargo.toml | 1 + linkerd/http/retry/src/peek_trailers.rs | 95 +--------------------- linkerd/mock/http-body/Cargo.toml | 17 ++++ linkerd/mock/http-body/src/lib.rs | 101 ++++++++++++++++++++++++ 6 files changed, 134 insertions(+), 93 deletions(-) create mode 100644 linkerd/mock/http-body/Cargo.toml create mode 100644 linkerd/mock/http-body/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index dc96494a19..7a45849eee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1815,6 +1815,7 @@ dependencies = [ "linkerd-exp-backoff", "linkerd-http-box", "linkerd-metrics", + "linkerd-mock-http-body", "linkerd-stack", "linkerd-tracing", "parking_lot", @@ -2023,6 +2024,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "linkerd-mock-http-body" +version = "0.1.0" +dependencies = [ + "bytes", + "http", + "http-body", + "linkerd-error", + "tokio", +] + [[package]] name = "linkerd-opaq-route" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 30c3d87e21..925307856a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -47,6 +47,7 @@ members = [ "linkerd/meshtls/rustls", "linkerd/meshtls/verifier", "linkerd/metrics", + "linkerd/mock/http-body", "linkerd/opaq-route", "linkerd/opencensus", "linkerd/opentelemetry", diff --git a/linkerd/http/retry/Cargo.toml b/linkerd/http/retry/Cargo.toml index 09e15bd356..a768f00648 100644 --- a/linkerd/http/retry/Cargo.toml +++ b/linkerd/http/retry/Cargo.toml @@ -27,4 +27,5 @@ linkerd-stack = { path = "../../stack" } [dev-dependencies] hyper = { workspace = true, features = ["deprecated"] } linkerd-tracing = { path = "../../tracing", features = ["ansi"] } +linkerd-mock-http-body = { path = "../../mock/http-body" } tokio = { version = "1", features = ["macros", "rt"] } diff --git a/linkerd/http/retry/src/peek_trailers.rs b/linkerd/http/retry/src/peek_trailers.rs index b11bb583ec..fc0c2bf299 100644 --- a/linkerd/http/retry/src/peek_trailers.rs +++ b/linkerd/http/retry/src/peek_trailers.rs @@ -349,21 +349,8 @@ mod tests { use http::{HeaderMap, HeaderValue}; use http_body::Body; use linkerd_error::Error; - use std::{ - collections::VecDeque, - ops::Not, - pin::Pin, - task::{Context, Poll}, - }; - - /// A "mock" body. - /// - /// This type contains polling results for [`Body`]. - #[derive(Default)] - struct MockBody { - data_polls: VecDeque>>>, - trailer_polls: VecDeque, Error>>>, - } + use linkerd_mock_http_body::MockBody; + use std::{ops::Not, task::Poll}; fn data() -> Option> { let bytes = Bytes::from_static(b"hello"); @@ -441,82 +428,4 @@ mod tests { assert!(peek.peek_trailers().is_none()); assert!(peek.is_end_stream().not()); } - - // === impl MockBody === - - impl MockBody { - /// Appends a poll outcome for [`Body::poll_data()`]. - fn then_yield_data(mut self, poll: Poll>>) -> Self { - self.data_polls.push_back(poll); - self - } - - /// Appends a poll outcome for [`Body::poll_trailers()`]. - fn then_yield_trailer( - mut self, - poll: Poll, Error>>, - ) -> Self { - self.trailer_polls.push_back(poll); - self - } - - /// Schedules a task to be awoken. - fn schedule(cx: &Context<'_>) { - let waker = cx.waker().clone(); - tokio::spawn(async move { - waker.wake(); - }); - } - } - - impl Body for MockBody { - type Data = Bytes; - type Error = Error; - - fn poll_data( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll>> { - let poll = self - .get_mut() - .data_polls - .pop_front() - .unwrap_or(Poll::Ready(None)); - // If we return `Poll::Pending`, we must schedule the task to be awoken. - if poll.is_pending() { - Self::schedule(cx); - } - poll - } - - fn poll_trailers( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, Self::Error>> { - let Self { - data_polls, - trailer_polls, - } = self.get_mut(); - - let poll = if data_polls.is_empty() { - trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None))) - } else { - // The called has polled for trailers before exhausting the stream of DATA frames. - // This indicates `PeekTrailersBody` isn't respecting the contract outlined in - // . - panic!("`poll_trailers()` was called before `poll_data()` returned `Poll::Ready(None)`"); - }; - - // If we return `Poll::Pending`, we must schedule the task to be awoken. - if poll.is_pending() { - Self::schedule(cx); - } - - poll - } - - fn is_end_stream(&self) -> bool { - self.data_polls.is_empty() && self.trailer_polls.is_empty() - } - } } diff --git a/linkerd/mock/http-body/Cargo.toml b/linkerd/mock/http-body/Cargo.toml new file mode 100644 index 0000000000..e545855c45 --- /dev/null +++ b/linkerd/mock/http-body/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "linkerd-mock-http-body" +version = "0.1.0" +authors = ["Linkerd Developers "] +license = "Apache-2.0" +edition = "2021" +publish = false +description = """ +Mock `http_body::Body` facilities for use in tests. +""" + +[dependencies] +bytes = { workspace = true } +http = { workspace = true } +http-body = { workspace = true } +linkerd-error = { path = "../../error" } +tokio = { version = "1", default-features = false, features = ["rt"] } diff --git a/linkerd/mock/http-body/src/lib.rs b/linkerd/mock/http-body/src/lib.rs new file mode 100644 index 0000000000..51b2632ee7 --- /dev/null +++ b/linkerd/mock/http-body/src/lib.rs @@ -0,0 +1,101 @@ +//! Mock [`http_body::Body`] facilities for use in tests. +//! +//! See [`MockBody`] for more information. + +use bytes::Bytes; +use http_body::Body; +use linkerd_error::Error; +use std::{ + collections::VecDeque, + pin::Pin, + task::{Context, Poll}, +}; + +/// A "mock" body. +/// +/// This type contains polling results for [`Body`]. +#[derive(Default)] +pub struct MockBody { + data_polls: VecDeque>>>, + trailer_polls: VecDeque, Error>>>, +} + +// === impl MockBody === + +impl MockBody { + /// Appends a poll outcome for [`Body::poll_data()`]. + pub fn then_yield_data(mut self, poll: Poll>>) -> Self { + self.data_polls.push_back(poll); + self + } + + /// Appends a poll outcome for [`Body::poll_trailers()`]. + pub fn then_yield_trailer( + mut self, + poll: Poll, Error>>, + ) -> Self { + self.trailer_polls.push_back(poll); + self + } + + /// Schedules a task to be awoken. + fn schedule(cx: &Context<'_>) { + let waker = cx.waker().clone(); + tokio::spawn(async move { + waker.wake(); + }); + } +} + +impl Body for MockBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let poll = self + .get_mut() + .data_polls + .pop_front() + .unwrap_or(Poll::Ready(None)); + // If we return `Poll::Pending`, we must schedule the task to be awoken. + if poll.is_pending() { + Self::schedule(cx); + } + poll + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + let Self { + data_polls, + trailer_polls, + } = self.get_mut(); + + let poll = if data_polls.is_empty() { + trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None))) + } else { + // The caller has polled for trailers before exhausting the stream of DATA frames. + // This indicates `PeekTrailersBody` isn't respecting the contract outlined in + // . + panic!( + "`poll_trailers()` was called before `poll_data()` returned `Poll::Ready(None)`" + ); + }; + + // If we return `Poll::Pending`, we must schedule the task to be awoken. + if poll.is_pending() { + Self::schedule(cx); + } + + poll + } + + fn is_end_stream(&self) -> bool { + self.data_polls.is_empty() && self.trailer_polls.is_empty() + } +}