Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1815,6 +1815,7 @@ dependencies = [
"linkerd-exp-backoff",
"linkerd-http-box",
"linkerd-metrics",
"linkerd-mock-http-body",
"linkerd-stack",
"linkerd-tracing",
"parking_lot",
Expand Down Expand Up @@ -2023,6 +2024,17 @@ dependencies = [
"tracing",
]

[[package]]
name = "linkerd-mock-http-body"
version = "0.1.0"
dependencies = [
"bytes",
"http",
"http-body",
"linkerd-error",
"tokio",
]

[[package]]
name = "linkerd-opaq-route"
version = "0.1.0"
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ members = [
"linkerd/meshtls/rustls",
"linkerd/meshtls/verifier",
"linkerd/metrics",
"linkerd/mock/http-body",
"linkerd/opaq-route",
"linkerd/opencensus",
"linkerd/opentelemetry",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/retry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ linkerd-stack = { path = "../../stack" }
[dev-dependencies]
hyper = { workspace = true, features = ["deprecated"] }
linkerd-tracing = { path = "../../tracing", features = ["ansi"] }
linkerd-mock-http-body = { path = "../../mock/http-body" }
tokio = { version = "1", features = ["macros", "rt"] }
95 changes: 2 additions & 93 deletions linkerd/http/retry/src/peek_trailers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,21 +349,8 @@ mod tests {
use http::{HeaderMap, HeaderValue};
use http_body::Body;
use linkerd_error::Error;
use std::{
collections::VecDeque,
ops::Not,
pin::Pin,
task::{Context, Poll},
};

/// A "mock" body.
///
/// This type contains polling results for [`Body`].
#[derive(Default)]
struct MockBody {
data_polls: VecDeque<Poll<Option<Result<Bytes, Error>>>>,
trailer_polls: VecDeque<Poll<Result<Option<http::HeaderMap>, Error>>>,
}
use linkerd_mock_http_body::MockBody;
use std::{ops::Not, task::Poll};

fn data() -> Option<Result<Bytes, Error>> {
let bytes = Bytes::from_static(b"hello");
Expand Down Expand Up @@ -441,82 +428,4 @@ mod tests {
assert!(peek.peek_trailers().is_none());
assert!(peek.is_end_stream().not());
}

// === impl MockBody ===

impl MockBody {
/// Appends a poll outcome for [`Body::poll_data()`].
fn then_yield_data(mut self, poll: Poll<Option<Result<Bytes, Error>>>) -> Self {
self.data_polls.push_back(poll);
self
}

/// Appends a poll outcome for [`Body::poll_trailers()`].
fn then_yield_trailer(
mut self,
poll: Poll<Result<Option<http::HeaderMap>, Error>>,
) -> Self {
self.trailer_polls.push_back(poll);
self
}

/// Schedules a task to be awoken.
fn schedule(cx: &Context<'_>) {
let waker = cx.waker().clone();
tokio::spawn(async move {
waker.wake();
});
}
}

impl Body for MockBody {
type Data = Bytes;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let poll = self
.get_mut()
.data_polls
.pop_front()
.unwrap_or(Poll::Ready(None));
// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}
poll
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let Self {
data_polls,
trailer_polls,
} = self.get_mut();

let poll = if data_polls.is_empty() {
trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None)))
} else {
// The called has polled for trailers before exhausting the stream of DATA frames.
// This indicates `PeekTrailersBody<B>` isn't respecting the contract outlined in
// <https://docs.rs/http-body/0.4.6/http_body/trait.Body.html#tymethod.poll_trailers>.
panic!("`poll_trailers()` was called before `poll_data()` returned `Poll::Ready(None)`");
};

// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}

poll
}

fn is_end_stream(&self) -> bool {
self.data_polls.is_empty() && self.trailer_polls.is_empty()
}
}
}
17 changes: 17 additions & 0 deletions linkerd/mock/http-body/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
[package]
name = "linkerd-mock-http-body"
version = "0.1.0"
authors = ["Linkerd Developers <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
publish = false
description = """
Mock `http_body::Body` facilities for use in tests.
"""

[dependencies]
bytes = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
linkerd-error = { path = "../../error" }
tokio = { version = "1", default-features = false, features = ["rt"] }
101 changes: 101 additions & 0 deletions linkerd/mock/http-body/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
//! Mock [`http_body::Body`] facilities for use in tests.
//!
//! See [`MockBody`] for more information.

use bytes::Bytes;
use http_body::Body;
use linkerd_error::Error;
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
};

/// A "mock" body.
///
/// This type contains polling results for [`Body`].
#[derive(Default)]
pub struct MockBody {
data_polls: VecDeque<Poll<Option<Result<Bytes, Error>>>>,
trailer_polls: VecDeque<Poll<Result<Option<http::HeaderMap>, Error>>>,
}

// === impl MockBody ===

impl MockBody {
/// Appends a poll outcome for [`Body::poll_data()`].
pub fn then_yield_data(mut self, poll: Poll<Option<Result<Bytes, Error>>>) -> Self {
self.data_polls.push_back(poll);
self
}

/// Appends a poll outcome for [`Body::poll_trailers()`].
pub fn then_yield_trailer(
mut self,
poll: Poll<Result<Option<http::HeaderMap>, Error>>,
) -> Self {
self.trailer_polls.push_back(poll);
self
}

/// Schedules a task to be awoken.
fn schedule(cx: &Context<'_>) {
let waker = cx.waker().clone();
tokio::spawn(async move {
waker.wake();
});
}
}

impl Body for MockBody {
type Data = Bytes;
type Error = Error;

fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let poll = self
.get_mut()
.data_polls
.pop_front()
.unwrap_or(Poll::Ready(None));
// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}
poll
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let Self {
data_polls,
trailer_polls,
} = self.get_mut();

let poll = if data_polls.is_empty() {
trailer_polls.pop_front().unwrap_or(Poll::Ready(Ok(None)))
} else {
// The caller has polled for trailers before exhausting the stream of DATA frames.
// This indicates `PeekTrailersBody<B>` isn't respecting the contract outlined in
// <https://docs.rs/http-body/0.4.6/http_body/trait.Body.html#tymethod.poll_trailers>.
panic!(
"`poll_trailers()` was called before `poll_data()` returned `Poll::Ready(None)`"
);
};

// If we return `Poll::Pending`, we must schedule the task to be awoken.
if poll.is_pending() {
Self::schedule(cx);
}

poll
}

fn is_end_stream(&self) -> bool {
self.data_polls.is_empty() && self.trailer_polls.is_empty()
}
}
Loading