diff --git a/src/body/body.rs b/src/body/body.rs index 616e617492..a5a5bc9abd 100644 --- a/src/body/body.rs +++ b/src/body/body.rs @@ -342,6 +342,7 @@ impl Sender { /// This is mostly useful for when trying to send from some other thread /// that doesn't have an async context. If in an async context, prefer /// `send_data()` instead. + #[cfg(feature = "http1")] pub(crate) fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { self.data_tx .try_send(Ok(chunk)) @@ -447,7 +448,7 @@ mod tests { assert!(err.is_body_write_aborted(), "{:?}", err); } - #[cfg(not(miri))] + #[cfg(all(not(miri), feature = "http1"))] #[tokio::test] async fn channel_abort_when_buffer_is_full() { let (mut tx, mut rx) = Recv::channel(); @@ -463,6 +464,7 @@ mod tests { assert!(err.is_body_write_aborted(), "{:?}", err); } + #[cfg(feature = "http1")] #[test] fn channel_buffers_one() { let (mut tx, _rx) = Recv::channel(); diff --git a/src/body/mod.rs b/src/body/mod.rs index 30ee91ad7e..c910db01b8 100644 --- a/src/body/mod.rs +++ b/src/body/mod.rs @@ -21,6 +21,7 @@ pub use http_body::SizeHint; pub use self::aggregate::aggregate; pub use self::body::Recv; +#[cfg(feature = "http1")] pub(crate) use self::body::Sender; pub(crate) use self::length::DecodedLength; pub use self::to_bytes::to_bytes; diff --git a/src/error.rs b/src/error.rs index 6314a2fe55..bc4414ab78 100644 --- a/src/error.rs +++ b/src/error.rs @@ -110,10 +110,6 @@ pub(super) enum User { #[cfg(feature = "http1")] ManualUpgrade, - /// User called `server::Connection::without_shutdown()` on an HTTP/2 conn. - #[cfg(feature = "server")] - WithoutShutdownNonHttp1, - /// User aborted in an FFI callback. #[cfg(feature = "ffi")] AbortedByCallback, @@ -308,11 +304,6 @@ impl Error { Error::new_user(User::Body).with(cause) } - #[cfg(feature = "server")] - pub(super) fn new_without_shutdown_not_h1() -> Error { - Error::new(Kind::User(User::WithoutShutdownNonHttp1)) - } - #[cfg(feature = "http1")] pub(super) fn new_shutdown(cause: std::io::Error) -> Error { Error::new(Kind::Shutdown).with(cause) @@ -399,10 +390,6 @@ impl Error { Kind::User(User::NoUpgrade) => "no upgrade available", #[cfg(feature = "http1")] Kind::User(User::ManualUpgrade) => "upgrade expected but low level API in use", - #[cfg(feature = "server")] - Kind::User(User::WithoutShutdownNonHttp1) => { - "without_shutdown() called on a non-HTTP/1 connection" - } #[cfg(feature = "ffi")] Kind::User(User::AbortedByCallback) => "operation aborted by an application callback", } diff --git a/src/server/conn/http1.rs b/src/server/conn/http1.rs index 48e0872e4a..421ef9cd72 100644 --- a/src/server/conn/http1.rs +++ b/src/server/conn/http1.rs @@ -27,7 +27,7 @@ pin_project_lite::pin_project! { where S: HttpService, { - conn: Option>, + conn: Http1Dispatcher, } } @@ -98,12 +98,7 @@ where /// pending. If called after `Connection::poll` has resolved, this does /// nothing. pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - match self.conn { - Some(ref mut h1) => { - h1.disable_keep_alive(); - } - None => (), - } + self.conn.disable_keep_alive(); } /// Return the inner IO object, and additional information. @@ -116,25 +111,13 @@ where /// # Panics /// This method will panic if this connection is using an h2 protocol. pub fn into_parts(self) -> Parts { - self.try_into_parts() - .unwrap_or_else(|| panic!("h2 cannot into_inner")) - } - - /// Return the inner IO object, and additional information, if available. - /// - /// - /// TODO:(mike) does this need to return none for h1 or is it expected to always be present? previously used an "unwrap" - /// This method will return a `None` if this connection is using an h2 protocol. - pub fn try_into_parts(self) -> Option> { - self.conn.map(|h1| { - let (io, read_buf, dispatch) = h1.into_inner(); - Parts { - io, - read_buf, - service: dispatch.into_service(), - _inner: (), - } - }) + let (io, read_buf, dispatch) = self.conn.into_inner(); + Parts { + io, + read_buf, + service: dispatch.into_service(), + _inner: (), + } } /// Poll the connection for completion, but without calling `shutdown` @@ -150,7 +133,7 @@ where S::Future: Unpin, B: Unpin, { - self.conn.as_mut().unwrap().poll_without_shutdown(cx) + self.conn.poll_without_shutdown(cx) } /// Prevent shutdown of the underlying IO object at the end of service the request, @@ -165,15 +148,11 @@ where S::Future: Unpin, B: Unpin, { - // TODO(mike): "new_without_shutdown_not_h1" is not possible here - let mut conn = Some(self); + let mut zelf = Some(self); futures_util::future::poll_fn(move |cx| { - ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?; + ready!(zelf.as_mut().unwrap().conn.poll_without_shutdown(cx))?; Poll::Ready( - conn.take() - .unwrap() - .try_into_parts() - .ok_or_else(crate::Error::new_without_shutdown_not_h1), + Ok(zelf.take().unwrap().into_parts()) ) }) } @@ -185,7 +164,7 @@ where where I: Send, { - upgrades::UpgradeableConnection { inner: self } + upgrades::UpgradeableConnection { inner: Some(self) } } } @@ -201,7 +180,7 @@ where type Output = crate::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(Pin::new(self.conn.as_mut().unwrap()).poll(cx)) { + match ready!(Pin::new(&mut self.conn).poll(cx)) { Ok(done) => { match done { proto::Dispatched::Shutdown => {} @@ -417,7 +396,7 @@ impl Builder { let sd = proto::h1::dispatch::Server::new(service); let proto = proto::h1::Dispatcher::new(sd, conn); Connection { - conn: Some(proto), + conn: proto, } } } @@ -436,7 +415,7 @@ mod upgrades { where S: HttpService, { - pub(super) inner: Connection, + pub(super) inner: Option>, } impl UpgradeableConnection @@ -452,7 +431,7 @@ mod upgrades { /// This `Connection` should continue to be polled until shutdown /// can finish. pub fn graceful_shutdown(mut self: Pin<&mut Self>) { - Pin::new(&mut self.inner).graceful_shutdown() + Pin::new(self.inner.as_mut().unwrap()).graceful_shutdown() } } @@ -467,10 +446,10 @@ mod upgrades { type Output = crate::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { - match ready!(Pin::new(self.inner.conn.as_mut().unwrap()).poll(cx)) { + match ready!(Pin::new(&mut self.inner.as_mut().unwrap().conn).poll(cx)) { Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())), Ok(proto::Dispatched::Upgrade(pending)) => { - let (io, buf, _) = self.inner.conn.take().unwrap().into_inner(); + let (io, buf, _) = self.inner.take().unwrap().conn.into_inner(); pending.fulfill(Upgraded::new(io, buf)); Poll::Ready(Ok(())) }