Skip to content

Remove Sync requirement for Streaming  #117

@parasyte

Description

@parasyte

This is a follow-up to the conversation in #81 (comment). I decided to create a new ticket to raise awareness.

Bug Report

Version

tonic = "0.1.0-alpha.5"

Platform

Darwin JayMBP-2.local 18.7.0 Darwin Kernel Version 18.7.0: Sat Oct 12 00:02:19 PDT 2019; root:xnu-4903.278.12~1/RELEASE_X86_64 x86_64

Description

In older releases, we were able to use non-Sync futures, like hyper::client::ResponseFuture, to yield items in a streaming response. #84 added a Sync trait bound to the pinned Stream trait object returned by the service impl. The following code works with the previous version of tonic:

Works with 0.1.0-alpha.4

/// State for the example `RouteGuide` service.
#[derive(Debug)]
pub struct RouteGuide {
    /// HTTP client; `route_chat` clones it and uses it for outbound requests.
    client: Client<HttpConnector, Body>,
}

// Service implementation as it compiled against tonic 0.1.0-alpha.4.
#[tonic::async_trait]
impl server::RouteGuide for RouteGuide {
    /// Boxed, pinned response stream. Here the trait object is only bounded
    /// by `Send + 'static`; there is no `Sync` bound.
    type RouteChatStream =
        Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>>;

    /// Streaming handler: for every inbound `RouteNote`, performs an HTTP GET
    /// and yields a `RouteNote` whose `message` is the response body.
    ///
    /// Errors from the inbound stream and from reading the HTTP body end the
    /// response stream with a `Status`; a failed HTTP request panics (see the
    /// `unwrap` below).
    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<Self::RouteChatStream>, Status> {
        println!("RouteChat");

        // Take the inbound message stream out of the request wrapper.
        let stream = request.into_inner();
        // Clone the client so the stream built below can own it instead of
        // borrowing `self` (the response stream type is `'static`).
        let client = self.client.clone();

        let output = async_stream::try_stream! {
            // Pin the inbound stream in place so `next()` can be called on it.
            futures::pin_mut!(stream);

            while let Some(note) = stream.next().await {
                // Propagate inbound errors; the note's contents are not used.
                let _note = note?;

                // Make a simple HTTP request. What could possibly go wrong?
                // The future returned by `client.get` is awaited inside this
                // stream, so it becomes part of the stream's state.
                let res = client.get(hyper::Uri::from_static("http://httpbin.org/get")).await;

                // Receive the response as a byte stream
                // NOTE: `unwrap` panics if the HTTP request itself failed.
                let mut body = res.unwrap().into_body();
                let mut bytes = Vec::new();
                while let Some(chunk) = body.next().await {
                    // A failed chunk read is reported as `Code::Internal`.
                    bytes.extend(chunk.map_err(|_| Status::new(tonic::Code::Internal, "Error"))?);
                }
                // Lossy conversion: invalid UTF-8 is replaced, never an error.
                let message = String::from_utf8_lossy(&bytes).to_string();

                let note = RouteNote {
                    location: None,
                    message,
                };

                yield note;
            }
        };

        // Box and pin the stream, then cast it to the trait object named by
        // `RouteChatStream`.
        Ok(Response::new(Box::pin(output)
            as Pin<
                Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + 'static>,
            >))
    }
}

/// Entry point for the alpha.4 example: builds the service and serves it on
/// `[::1]:10000`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hard-coded address literal; `unwrap` panics only if it fails to parse.
    let addr = "[::1]:10000".parse().unwrap();

    // Printed before `serve` is called below.
    println!("Listening on: {}", addr);

    let client = hyper::client::Client::new();
    let route_guide = RouteGuide {
        client,
    };

    let svc = server::RouteGuideServer::new(route_guide);

    // alpha.4 API shape: the service is passed directly to `serve`.
    // Any error from the server future is propagated to the caller.
    Server::builder().serve(addr, svc).await?;

    Ok(())
}

And the updated code now fails:

Fails with 0.1.0-alpha.5

/// State for the example `RouteGuide` service.
#[derive(Debug)]
pub struct RouteGuide {
    /// HTTP client; `route_chat` clones it and uses it for outbound requests.
    client: Client<HttpConnector, Body>,
}

// Same service implementation updated for tonic 0.1.0-alpha.5; the only
// changes in this impl are the added `Sync` bounds, and it does not compile.
#[tonic::async_trait]
impl server::RouteGuide for RouteGuide {
    /// Boxed, pinned response stream. The trait object now also carries a
    /// `Sync` bound in addition to `Send + 'static`.
    type RouteChatStream =
        Pin<Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>>;

