Skip to content

Commit 0438359

Browse files
Robert-Cunninghamseanmonstar
authored andcommitted
feat(rt): add Timer trait
This adds a `hyper::rt::Timer` trait, and it is used in connection builders to configure a custom timer source for timeouts.
1 parent 2988baa commit 0438359

File tree

22 files changed

+468
-89
lines changed

22 files changed

+468
-89
lines changed

Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ server = []
9696
runtime = [
9797
"tokio/net",
9898
"tokio/rt",
99-
"tokio/time",
10099
]
101100

102101
# C-API support (currently unstable (no semver))

benches/support/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
2+
mod tokiort;
3+
pub use tokiort::TokioTimer;

benches/support/tokiort.rs

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#![allow(dead_code)]
2+
//! Various runtimes for hyper
3+
use std::{
4+
pin::Pin,
5+
task::{Context, Poll},
6+
time::{Duration, Instant},
7+
};
8+
9+
use futures_util::Future;
10+
use hyper::rt::{Interval, Sleep, Timer};
11+
12+
/// An Executor that uses the tokio runtime.
13+
pub struct TokioExecutor;
14+
15+
/// A Timer that uses the tokio runtime.
16+
17+
#[derive(Clone, Debug)]
18+
pub struct TokioTimer;
19+
20+
impl Timer for TokioTimer {
21+
fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
22+
let s = tokio::time::sleep(duration);
23+
let hs = TokioSleep { inner: Box::pin(s) };
24+
return Box::new(hs);
25+
}
26+
27+
fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
28+
return Box::new(TokioSleep {
29+
inner: Box::pin(tokio::time::sleep_until(deadline.into())),
30+
});
31+
}
32+
}
33+
34+
/// An Interval object that uses the tokio runtime.
35+
pub struct TokioInterval {
36+
inner: tokio::time::Interval,
37+
}
38+
39+
impl Interval for TokioInterval {
40+
fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<std::time::Instant> {
41+
let raw = tokio::time::Interval::poll_tick(&mut self.inner, cx);
42+
raw.map(|a| a.into_std())
43+
}
44+
}
45+
46+
struct TokioTimeout<T> {
47+
inner: Pin<Box<tokio::time::Timeout<T>>>,
48+
}
49+
50+
impl<T> Future for TokioTimeout<T>
51+
where
52+
T: Future,
53+
{
54+
type Output = Result<T::Output, tokio::time::error::Elapsed>;
55+
56+
fn poll(mut self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
57+
self.inner.as_mut().poll(context)
58+
}
59+
}
60+
61+
// Use TokioSleep to get tokio::time::Sleep to implement Unpin.
62+
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
63+
pub(crate) struct TokioSleep {
64+
pub(crate) inner: Pin<Box<tokio::time::Sleep>>,
65+
}
66+
67+
impl Future for TokioSleep {
68+
type Output = ();
69+
70+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71+
self.inner.as_mut().poll(cx)
72+
}
73+
}
74+
75+
// Use HasSleep to get tokio::time::Sleep to implement Unpin.
76+
// see https://docs.rs/tokio/latest/tokio/time/struct.Sleep.html
77+
78+
impl Sleep for TokioSleep {
79+
fn is_elapsed(&self) -> bool {
80+
self.inner.is_elapsed()
81+
}
82+
fn deadline(&self) -> Instant {
83+
self.inner.deadline().into()
84+
}
85+
fn reset(mut self: Pin<&mut Self>, deadline: Instant) {
86+
self.inner.as_mut().reset(deadline.into())
87+
}
88+
}

src/client/conn/http1.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,14 @@ use tokio::io::{AsyncRead, AsyncWrite};
1010

1111
use crate::Recv;
1212
use crate::body::Body;
13+
use super::super::dispatch;
1314
use crate::common::{
1415
exec::{BoxSendFuture, Exec},
1516
task, Future, Pin, Poll,
1617
};
17-
use crate::upgrade::Upgraded;
1818
use crate::proto;
19-
use crate::rt::Executor;
20-
use super::super::dispatch;
19+
use crate::rt::{Executor};
20+
use crate::upgrade::Upgraded;
2121

