Skip to content

Commit c6eb817

Browse files
committed
fix(tonic): make Streaming Sync again
The boxed `Decoder` inside `Streaming` need not be `Sync` since #804. Unfortunately, that makes `Streaming` non-`Sync`, meaning that all the generated `tonic` futures cannot be awaited in `Sync` futures. In fact, the only times we use the `Decoder`, we have a `&mut` unique reference to it, so we are guaranteed not to require synchronization. The `sync_wrapper` crate encodes this reasoning, allowing us to safely make the `Streaming` type `Sync` regardless of whether the contained `Decoder` is `Sync` or not.
1 parent 689a86d commit c6eb817

File tree

2 files changed

+5
-3
lines changed

2 files changed

+5
-3
lines changed

tonic/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ zstd = { version = "0.13.0", optional = true }
9494

9595
# channel
9696
hyper-timeout = {version = "0.5", optional = true}
97+
sync_wrapper = "1.0.2"
9798

9899
[dev-dependencies]
99100
bencher = "0.1.5"

tonic/src/codec/decode.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use std::{
1111
task::ready,
1212
task::{Context, Poll},
1313
};
14+
use sync_wrapper::SyncWrapper;
1415
use tokio_stream::Stream;
1516
use tracing::{debug, trace};
1617

@@ -19,7 +20,7 @@ use tracing::{debug, trace};
1920
/// This will wrap some inner [`Body`] and [`Decoder`] and provide an interface
2021
/// to fetch the message stream and trailing metadata
2122
pub struct Streaming<T> {
22-
decoder: Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>,
23+
decoder: SyncWrapper<Box<dyn Decoder<Item = T, Error = Status> + Send + 'static>>,
2324
inner: StreamingInner,
2425
}
2526

@@ -367,8 +368,8 @@ impl<T> Streaming<T> {
367368
}
368369

369370
fn decode_chunk(&mut self) -> Result<Option<T>, Status> {
370-
match self.inner.decode_chunk(self.decoder.buffer_settings())? {
371-
Some(mut decode_buf) => match self.decoder.decode(&mut decode_buf)? {
371+
match self.inner.decode_chunk(self.decoder.get_mut().buffer_settings())? {
372+
Some(mut decode_buf) => match self.decoder.get_mut().decode(&mut decode_buf)? {
372373
Some(msg) => {
373374
self.inner.state = State::ReadHeader;
374375
Ok(Some(msg))

0 commit comments

Comments
 (0)