    /// Streaming handler: for every inbound `RouteNote`, performs an HTTP GET
    /// and yields a `RouteNote` whose `message` is the response body.
    ///
    /// Errors from the inbound stream and from reading the HTTP body end the
    /// response stream with a `Status`; a failed HTTP request panics (see the
    /// `unwrap` below).
    async fn route_chat(
        &self,
        request: Request<tonic::Streaming<RouteNote>>,
    ) -> Result<Response<Self::RouteChatStream>, Status> {
        println!("RouteChat");

        // Take the inbound message stream out of the request wrapper.
        let stream = request.into_inner();
        // Clone the client so the stream built below can own it instead of
        // borrowing `self` (the response stream type is `'static`).
        let client = self.client.clone();

        let output = async_stream::try_stream! {
            // Pin the inbound stream in place so `next()` can be called on it.
            futures::pin_mut!(stream);

            while let Some(note) = stream.next().await {
                // Propagate inbound errors; the note's contents are not used.
                let _note = note?;

                // Make a simple HTTP request. What could possibly go wrong?
                // The `hyper::client::ResponseFuture` awaited here is stored
                // in the stream's state; the compiler reports it as not `Sync`.
                let res = client.get(hyper::Uri::from_static("http://httpbin.org/get")).await;

                // Receive the response as a byte stream
                // NOTE: `unwrap` panics if the HTTP request itself failed.
                let mut body = res.unwrap().into_body();
                let mut bytes = Vec::new();
                while let Some(chunk) = body.next().await {
                    // A failed chunk read is reported as `Code::Internal`.
                    bytes.extend(chunk.map_err(|_| Status::new(tonic::Code::Internal, "Error"))?);
                }
                // Lossy conversion: invalid UTF-8 is replaced, never an error.
                let message = String::from_utf8_lossy(&bytes).to_string();

                let note = RouteNote {
                    location: None,
                    message,
                };

                yield note;
            }
        };

        // This cast is where E0277 is raised: the target trait object requires
        // `Sync`, which the stream holding a `ResponseFuture` does not satisfy.
        Ok(Response::new(Box::pin(output)
            as Pin<
                Box<dyn Stream<Item = Result<RouteNote, Status>> + Send + Sync + 'static>,
            >))
    }
}

/// Entry point for the alpha.5 example: builds the service and serves it on
/// `[::1]:10000`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hard-coded address literal; `unwrap` panics only if it fails to parse.
    let addr = "[::1]:10000".parse().unwrap();

    // Printed before `serve` is called below.
    println!("Listening on: {}", addr);

    let client = hyper::client::Client::new();
    let route_guide = RouteGuide {
        client,
    };

    let svc = server::RouteGuideServer::new(route_guide);

    // alpha.5 API shape: the service is registered with `add_service`, then
    // `serve` takes only the address. Errors are propagated to the caller.
    Server::builder().add_service(svc).serve(addr).await?;

    Ok(())
}

Compile error

