From 5e300fa180bce472ed5a7457ec5c60547d353ddc Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 22 Oct 2021 09:16:45 -0400 Subject: [PATCH 1/4] fix(tonic): Remove `Sync` requirement for streams --- Cargo.toml | 49 +++++++++-------- examples/routeguide-tutorial.md | 4 +- examples/src/authentication/server.rs | 2 +- examples/src/dynamic_load_balance/server.rs | 2 +- examples/src/hyper_warp_multiplex/server.rs | 2 +- examples/src/load_balance/server.rs | 2 +- examples/src/multiplex/server.rs | 2 +- examples/src/routeguide/server.rs | 3 +- examples/src/streaming/server.rs | 2 +- examples/src/tls/server.rs | 2 +- examples/src/tls_client_auth/server.rs | 2 +- interop/src/server.rs | 5 +- tests/compression/src/lib.rs | 4 +- tests/integration_tests/Cargo.toml | 27 +++++----- tests/integration_tests/src/lib.rs | 51 ++++++++++++++++++ tests/integration_tests/tests/status.rs | 59 ++------------------- tests/integration_tests/tests/streams.rs | 47 ++++++++++++++++ tonic-build/src/client.rs | 10 ++-- tonic-build/src/server.rs | 6 +-- tonic-health/src/server.rs | 2 +- tonic-web/tests/integration/src/lib.rs | 2 +- tonic/src/body.rs | 6 ++- tonic/src/client/grpc.rs | 12 ++--- tonic/src/codec/decode.rs | 24 ++++----- tonic/src/codec/encode.rs | 12 ++--- tonic/src/codec/mod.rs | 4 +- tonic/src/codegen.rs | 4 +- tonic/src/request.rs | 6 +-- tonic/src/server/grpc.rs | 19 ++++--- tonic/src/transport/server/mod.rs | 22 ++++---- 30 files changed, 220 insertions(+), 174 deletions(-) create mode 100644 tests/integration_tests/tests/streams.rs diff --git a/Cargo.toml b/Cargo.toml index f05cc4fab..995e2be7b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,29 +1,28 @@ [workspace] members = [ - "tonic", - "tonic-build", - "tonic-health", - "tonic-types", - "tonic-reflection", - "tonic-web", - - # Non-published crates - "examples", - "interop", - - # Tests - "tests/included_service", - "tests/same_name", - "tests/service_named_service", - "tests/wellknown", - "tests/wellknown-compiled", - "tests/extern_path/uuid", - "tests/ambiguous_methods", - "tests/extern_path/my_application", - "tests/integration_tests", - "tests/stream_conflict", - "tests/root-crate-path", - "tests/compression", - "tonic-web/tests/integration" + "tonic", + "tonic-build", + "tonic-health", + "tonic-types", + "tonic-reflection", + "tonic-web", # Non-published crates + "examples", + "interop", # Tests + "tests/included_service", + "tests/same_name", + "tests/service_named_service", + "tests/wellknown", + "tests/wellknown-compiled", + "tests/extern_path/uuid", + "tests/ambiguous_methods", + "tests/extern_path/my_application", + "tests/integration_tests", + "tests/stream_conflict", + "tests/root-crate-path", + "tests/compression", + "tonic-web/tests/integration", ] +[patch.crates-io] +http-body = {path = "../http-body"} +hyper = {path = "../hyper"} diff --git a/examples/routeguide-tutorial.md b/examples/routeguide-tutorial.md index f93801468..bb54e6bac 100644 --- a/examples/routeguide-tutorial.md +++ b/examples/routeguide-tutorial.md @@ -300,7 +300,7 @@ impl RouteGuide for RouteGuideService { unimplemented!() } - type RouteChatStream = Pin> + Send + Sync + 'static>>; + type RouteChatStream = Pin> + Send + 'static>>; async fn route_chat( &self, @@ -493,7 +493,7 @@ use std::collections::HashMap; ```rust type RouteChatStream = - Pin> + Send + Sync + 'static>>; + Pin> + Send + 'static>>; async fn route_chat( diff --git a/examples/src/authentication/server.rs b/examples/src/authentication/server.rs index 0726efcc6..951154079 100644 --- a/examples/src/authentication/server.rs +++ b/examples/src/authentication/server.rs @@ -8,7 +8,7 @@ use std::pin::Pin; use tonic::{metadata::MetadataValue, transport::Server, Request, Response, Status, Streaming}; type EchoResult = Result, Status>; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[derive(Default)] pub struct EchoServer; diff --git a/examples/src/dynamic_load_balance/server.rs b/examples/src/dynamic_load_balance/server.rs index 8623d8f7a..aa1240658 100644 --- a/examples/src/dynamic_load_balance/server.rs +++ b/examples/src/dynamic_load_balance/server.rs @@ -11,7 +11,7 @@ use tonic::{transport::Server, Request, Response, Status, Streaming}; use pb::{EchoRequest, EchoResponse}; type EchoResult = Result, Status>; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[derive(Debug)] pub struct EchoServer { diff --git a/examples/src/hyper_warp_multiplex/server.rs b/examples/src/hyper_warp_multiplex/server.rs index 1e251c4a1..b31f5e81c 100644 --- a/examples/src/hyper_warp_multiplex/server.rs +++ b/examples/src/hyper_warp_multiplex/server.rs @@ -35,7 +35,7 @@ use echo::{ EchoRequest, EchoResponse, }; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[derive(Default)] pub struct MyGreeter {} diff --git a/examples/src/load_balance/server.rs b/examples/src/load_balance/server.rs index 8623d8f7a..aa1240658 100644 --- a/examples/src/load_balance/server.rs +++ b/examples/src/load_balance/server.rs @@ -11,7 +11,7 @@ use tonic::{transport::Server, Request, Response, Status, Streaming}; use pb::{EchoRequest, EchoResponse}; type EchoResult = Result, Status>; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[derive(Debug)] pub struct EchoServer { diff --git a/examples/src/multiplex/server.rs b/examples/src/multiplex/server.rs index d45fb8d50..1422fb055 100644 --- a/examples/src/multiplex/server.rs +++ b/examples/src/multiplex/server.rs @@ -20,7 +20,7 @@ use echo::{ EchoRequest, EchoResponse, }; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[tokio::main] async fn main() -> Result<(), Box> { diff --git a/examples/src/routeguide/server.rs b/examples/src/routeguide/server.rs index 98d15eaf2..76dfb3950 100644 --- a/examples/src/routeguide/server.rs +++ b/examples/src/routeguide/server.rs @@ -102,8 +102,7 @@ impl RouteGuide for RouteGuideService { Ok(Response::new(summary)) } - type RouteChatStream = - Pin> + Send + Sync + 'static>>; + type RouteChatStream = Pin> + Send + 'static>>; async fn route_chat( &self, diff --git a/examples/src/streaming/server.rs b/examples/src/streaming/server.rs index 2de9025e6..a09055729 100644 --- a/examples/src/streaming/server.rs +++ b/examples/src/streaming/server.rs @@ -12,7 +12,7 @@ use tonic::{transport::Server, Request, Response, Status, Streaming}; use pb::{EchoRequest, EchoResponse}; type EchoResult = Result, Status>; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[derive(Debug)] pub struct EchoServer {} diff --git a/examples/src/tls/server.rs b/examples/src/tls/server.rs index 723549af0..53e3e9ba8 100644 --- a/examples/src/tls/server.rs +++ b/examples/src/tls/server.rs @@ -14,7 +14,7 @@ use tonic::{ }; type EchoResult = Result, Status>; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[derive(Default)] pub struct EchoServer; diff --git a/examples/src/tls_client_auth/server.rs b/examples/src/tls_client_auth/server.rs index d1d88d789..be36abce4 100644 --- a/examples/src/tls_client_auth/server.rs +++ b/examples/src/tls_client_auth/server.rs @@ -9,7 +9,7 @@ use tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tonic::{Request, Response, Status}; type EchoResult = Result, Status>; -type ResponseStream = Pin> + Send + Sync>>; +type ResponseStream = Pin> + Send>>; #[derive(Default)] pub struct EchoServer; diff --git a/interop/src/server.rs b/interop/src/server.rs index 112ff3820..375ce1568 100644 --- a/interop/src/server.rs +++ b/interop/src/server.rs @@ -18,9 +18,8 @@ pub struct TestService; type Result = std::result::Result, Status>; type Streaming = Request>; -type Stream = Pin< - Box> + Send + Sync + 'static>, ->; +type Stream = + Pin> + Send + 'static>>; type BoxFuture = Pin> + Send + 'static>>; #[tonic::async_trait] diff --git a/tests/compression/src/lib.rs b/tests/compression/src/lib.rs index 38c12037a..bb6f5dcc8 100644 --- a/tests/compression/src/lib.rs +++ b/tests/compression/src/lib.rs @@ -69,7 +69,7 @@ impl test_server::Test for Svc { } type CompressOutputServerStreamStream = - Pin> + Send + Sync + 'static>>; + Pin> + Send + 'static>>; async fn compress_output_server_stream( &self, @@ -110,7 +110,7 @@ impl test_server::Test for Svc { } type CompressInputOutputBidirectionalStreamStream = - Pin> + Send + Sync + 'static>>; + Pin> + Send + 'static>>; async fn compress_input_output_bidirectional_stream( &self, diff --git a/tests/integration_tests/Cargo.toml b/tests/integration_tests/Cargo.toml index 9d14221a5..96863abba 100644 --- a/tests/integration_tests/Cargo.toml +++ b/tests/integration_tests/Cargo.toml @@ -1,29 +1,30 @@ [package] -name = "integration-tests" -version = "0.1.0" authors = ["Lucio Franco "] edition = "2018" -publish = false license = "MIT" +name = "integration-tests" +publish = false +version = "0.1.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tonic = { path = "../../tonic" } -prost = "0.9" -futures-util = "0.3" bytes = "1.0" +futures-util = "0.3" +prost = "0.9" +tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]} +tonic = {path = "../../tonic"} [dev-dependencies] -tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net"] } -tokio-stream = { version = "0.1.5", features = ["net"] } -tower-service = "0.3" -hyper = "0.14" +async-stream = "0.3" futures = "0.3" -tower = { version = "0.4", features = [] } -http-body = "0.4" http = "0.2" +http-body = "0.4" +hyper = "0.14" +tokio-stream = {version = "0.1.5", features = ["net"]} +tower = {version = "0.4", features = []} +tower-service = "0.3" tracing-subscriber = "0.2" [build-dependencies] -tonic-build = { path = "../../tonic-build" } +tonic-build = {path = "../../tonic-build"} diff --git a/tests/integration_tests/src/lib.rs b/tests/integration_tests/src/lib.rs index 3c7987a0e..57691ed6b 100644 --- a/tests/integration_tests/src/lib.rs +++ b/tests/integration_tests/src/lib.rs @@ -2,3 +2,54 @@ pub mod pb { tonic::include_proto!("test"); tonic::include_proto!("stream"); } + +pub mod mock { + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + use tonic::transport::server::Connected; + + #[derive(Debug)] + pub struct MockStream(pub tokio::io::DuplexStream); + + impl Connected for MockStream { + type ConnectInfo = (); + + /// Create type holding information about the connection. + fn connect_info(&self) -> Self::ConnectInfo {} + } + + impl AsyncRead for MockStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } + } + + impl AsyncWrite for MockStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } + } +} diff --git a/tests/integration_tests/tests/status.rs b/tests/integration_tests/tests/status.rs index 4f395e68c..af32ba49b 100644 --- a/tests/integration_tests/tests/status.rs +++ b/tests/integration_tests/tests/status.rs @@ -1,6 +1,7 @@ use bytes::Bytes; use futures_util::FutureExt; use http::Uri; +use integration_tests::mock::MockStream; use integration_tests::pb::{ test_client, test_server, test_stream_client, test_stream_server, Input, InputStream, Output, OutputStream, @@ -125,9 +126,8 @@ async fn status_with_metadata() { jh.await.unwrap(); } -type Stream = std::pin::Pin< - Box> + Send + Sync + 'static>, ->; +type Stream = + std::pin::Pin> + Send + 'static>>; #[tokio::test] async fn status_from_server_stream() { @@ -184,7 +184,7 @@ async fn status_from_server_stream_with_source() { let channel = Endpoint::try_from("http://[::]:50051") .unwrap() .connect_with_connector_lazy(tower::service_fn(move |_: Uri| async move { - Err::(std::io::Error::new(std::io::ErrorKind::Other, "WTF")) + Err::(std::io::Error::new(std::io::ErrorKind::Other, "WTF")) })) .unwrap(); @@ -201,54 +201,3 @@ fn trace_init() { .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) .try_init(); } - -mod mock { - use std::{ - pin::Pin, - task::{Context, Poll}, - }; - - use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - use tonic::transport::server::Connected; - - #[derive(Debug)] - pub struct MockStream(pub tokio::io::DuplexStream); - - impl Connected for MockStream { - type ConnectInfo = (); - - /// Create type holding information about the connection. - fn connect_info(&self) -> Self::ConnectInfo {} - } - - impl AsyncRead for MockStream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_read(cx, buf) - } - } - - impl AsyncWrite for MockStream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_shutdown( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) - } - } -} diff --git a/tests/integration_tests/tests/streams.rs b/tests/integration_tests/tests/streams.rs new file mode 100644 index 000000000..152aca5bd --- /dev/null +++ b/tests/integration_tests/tests/streams.rs @@ -0,0 +1,47 @@ +use futures::FutureExt; +use integration_tests::pb::{test_stream_server, InputStream, OutputStream}; +use tonic::{transport::Server, Request, Response, Status}; + +type Stream = + std::pin::Pin> + Send + 'static>>; + +#[tokio::test] +async fn status_from_server_stream_with_source() { + struct Svc; + + #[tonic::async_trait] + impl test_stream_server::TestStream for Svc { + type StreamCallStream = Stream; + + async fn stream_call( + &self, + _: Request, + ) -> Result, Status> { + let s = Unsync(0 as *mut ()); + + Ok(Response::new(Box::pin(s) as Self::StreamCallStream)) + } + } + + let svc = test_stream_server::TestStreamServer::new(Svc); + + Server::builder() + .add_service(svc) + .serve("127.0.0.1:1339".parse().unwrap()) + .now_or_never(); +} + +struct Unsync(*mut ()); + +unsafe impl Send for Unsync {} + +impl futures::Stream for Unsync { + type Item = Result; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + unimplemented!() + } +} diff --git a/tonic-build/src/client.rs b/tonic-build/src/client.rs index 9cbc5e721..b315c5d51 100644 --- a/tonic-build/src/client.rs +++ b/tonic-build/src/client.rs @@ -57,7 +57,7 @@ pub fn generate( impl #service_ident where T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, + T::ResponseBody: Body + Send + 'static, T::Error: Into, ::Error: Into + Send, { @@ -203,8 +203,8 @@ fn generate_server_streaming( tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())) })?; let codec = #codec_name::default(); - let path = http::uri::PathAndQuery::from_static(#path); - self.inner.server_streaming(request.into_request(), path, codec).await + let path = http::uri::PathAndQuery::from_static(#path); + self.inner.server_streaming(request.into_request(), path, codec).await } } } @@ -255,8 +255,8 @@ fn generate_streaming( tonic::Status::new(tonic::Code::Unknown, format!("Service was not ready: {}", e.into())) })?; let codec = #codec_name::default(); - let path = http::uri::PathAndQuery::from_static(#path); - self.inner.streaming(request.into_streaming_request(), path, codec).await + let path = http::uri::PathAndQuery::from_static(#path); + self.inner.streaming(request.into_streaming_request(), path, codec).await } } } diff --git a/tonic-build/src/server.rs b/tonic-build/src/server.rs index 9c650666d..683734cd8 100644 --- a/tonic-build/src/server.rs +++ b/tonic-build/src/server.rs @@ -115,7 +115,7 @@ pub fn generate( impl tonic::codegen::Service> for #server_service where T: #server_trait, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { type Response = http::Response; @@ -232,7 +232,7 @@ fn generate_trait_methods( quote! { #stream_doc - type #stream: futures_core::Stream> + Send + Sync + 'static; + type #stream: futures_core::Stream> + Send + 'static; #method_doc async fn #name(&self, request: tonic::Request<#req_message>) @@ -248,7 +248,7 @@ fn generate_trait_methods( quote! { #stream_doc - type #stream: futures_core::Stream> + Send + Sync + 'static; + type #stream: futures_core::Stream> + Send + 'static; #method_doc async fn #name(&self, request: tonic::Request>) diff --git a/tonic-health/src/server.rs b/tonic-health/src/server.rs index 7f032335a..5747d2c5d 100644 --- a/tonic-health/src/server.rs +++ b/tonic-health/src/server.rs @@ -133,7 +133,7 @@ impl Health for HealthService { } type WatchStream = - Pin> + Send + Sync + 'static>>; + Pin> + Send + 'static>>; async fn watch( &self, diff --git a/tonic-web/tests/integration/src/lib.rs b/tonic-web/tests/integration/src/lib.rs index b4cad9a5d..3944dc17b 100644 --- a/tonic-web/tests/integration/src/lib.rs +++ b/tonic-web/tests/integration/src/lib.rs @@ -9,7 +9,7 @@ pub mod pb { tonic::include_proto!("test"); } -type BoxStream = Pin> + Send + Sync + 'static>>; +type BoxStream = Pin> + Send + 'static>>; pub struct Svc; diff --git a/tonic/src/body.rs b/tonic/src/body.rs index ae1cb87eb..d66a4fcb2 100644 --- a/tonic/src/body.rs +++ b/tonic/src/body.rs @@ -3,11 +3,13 @@ use http_body::Body; /// A type erased HTTP body used for tonic services. -pub type BoxBody = http_body::combinators::BoxBody; +pub type BoxBody = http_body::combinators::UnsyncBoxBody; // this also exists in `crate::codegen` but we need it here since `codegen` has // `#[cfg(feature = "codegen")]`. /// Create an empty `BoxBody` pub fn empty_body() -> BoxBody { - http_body::Empty::new().map_err(|err| match err {}).boxed() + http_body::Empty::new() + .map_err(|err| match err {}) + .boxed_unsync() } diff --git a/tonic/src/client/grpc.rs b/tonic/src/client/grpc.rs index 2da5fbf14..210d44432 100644 --- a/tonic/src/client/grpc.rs +++ b/tonic/src/client/grpc.rs @@ -150,7 +150,7 @@ impl Grpc { ) -> Result, Status> where T: GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, + T::ResponseBody: Body + Send + 'static, ::Error: Into, C: Codec, M1: Send + Sync + 'static, @@ -169,9 +169,9 @@ impl Grpc { ) -> Result, Status> where T: GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, + T::ResponseBody: Body + Send + 'static, ::Error: Into, - S: Stream + Send + Sync + 'static, + S: Stream + Send + 'static, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, @@ -206,7 +206,7 @@ impl Grpc { ) -> Result>, Status> where T: GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, + T::ResponseBody: Body + Send + 'static, ::Error: Into, C: Codec, M1: Send + Sync + 'static, @@ -225,9 +225,9 @@ impl Grpc { ) -> Result>, Status> where T: GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, + T::ResponseBody: Body + Send + 'static, ::Error: Into, - S: Stream + Send + Sync + 'static, + S: Stream + Send + 'static, C: Codec, M1: Send + Sync + 'static, M2: Send + Sync + 'static, diff --git a/tonic/src/codec/decode.rs b/tonic/src/codec/decode.rs index 1c2fc9ec4..b4dda4729 100644 --- a/tonic/src/codec/decode.rs +++ b/tonic/src/codec/decode.rs @@ -21,7 +21,7 @@ const BUFFER_SIZE: usize = 8 * 1024; /// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface /// to fetch the message stream and trailing metadata pub struct Streaming { - decoder: Box + Send + Sync + 'static>, + decoder: Box + Send + 'static>, body: BoxBody, state: State, direction: Direction, @@ -56,9 +56,9 @@ impl Streaming { #[cfg(feature = "compression")] encoding: Option, ) -> Self where - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into, - D: Decoder + Send + Sync + 'static, + D: Decoder + Send + 'static, { Self::new( decoder, @@ -71,9 +71,9 @@ impl Streaming { pub(crate) fn new_empty(decoder: D, body: B) -> Self where - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into, - D: Decoder + Send + Sync + 'static, + D: Decoder + Send + 'static, { Self::new( decoder, @@ -91,9 +91,9 @@ impl Streaming { #[cfg(feature = "compression")] encoding: Option, ) -> Self where - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into, - D: Decoder + Send + Sync + 'static, + D: Decoder + Send + 'static, { Self::new( decoder, @@ -111,16 +111,16 @@ impl Streaming { #[cfg(feature = "compression")] encoding: Option, ) -> Self where - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into, - D: Decoder + Send + Sync + 'static, + D: Decoder + Send + 'static, { Self { decoder: Box::new(decoder), body: body .map_data(|mut buf| buf.copy_to_bytes(buf.remaining())) .map_err(|err| Status::map_error(err.into())) - .boxed(), + .boxed_unsync(), state: State::ReadHeader, direction, buf: BytesMut::with_capacity(BUFFER_SIZE), @@ -140,7 +140,7 @@ impl Streaming { /// # use std::fmt::Debug; /// # async fn next_message_ex(mut request: Streaming) -> Result<(), Status> /// # where T: Debug, - /// # D: Decoder + Send + Sync + 'static, + /// # D: Decoder + Send + 'static, /// # { /// if let Some(next_message) = request.message().await? { /// println!("{:?}", next_message); @@ -378,4 +378,4 @@ impl fmt::Debug for Streaming { } #[cfg(test)] -static_assertions::assert_impl_all!(Streaming<()>: Send, Sync); +static_assertions::assert_impl_all!(Streaming<()>: Send); diff --git a/tonic/src/codec/encode.rs b/tonic/src/codec/encode.rs index 54bf64cd7..3e2c1a327 100644 --- a/tonic/src/codec/encode.rs +++ b/tonic/src/codec/encode.rs @@ -22,9 +22,8 @@ pub(crate) fn encode_server( #[cfg(feature = "compression")] compression_override: SingleMessageCompressionOverride, ) -> EncodeBody>> where - T: Encoder + Send + Sync + 'static, - T::Item: Send + Sync, - U: Stream> + Send + Sync + 'static, + T: Encoder, + U: Stream>, { let stream = encode( encoder, @@ -45,9 +44,8 @@ pub(crate) fn encode_client( #[cfg(feature = "compression")] compression_encoding: Option, ) -> EncodeBody>> where - T: Encoder + Send + Sync + 'static, - T::Item: Send + Sync, - U: Stream + Send + Sync + 'static, + T: Encoder, + U: Stream, { let stream = encode( encoder, @@ -157,7 +155,7 @@ pub(crate) struct EncodeBody { impl EncodeBody where - S: Stream> + Send + Sync + 'static, + S: Stream>, { pub(crate) fn new_client(inner: S) -> Self { Self { diff --git a/tonic/src/codec/mod.rs b/tonic/src/codec/mod.rs index d0c9d8b05..486f9d3f6 100644 --- a/tonic/src/codec/mod.rs +++ b/tonic/src/codec/mod.rs @@ -40,9 +40,9 @@ pub trait Codec: Default { type Decode: Send + 'static; /// The encoder that can encode a message. - type Encoder: Encoder + Send + Sync + 'static; + type Encoder: Encoder + Send + 'static; /// The encoder that can decode a message. - type Decoder: Decoder + Send + Sync + 'static; + type Decoder: Decoder + Send + 'static; /// Fetch the encoder. fn encoder(&mut self) -> Self::Encoder; diff --git a/tonic/src/codegen.rs b/tonic/src/codegen.rs index 9d3a06996..42fe53f20 100644 --- a/tonic/src/codegen.rs +++ b/tonic/src/codegen.rs @@ -35,5 +35,7 @@ impl std::fmt::Display for Never { impl std::error::Error for Never {} pub fn empty_body() -> crate::body::BoxBody { - http_body::Empty::new().map_err(|err| match err {}).boxed() + http_body::Empty::new() + .map_err(|err| match err {}) + .boxed_unsync() } diff --git a/tonic/src/request.rs b/tonic/src/request.rs index 9ee333916..46a2d486d 100644 --- a/tonic/src/request.rs +++ b/tonic/src/request.rs @@ -86,7 +86,7 @@ pub trait IntoRequest: sealed::Sealed { /// ``` pub trait IntoStreamingRequest: sealed::Sealed { /// The RPC request stream type - type Stream: Stream + Send + Sync + 'static; + type Stream: Stream + Send + 'static; /// The RPC request type type Message; @@ -357,7 +357,7 @@ impl IntoRequest for Request { impl IntoStreamingRequest for T where - T: Stream + Send + Sync + 'static, + T: Stream + Send + 'static, { type Stream = T; type Message = T::Item; @@ -369,7 +369,7 @@ where impl IntoStreamingRequest for Request where - T: Stream + Send + Sync + 'static, + T: Stream + Send + 'static, { type Stream = T; type Message = T::Item; diff --git a/tonic/src/server/grpc.rs b/tonic/src/server/grpc.rs index 7978e2b22..c9979a802 100644 --- a/tonic/src/server/grpc.rs +++ b/tonic/src/server/grpc.rs @@ -44,7 +44,6 @@ pub struct Grpc { impl Grpc where T: Codec, - T::Encode: Sync, { /// Creates a new gRPC server with the provided [`Codec`]. pub fn new(codec: T) -> Self { @@ -173,7 +172,7 @@ where ) -> http::Response where S: UnaryService, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send, { #[cfg(feature = "compression")] @@ -221,8 +220,8 @@ where ) -> http::Response where S: ServerStreamingService, - S::ResponseStream: Send + Sync + 'static, - B: Body + Send + Sync + 'static, + S::ResponseStream: Send + 'static, + B: Body + Send + 'static, B::Error: Into + Send, { #[cfg(feature = "compression")] @@ -265,7 +264,7 @@ where ) -> http::Response where S: ClientStreamingService, - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send + 'static, { #[cfg(feature = "compression")] @@ -301,8 +300,8 @@ where ) -> http::Response where S: StreamingService + Send, - S::ResponseStream: Send + Sync + 'static, - B: Body + Send + Sync + 'static, + S::ResponseStream: Send + 'static, + B: Body + Send + 'static, B::Error: Into + Send, { #[cfg(feature = "compression")] @@ -329,7 +328,7 @@ where request: http::Request, ) -> Result, Status> where - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send, { #[cfg(feature = "compression")] @@ -365,7 +364,7 @@ where request: http::Request, ) -> Result>, Status> where - B: Body + Send + Sync + 'static, + B: Body + Send + 'static, B::Error: Into + Send, { #[cfg(feature = "compression")] @@ -388,7 +387,7 @@ where #[cfg(feature = "compression")] compression_override: SingleMessageCompressionOverride, ) -> http::Response where - B: TryStream + Send + Sync + 'static, + B: TryStream + Send + 'static, { let response = match response { Ok(r) => r, diff --git a/tonic/src/transport/server/mod.rs b/tonic/src/transport/server/mod.rs index c0f27a0ca..adba6f896 100644 --- a/tonic/src/transport/server/mod.rs +++ b/tonic/src/transport/server/mod.rs @@ -54,7 +54,7 @@ use tower::{ Service, ServiceBuilder, }; -type BoxHttpBody = http_body::combinators::BoxBody; +type BoxHttpBody = http_body::combinators::UnsyncBoxBody; type BoxService = tower::util::BoxService, Response, crate::Error>; type TraceInterceptor = Arc) -> tracing::Span + Send + Sync + 'static>; @@ -465,7 +465,7 @@ impl Server { IO::ConnectInfo: Clone + Send + Sync + 'static, IE: Into, F: Future, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { let trace_interceptor = self.trace_interceptor.clone(); @@ -627,7 +627,7 @@ where Send + 'static, <>>>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) @@ -659,7 +659,7 @@ where Send + 'static, <>>>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { let incoming = TcpIncoming::new(addr, self.server.tcp_nodelay, self.server.tcp_keepalive) @@ -688,7 +688,7 @@ where Send + 'static, <>>>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { self.server @@ -723,7 +723,7 @@ where Send + 'static, <>>>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { self.server @@ -740,7 +740,7 @@ where Send + 'static, <>>>::Service as Service>>::Error: Into + Send, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { let inner = self.server.layer.layer(self.routes); @@ -763,7 +763,7 @@ impl Service> for Svc where S: Service, Response = Response>, S::Error: Into, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { type Response = Response; @@ -807,7 +807,7 @@ impl Future for SvcFuture where F: Future, E>>, E: Into, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { type Output = Result, crate::Error>; @@ -817,7 +817,7 @@ where let _guard = this.span.enter(); let response: Response = ready!(this.inner.poll(cx)).map_err(Into::into)?; - let response = response.map(|body| body.map_err(Into::into).boxed()); + let response = response.map(|body| body.map_err(Into::into).boxed_unsync()); Poll::Ready(Ok(response)) } } @@ -842,7 +842,7 @@ where S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Send, - ResBody: http_body::Body + Send + Sync + 'static, + ResBody: http_body::Body + Send + 'static, ResBody::Error: Into, { type Response = BoxService; From e7d8fc6abc5855a299f3f5946319bf32f2aafe93 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 22 Oct 2021 11:25:00 -0400 Subject: [PATCH 2/4] Move to git version instead of local patch --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 995e2be7b..9715d3d84 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,5 +24,5 @@ members = [ ] [patch.crates-io] -http-body = {path = "../http-body"} -hyper = {path = "../hyper"} +http-body = {git = "https://github.com/hyperium/http-body"} +hyper = {git = "https://github.com/hyperium/hyper"} From 02bca752b2a497c619837cbe456bbcf792f6611c Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 22 Oct 2021 11:45:44 -0400 Subject: [PATCH 3/4] bump to http body 0.4.4 --- Cargo.toml | 1 - tonic/Cargo.toml | 98 ++++++++++++++++++++++++------------------------ 2 files changed, 49 insertions(+), 50 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9715d3d84..97c19bb43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,5 +24,4 @@ members = [ ] [patch.crates-io] -http-body = {git = "https://github.com/hyperium/http-body"} hyper = {git = "https://github.com/hyperium/hyper"} diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index 7d4f947b2..a836cac98 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -8,97 +8,97 @@ name = "tonic" # - README.md # - Update CHANGELOG.md. # - Create "v0.6.x" git tag. -version = "0.6.0" authors = ["Lucio Franco "] -edition = "2018" -license = "MIT" -documentation = "https://docs.rs/tonic/0.6.0/tonic/" -repository = "https://github.com/hyperium/tonic" -homepage = "https://github.com/hyperium/tonic" +categories = ["web-programming", "network-programming", "asynchronous"] description = """ A gRPC over HTTP/2 implementation focused on high performance, interoperability, and flexibility. """ -readme = "../README.md" -categories = ["web-programming", "network-programming", "asynchronous"] +documentation = "https://docs.rs/tonic/0.6.0/tonic/" +edition = "2018" +homepage = "https://github.com/hyperium/tonic" keywords = ["rpc", "grpc", "async", "futures", "protobuf"] +license = "MIT" +readme = "../README.md" +repository = "https://github.com/hyperium/tonic" +version = "0.6.0" [features] -default = ["transport", "codegen", "prost"] codegen = ["async-trait"] -transport = [ - "h2", - "hyper", - "tokio", - "tower", - "tracing-futures", - "tokio/macros", - "tokio/time", - "hyper-timeout", -] +compression = ["flate2"] +default = ["transport", "codegen", "prost"] +prost = ["prost1", "prost-derive"] tls = ["transport", "tokio-rustls"] -tls-roots-common = ["tls"] tls-roots = ["tls-roots-common", "rustls-native-certs"] +tls-roots-common = ["tls"] tls-webpki-roots = ["tls-roots-common", "webpki-roots"] -prost = ["prost1", "prost-derive"] -compression = ["flate2"] +transport = [ + "h2", + "hyper", + "tokio", + "tower", + "tracing-futures", + "tokio/macros", + "tokio/time", + "hyper-timeout", +] # [[bench]] # name = "bench_main" # harness = false [dependencies] +base64 = "0.13" bytes = "1.0" -futures-core = { version = "0.3", default-features = false } -futures-util = { version = "0.3", default-features = false } -tracing = "0.1" +futures-core = {version = "0.3", default-features = false} +futures-util = {version = "0.3", default-features = false} http = "0.2" -base64 = "0.13" +tracing = "0.1" -percent-encoding = "2.1" -tower-service = "0.3" -tower-layer = "0.3" -tokio-util = { version = "0.6", features = ["codec"] } async-stream = "0.3" -http-body = "0.4.2" +http-body = "0.4.4" +percent-encoding = "2.1" pin-project = "1.0" +tokio-util = {version = "0.6", features = ["codec"]} +tower-layer = "0.3" +tower-service = "0.3" # prost -prost1 = { package = "prost", version = "0.9", optional = true } -prost-derive = { version = "0.9", optional = true } +prost-derive = {version = "0.9", optional = true} +prost1 = {package = "prost", version = "0.9", optional = true} # codegen -async-trait = { version = "0.1.13", optional = true } +async-trait = {version = "0.1.13", optional = true} # transport -h2 = { version = "0.3", optional = true } -hyper = { version = "0.14.2", features = ["full"], optional = true } -tokio = { version = "1.0.1", features = ["net"], optional = true } +h2 = {version = "0.3", optional = true} +hyper = {version = "0.14.2", features = ["full"], optional = true} +hyper-timeout = {version = "0.4", optional = true} +tokio = {version = "1.0.1", features = ["net"], optional = true} tokio-stream = "0.1" -tower = { version = "0.4.7", features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true } -tracing-futures = { version = "0.2", optional = true } -hyper-timeout = { version = "0.4", optional = true } +tower = {version = "0.4.7", features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true} +tracing-futures = {version = "0.2", optional = true} # rustls -tokio-rustls = { version = "0.22", optional = true } -rustls-native-certs = { version = "0.5", optional = true } -webpki-roots = { version = "0.21.1", optional = true } +rustls-native-certs = {version = "0.5", optional = true} +tokio-rustls = {version = "0.22", optional = true} +webpki-roots = {version = "0.21.1", optional = true} # compression -flate2 = { version = "1.0", optional = true } +flate2 = {version = "1.0", optional = true} [dev-dependencies] -tokio = { version = "1.0", features = ["rt", "macros"] } -static_assertions = "1.0" -rand = "0.8" bencher = "0.1.5" quickcheck = "1.0" quickcheck_macros = "1.0" -tower = { version = "0.4.7", features = ["full"] } +rand = "0.8" +static_assertions = "1.0" +tokio = {version = "1.0", features = ["rt", "macros"]} +tower = {version = "0.4.7", features = ["full"]} [package.metadata.docs.rs] all-features = true rustdoc-args = ["--cfg", "docsrs"] [[bench]] -name = "decode" harness = false +name = "decode" From cb8230eeb0d1e043c67c38832b3bc2e263e90618 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Sun, 24 Oct 2021 21:06:15 -0400 Subject: [PATCH 4/4] Bump to hyper 0.14.4 --- Cargo.toml | 3 --- tonic/Cargo.toml | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 97c19bb43..5d96b7842 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,3 @@ members = [ "tests/compression", "tonic-web/tests/integration", ] - -[patch.crates-io] -hyper = {git = "https://github.com/hyperium/hyper"} diff --git a/tonic/Cargo.toml b/tonic/Cargo.toml index a836cac98..d88cda2cb 100644 --- a/tonic/Cargo.toml +++ b/tonic/Cargo.toml @@ -71,7 +71,7 @@ async-trait = {version = "0.1.13", optional = true} # transport h2 = {version = "0.3", optional = true} -hyper = {version = "0.14.2", features = ["full"], optional = true} +hyper = {version = "0.14.4", features = ["full"], optional = true} hyper-timeout = {version = "0.4", optional = true} tokio = {version = "1.0.1", features = ["net"], optional = true} tokio-stream = "0.1"