2222
type Dispatcher<T, B> =
2323
proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
@@ -120,7 +120,10 @@ where
120120
/// before calling this method.
121121
/// - Since absolute-form `Uri`s are not required, if received, they will
122122
/// be serialized as-is.
123-
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Recv>>> {
123+
pub fn send_request(
124+
&mut self,
125+
req: Request<B>,
126+
) -> impl Future<Output = crate::Result<Response<Recv>>> {
124127
let sent = self.dispatch.send(req);
125128

126129
async move {
@@ -130,7 +133,7 @@ where
130133
Ok(Err(err)) => Err(err),
131134
// this is definite bug if it happens, but it shouldn't happen!
132135
Err(_canceled) => panic!("dispatch dropped without returning error"),
133-
}
136+
},
134137
Err(_req) => {
135138
tracing::debug!("connection was not ready");
136139

@@ -476,4 +479,3 @@ impl Builder {
476479
}
477480
}
478481
}
479-

src/client/conn/http2.rs

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@ use tokio::io::{AsyncRead, AsyncWrite};
1212

1313
use crate::Recv;
1414
use crate::body::Body;
15+
use super::super::dispatch;
16+
use crate::common::time::Time;
1517
use crate::common::{
1618
exec::{BoxSendFuture, Exec},
1719
task, Future, Pin, Poll,
1820
};
1921
use crate::proto;
20-
use crate::rt::Executor;
21-
use super::super::dispatch;
22+
use crate::rt::{Executor, Timer};
2223

2324
/// The sender side of an established connection.
2425
pub struct SendRequest<B> {
@@ -44,6 +45,7 @@ where
4445
#[derive(Clone, Debug)]
4546
pub struct Builder {
4647
pub(super) exec: Exec,
48+
pub(super) timer: Time,
4749
h2_builder: proto::h2::client::Config,
4850
}
4951

@@ -114,7 +116,10 @@ where
114116
/// before calling this method.
115117
/// - Since absolute-form `Uri`s are not required, if received, they will
116118
/// be serialized as-is.
117-
pub fn send_request(&mut self, req: Request<B>) -> impl Future<Output = crate::Result<Response<Recv>>> {
119+
pub fn send_request(
120+
&mut self,
121+
req: Request<B>,
122+
) -> impl Future<Output = crate::Result<Response<Recv>>> {
118123
let sent = self.dispatch.send(req);
119124

120125
async move {
@@ -124,7 +129,7 @@ where
124129
Ok(Err(err)) => Err(err),
125130
// this is definite bug if it happens, but it shouldn't happen!
126131
Err(_canceled) => panic!("dispatch dropped without returning error"),
127-
}
132+
},
128133
Err(_req) => {
129134
tracing::debug!("connection was not ready");
130135

@@ -207,6 +212,7 @@ impl Builder {
207212
pub fn new() -> Builder {
208213
Builder {
209214
exec: Exec::Default,
215+
timer: Time::Empty,
210216
h2_builder: Default::default(),
211217
}
212218
}
@@ -220,6 +226,15 @@ impl Builder {
220226
self
221227
}
222228

229+
/// Provide a timer to execute background HTTP2 tasks.
230+
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
231+
where
232+
M: Timer + Send + Sync + 'static,
233+
{
234+
self.timer = Time::Timer(Arc::new(timer));
235+
self
236+
}
237+
223238
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
224239
/// stream-level flow control.
225240
///
@@ -398,14 +413,13 @@ impl Builder {
398413
tracing::trace!("client handshake HTTP/1");
399414

400415
let (tx, rx) = dispatch::channel();
401-
let h2 =
402-
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec)
403-
.await?;
416+
let h2 = proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec, opts.timer)
417+
.await?;
404418
Ok((
405419
SendRequest { dispatch: tx.unbound() },
420+
//SendRequest { dispatch: tx },
406421
Connection { inner: (PhantomData, h2) },
407422
))
408423
}
409424
}
410425
}
411-

src/client/conn/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ use crate::rt::Executor;
8585
#[cfg(feature = "http1")]
8686
use crate::upgrade::Upgraded;
8787
use crate::{Recv, Request, Response};
88+
use crate::{common::time::Time, rt::Timer};
8889