error[E0277]: `(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
  |
  = help: the trait `std::marker::Sync` is not implemented for `(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)`
  = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)>`
  = note: required because it appears within the type `std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)>`
  = note: required because it appears within the type `std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)>>`
  = note: required because it appears within the type `hyper::client::ResponseFuture`
  = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14> {tonic::codec::decode::Streaming<routeguide::RouteNote>, std::pin::Pin<&'r mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, &'s mut std::pin::Pin<&'t0 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, std::pin::Pin<&'t1 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, futures_util::stream::next::Next<'t2, std::pin::Pin<&'t3 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>>, futures_util::stream::next::Next<'t4, std::pin::Pin<&'t5 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>>, (), std::option::Option<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, tonic::status::Status, &'t6 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, impl core::future::future::Future, impl core::future::future::Future, (), routeguide::RouteNote, &'t7 hyper::client::Client<hyper::client::connect::http::HttpConnector>, hyper::client::Client<hyper::client::connect::http::HttpConnector>, &'t8 str, http::uri::Uri, hyper::client::ResponseFuture, hyper::client::ResponseFuture, (), std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>, hyper::body::body::Body, std::vec::Vec<u8>, &'t9 mut hyper::body::body::Body, hyper::body::body::Body, impl core::future::future::Future, impl core::future::future::Future, (), std::option::Option<std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>>, std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>, &'t12 mut std::vec::Vec<u8>, 
std::vec::Vec<u8>, std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>, [closure@<::async_stream::try_stream macros>:8:25: 8:54], std::result::Result<hyper::body::chunk::Chunk, tonic::status::Status>, &'t13 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, impl core::future::future::Future, (), std::string::String, routeguide::RouteNote, &'t14 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, impl core::future::future::Future, ()}`  = note: required because it appears within the type `[static generator@<::async_stream::try_stream macros>:7:10: 11:11 stream:tonic::codec::decode::Streaming<routeguide::RouteNote>, __yield_tx:async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, client:hyper::client::Client<hyper::client::connect::http::HttpConnector> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14> {tonic::codec::decode::Streaming<routeguide::RouteNote>, std::pin::Pin<&'r mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, &'s mut std::pin::Pin<&'t0 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, std::pin::Pin<&'t1 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, futures_util::stream::next::Next<'t2, std::pin::Pin<&'t3 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>>, futures_util::stream::next::Next<'t4, std::pin::Pin<&'t5 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>>, (), std::option::Option<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, tonic::status::Status, &'t6 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, 
tonic::status::Status>>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, impl core::future::future::Future, impl core::future::future::Future, (), routeguide::RouteNote, &'t7 hyper::client::Client<hyper::client::connect::http::HttpConnector>, hyper::client::Client<hyper::client::connect::http::HttpConnector>, &'t8 str, http::uri::Uri, hyper::client::ResponseFuture, hyper::client::ResponseFuture, (), std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>, hyper::body::body::Body, std::vec::Vec<u8>, &'t9 mut hyper::body::body::Body, hyper::body::body::Body, impl core::future::future::Future, impl core::future::future::Future, (), std::option::Option<std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>>, std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>, &'t12 mut std::vec::Vec<u8>, std::vec::Vec<u8>, std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>, [closure@<::async_stream::try_stream macros>:8:25: 8:54], std::result::Result<hyper::body::chunk::Chunk, tonic::status::Status>, &'t13 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, impl core::future::future::Future, (), std::string::String, routeguide::RouteNote, &'t14 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, impl core::future::future::Future, ()}]`  = note: required because it appears within the type `std::future::GenFuture<[static generator@<::async_stream::try_stream macros>:7:10: 11:11 stream:tonic::codec::decode::Streaming<routeguide::RouteNote>, __yield_tx:async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, client:hyper::client::Client<hyper::client::connect::http::HttpConnector> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14> {tonic::codec::decode::Streaming<routeguide::RouteNote>, std::pin::Pin<&'r mut 
tonic::codec::decode::Streaming<routeguide::RouteNote>>, &'s mut std::pin::Pin<&'t0 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, std::pin::Pin<&'t1 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>, futures_util::stream::next::Next<'t2, std::pin::Pin<&'t3 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>>, futures_util::stream::next::Next<'t4, std::pin::Pin<&'t5 mut tonic::codec::decode::Streaming<routeguide::RouteNote>>>, (), std::option::Option<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, tonic::status::Status, &'t6 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, std::result::Result<routeguide::RouteNote, tonic::status::Status>, impl core::future::future::Future, impl core::future::future::Future, (), routeguide::RouteNote, &'t7 hyper::client::Client<hyper::client::connect::http::HttpConnector>, hyper::client::Client<hyper::client::connect::http::HttpConnector>, &'t8 str, http::uri::Uri, hyper::client::ResponseFuture, hyper::client::ResponseFuture, (), std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>, hyper::body::body::Body, std::vec::Vec<u8>, &'t9 mut hyper::body::body::Body, hyper::body::body::Body, impl core::future::future::Future, impl core::future::future::Future, (), std::option::Option<std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>>, std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>, &'t12 mut std::vec::Vec<u8>, std::vec::Vec<u8>, std::result::Result<hyper::body::chunk::Chunk, hyper::error::Error>, [closure@<::async_stream::try_stream macros>:8:25: 8:54], std::result::Result<hyper::body::chunk::Chunk, tonic::status::Status>, &'t13 mut 
async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, impl core::future::future::Future, (), std::string::String, routeguide::RouteNote, &'t14 mut async_stream::yielder::Sender<std::result::Result<routeguide::RouteNote, tonic::status::Status>>, impl core::future::future::Future, ()}]>`
  = note: required because it appears within the type `impl core::future::future::Future`
  = note: required because it appears within the type `async_stream::async_stream::AsyncStream<std::result::Result<routeguide::RouteNote, tonic::status::Status>, impl core::future::future::Future>`
  = note: required for the cast to the object type `dyn futures_core::stream::Stream<Item = std::result::Result<routeguide::RouteNote, tonic::status::Status>> + std::marker::Send + std::marker::Sync`

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions