From d0ac05e9aceb9aa6ac6e26067d1a263252bd3637 Mon Sep 17 00:00:00 2001 From: Andy Caldwell Date: Fri, 21 Oct 2022 21:39:06 +0100 Subject: [PATCH 1/2] server::conn::http1::Connection always holds a proto --- src/body/body.rs | 4 ++- src/body/mod.rs | 1 + src/error.rs | 13 ---------- src/server/conn/http1.rs | 56 +++++++++++++++------------------------- 4 files changed, 25 insertions(+), 49 deletions(-) diff --git a/src/body/body.rs b/src/body/body.rs index 616e617492..a5a5bc9abd 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -342,6 +342,7 @@ impl Sender { /// This is mostly useful for when trying to send from some other thread /// that doesn't have an async context. If in an async context, prefer /// `send_data()` instead. + #[cfg(feature = "http1")] pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { self.data_tx .try_send(Ok(chunk)) @@ -447,7 +448,7 @@ mod tests { assert!(err.is_body_write_aborted(), "{:?}", err); } - #[cfg(not(miri))] + #[cfg(all(not(miri), feature = "http1"))] #[tokio::test] async fn channel_abort_when_buffer_is_full() { let (mut tx, mut rx) = Recv::channel(); @@ -463,6 +464,7 @@ mod tests { assert!(err.is_body_write_aborted(), "{:?}", err); } + #[cfg(feature = "http1")] #[test] fn channel_buffers_one() { let (mut tx, _rx) = Recv::channel(); diff --git a/src/body/mod.rs b/src/body/mod.rs index 30ee91ad7e..c910db01b8 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -21,6 +21,7 @@ pub use http_body::SizeHint; pub use self::aggregate::aggregate; pub use self::body::Recv; +#[cfg(feature = "http1")] pub(crate) use self::body::Sender; pub(crate) use self::length::DecodedLength; pub use self::to_bytes::to_bytes; diff --git a/src/error.rs b/src/error.rs index 6314a2fe55..bc4414ab78 100644 --- a/src/error.rs +++ b/src/error.rs @@ -110,10 +110,6 @@ pub(super) enum User { #[cfg(feature = "http1")] ManualUpgrade, - /// User called `server::Connection::without_shutdown()` on an HTTP/2 conn. - #[cfg(feature = "server")] - WithoutShutdownNonHttp1, - /// User aborted in an FFI callback. #[cfg(feature = "ffi")] AbortedByCallback, @@ -308,11 +304,6 @@ impl Error { Error::new_user(User::Body).with(cause) } - #[cfg(feature = "server")] - pub(super) fn new_without_shutdown_not_h1() -> Error { - Error::new(Kind::User(User::WithoutShutdownNonHttp1)) - } - #[cfg(feature = "http1")] pub(super) fn new_shutdown(cause: std::io::Error) -> Error { Error::new(Kind::Shutdown).with(cause) @@ -399,10 +390,6 @@ impl Error { Kind::User(User::NoUpgrade) => "no upgrade available", #[cfg(feature = "http1")] Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", - #[cfg(feature = "server")] - Kind::User(User::WithoutShutdownNonHttp1) => { - "without_shutdown() called on a non-HTTP/1 connection" - } #[cfg(feature = "ffi")] Kind::User(User::AbortedByCallback) => "operation aborted by an application callback", } diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 48e0872e4a..8911553327 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -27,7 +27,7 @@ pin_project_lite::pin_project! { where S: HttpService, { - conn: Option>, + conn: Http1Dispatcher, } } @@ -98,12 +98,7 @@ where /// pending. If called after `Connection::poll` has resolved, this does /// nothing. pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - match self.conn { - Some(ref mut h1) => { - h1.disable_keep_alive(); - } - None => (), - } + self.conn.disable_keep_alive(); } /// Return the inner IO object, and additional information. @@ -116,25 +111,20 @@ where /// # Panics /// This method will panic if this connection is using an h2 protocol. pub fn into_parts(self) -> Parts { - self.try_into_parts() - .unwrap_or_else(|| panic!("h2 cannot into_inner")) + let (io, read_buf, dispatch) = self.conn.into_inner(); + Parts { + io, + read_buf, + service: dispatch.into_service(), + _inner: (), + } } /// Return the inner IO object, and additional information, if available. /// /// - /// TODO:(mike) does this need to return none for h1 or is it expected to always be present? previously used an "unwrap" - /// This method will return a `None` if this connection is using an h2 protocol. pub fn try_into_parts(self) -> Option> { - self.conn.map(|h1| { - let (io, read_buf, dispatch) = h1.into_inner(); - Parts { - io, - read_buf, - service: dispatch.into_service(), - _inner: (), - } - }) + Some(self.into_parts()) } /// Poll the connection for completion, but without calling `shutdown` @@ -150,7 +140,7 @@ where S::Future: Unpin, B: Unpin, { - self.conn.as_mut().unwrap().poll_without_shutdown(cx) + self.conn.poll_without_shutdown(cx) } /// Prevent shutdown of the underlying IO object at the end of service the request, @@ -165,15 +155,11 @@ where S::Future: Unpin, B: Unpin, { - // TODO(mike): "new_without_shutdown_not_h1" is not possible here - let mut conn = Some(self); + let mut zelf = Some(self); futures_util::future::poll_fn(move |cx| { - ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; + ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?; Poll::Ready( - conn.take() - .unwrap() - .try_into_parts() - .ok_or_else(crate::Error::new_without_shutdown_not_h1), + Ok(zelf.take().unwrap().into_parts()) ) }) } @@ -185,7 +171,7 @@ where where I: Send, { - upgrades::UpgradeableConnection { inner: self } + upgrades::UpgradeableConnection { inner: Some(self) } } } @@ -201,7 +187,7 @@ where type Output = crate::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) { + match ready!(Pin::new(&mut self.conn).poll(cx)) { Ok(done) => { match done { proto::Dispatched::Shutdown => {} @@ -417,7 +403,7 @@ impl Builder { let sd = proto::h1::dispatch::Server::new(service); let proto = proto::h1::Dispatcher::new(sd, conn); Connection { - conn: Some(proto), + conn: proto, } } } @@ -436,7 +422,7 @@ mod upgrades { where S: HttpService, { - pub(super) inner: Connection, + pub(super) inner: Option>, } impl UpgradeableConnection @@ -452,7 +438,7 @@ mod upgrades { /// This `Connection` should continue to be polled until shutdown /// can finish. pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - Pin::new(&mut self.inner).graceful_shutdown() + Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown() } } @@ -467,10 +453,10 @@ mod upgrades { type Output = crate::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) { + match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) { Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())), Ok(proto::Dispatched::Upgrade(pending)) => { - let (io, buf, _) = self.inner.conn.take().unwrap().into_inner(); + let (io, buf, _) = self.inner.take().unwrap().conn.into_inner(); pending.fulfill(Upgraded::new(io, buf)); Poll::Ready(Ok(())) } From bb5178e5c1a5b349618d21d39bba67e35a105765 Mon Sep 17 00:00:00 2001 From: Andy Caldwell Date: Fri, 21 Oct 2022 23:17:01 +0100 Subject: [PATCH 2/2] Remove `Connection::try_into_parts` as it can't fail --- src/server/conn/http1.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 8911553327..421ef9cd72 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -120,13 +120,6 @@ where } } - /// Return the inner IO object, and additional information, if available. - /// - /// - pub fn try_into_parts(self) -> Option> { - Some(self.into_parts()) - } - /// Poll the connection for completion, but without calling `shutdown` /// on the underlying IO. ///