8990
#[cfg(feature = "http1")]
9091
pub mod http1;
@@ -161,6 +162,7 @@ where
161162
#[derive(Clone, Debug)]
162163
pub struct Builder {
163164
pub(super) exec: Exec,
165+
pub(super) timer: Time,
164166
h09_responses: bool,
165167
h1_parser_config: ParserConfig,
166168
h1_writev: Option<bool>,
@@ -418,6 +420,7 @@ impl Builder {
418420
pub fn new() -> Builder {
419421
Builder {
420422
exec: Exec::Default,
423+
timer: Time::Empty,
421424
h09_responses: false,
422425
h1_writev: None,
423426
h1_read_buf_exact_size: None,
@@ -447,6 +450,15 @@ impl Builder {
447450
self
448451
}
449452

453+
/// Provide a timer to execute background HTTP2 tasks.
454+
pub fn timer<M>(&mut self, timer: M) -> &mut Builder
455+
where
456+
M: Timer + Send + Sync + 'static,
457+
{
458+
self.timer = Time::Timer(Arc::new(timer));
459+
self
460+
}
461+
450462
/// Set whether HTTP/0.9 responses should be tolerated.
451463
///
452464
/// Default is false.
@@ -857,9 +869,14 @@ impl Builder {
857869
}
858870
#[cfg(feature = "http2")]
859871
Proto::Http2 => {
860-
let h2 =
861-
proto::h2::client::handshake(io, rx, &opts.h2_builder, opts.exec.clone())
862-
.await?;
872+
let h2 = proto::h2::client::handshake(
873+
io,
874+
rx,
875+
&opts.h2_builder,
876+
opts.exec.clone(),
877+
opts.timer.clone(),
878+
)
879+
.await?;
863880
ProtoClient::H2 { h2 }
864881
}
865882
};

src/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ pub(crate) mod exec;
1515
pub(crate) mod io;
1616
mod never;
1717
pub(crate) mod task;
18+
#[cfg(any(feature = "http1", feature = "http2", feature = "server"))]
19+
pub(crate) mod time;
1820
pub(crate) mod watch;
1921

2022
#[cfg(any(feature = "http1", feature = "http2", feature = "runtime"))]

src/common/time.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#[cfg(all(feature = "server", feature = "runtime"))]
2+
use std::time::{Duration, Instant};
3+
use std::{fmt, sync::Arc};
4+
5+
#[cfg(all(feature = "server", feature = "runtime"))]
6+
use crate::rt::Sleep;
7+
use crate::rt::Timer;
8+
9+
/// A user-provided timer to time background tasks.
10+
#[derive(Clone)]
11+
pub(crate) enum Time {
12+
Timer(Arc<dyn Timer + Send + Sync>),
13+
Empty,
14+
}
15+
16+
impl fmt::Debug for Time {
17+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
18+
f.debug_struct("Time").finish()
19+
}
20+
}
21+
22+
/*
23+
pub(crate) fn timeout<F>(tim: Tim, future: F, duration: Duration) -> HyperTimeout<F> {
24+
HyperTimeout { sleep: tim.sleep(duration), future: future }
25+
}
26+
27+
pin_project_lite::pin_project! {
28+
pub(crate) struct HyperTimeout<F> {
29+
sleep: Box<dyn Sleep>,
30+
#[pin]
31+
future: F
32+
}
33+
}
34+
35+
pub(crate) struct Timeout;
36+
37+
impl<F> Future for HyperTimeout<F> where F: Future {
38+
39+
type Output = Result<F::Output, Timeout>;
40+
41+
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output>{
42+
let mut this = self.project();
43+
if let Poll::Ready(v) = this.future.poll(ctx) {
44+
return Poll::Ready(Ok(v));
45+
}
46+
47+
if let Poll::Ready(_) = Pin::new(&mut this.sleep).poll(ctx) {
48+
return Poll::Ready(Err(Timeout));
49+
}
50+
51+
return Poll::Pending;
52+
}
53+
}
54+
*/
55+
56+
#[cfg(all(feature = "server", feature = "runtime"))]
57+
impl Time {
58+
pub(crate) fn sleep(&self, duration: Duration) -> Box<dyn Sleep + Unpin> {
59+
match *self {
60+
Time::Empty => {
61+
panic!("You must supply a timer.")
62+
}
63+
Time::Timer(ref t) => t.sleep(duration),
64+
}
65+
}
66+
67+
pub(crate) fn sleep_until(&self, deadline: Instant) -> Box<dyn Sleep + Unpin> {
68+
match *self {
69+
Time::Empty => {
70+
panic!("You must supply a timer.")
71+
}
72+
Time::Timer(ref t) => t.sleep_until(deadline),
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)