Skip to content

Commit 01489e2

Browse files
committed
Add StreamResponse type
1 parent 4868cf2 commit 01489e2

File tree

8 files changed

+116
-69
lines changed

8 files changed

+116
-69
lines changed

async-graphql-actix-web/src/lib.rs

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,15 @@ use actix_web::body::BodyStream;
88
use actix_web::dev::{Payload, PayloadStream};
99
use actix_web::http::StatusCode;
1010
use actix_web::{http, web, Error, FromRequest, HttpRequest, HttpResponse, Responder};
11-
use async_graphql::http::StreamBody;
11+
use async_graphql::http::{multipart_stream, StreamBody};
1212
use async_graphql::{
1313
IntoQueryBuilder, IntoQueryBuilderOpts, ParseRequestError, QueryBuilder, QueryResponse,
14+
StreamResponse,
1415
};
15-
use bytes::{buf::BufExt, Buf, Bytes};
1616
use futures::channel::mpsc;
1717
use futures::future::Ready;
18-
use futures::{Future, SinkExt, Stream, StreamExt, TryFutureExt};
18+
use futures::{Future, SinkExt, StreamExt, TryFutureExt};
19+
use std::convert::Infallible;
1920
use std::pin::Pin;
2021
pub use subscription::WSSubscription;
2122

@@ -94,43 +95,30 @@ impl Responder for GQLResponse {
9495
}
9596

9697
/// Responder for GraphQL response stream
97-
pub struct GQLResponseStream<S: Stream<Item = async_graphql::Result<QueryResponse>>>(S);
98+
pub struct GQLResponseStream(StreamResponse);
9899

99-
impl<S: Stream<Item = async_graphql::Result<QueryResponse>> + 'static> From<S>
100-
for GQLResponseStream<S>
101-
{
102-
fn from(stream: S) -> Self {
103-
GQLResponseStream(stream)
100+
impl From<StreamResponse> for GQLResponseStream {
101+
fn from(resp: StreamResponse) -> Self {
102+
GQLResponseStream(resp)
104103
}
105104
}
106105

107-
impl<S: Stream<Item = async_graphql::Result<QueryResponse>> + Unpin + 'static> Responder
108-
for GQLResponseStream<S>
109-
{
106+
impl Responder for GQLResponseStream {
110107
type Error = Error;
111108
type Future = Ready<Result<HttpResponse, Error>>;
112109

113-
fn respond_to(self, _req: &HttpRequest) -> Self::Future {
114-
let body = BodyStream::new(
115-
self.0
116-
.map(|res| serde_json::to_vec(&async_graphql::http::GQLResponse(res)).unwrap())
117-
.map(|data| {
118-
Ok::<_, std::convert::Infallible>(
119-
Bytes::from(format!(
120-
"\r\n---\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n",
121-
data.len()
122-
))
123-
.chain(Bytes::from(data))
124-
.to_bytes(),
125-
)
126-
})
127-
.chain(futures::stream::once(futures::future::ok(
128-
Bytes::from_static(b"\r\n-----\r\n"),
129-
))),
130-
);
131-
let res = HttpResponse::build(StatusCode::OK)
132-
.content_type("multipart/mixed; boundary=\"-\"")
133-
.body(body);
134-
futures::future::ok(res)
110+
fn respond_to(self, req: &HttpRequest) -> Self::Future {
111+
match self.0 {
112+
StreamResponse::Single(resp) => GQLResponse(resp).respond_to(req),
113+
StreamResponse::Stream(stream) => {
114+
let body = BodyStream::new(
115+
multipart_stream(stream).map(|item| Result::<_, Infallible>::Ok(item)),
116+
);
117+
let res = HttpResponse::build(StatusCode::OK)
118+
.content_type("multipart/mixed; boundary=\"-\"")
119+
.body(body);
120+
futures::future::ok(res)
121+
}
122+
}
135123
}
136124
}

feature-comparison.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Comparing Features of Other Rust GraphQL Implementations
2424
| Field guard | 👍 | ⛔️ |
2525
| Multipart request(upload file) | 👍 | ⛔️ |
2626
| Subscription | 👍 | ⛔️ |
27+
| @defer/@stream | 👍 | ⛔️ |
2728
| Opentracing | 👍 | ⛔️ |
2829
| Apollo Federation | 👍 | ⛔️ |
2930
| Apollo Tracing | 👍 | ⛔️ |

src/http/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22
33
mod graphiql_source;
44
mod into_query_builder;
5+
mod multipart_stream;
56
mod playground_source;
67
mod stream_body;
78

89
use itertools::Itertools;
910

1011
pub use graphiql_source::graphiql_source;
12+
pub use multipart_stream::multipart_stream;
1113
pub use playground_source::playground_source;
1214
pub use stream_body::StreamBody;
1315

src/http/multipart_stream.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::http::GQLResponse;
2+
use crate::{QueryResponse, Result};
3+
use bytes::{buf::BufExt, Buf, Bytes};
4+
use futures::{Stream, StreamExt};
5+
6+
/// Create a multipart response data stream.
7+
pub fn multipart_stream(
8+
s: impl Stream<Item = Result<QueryResponse>> + Unpin + 'static,
9+
) -> impl Stream<Item = Bytes> {
10+
s.map(|res| serde_json::to_vec(&GQLResponse(res)).unwrap())
11+
.map(|data| {
12+
Bytes::from(format!(
13+
"\r\n---\r\nContent-Type: application/json\r\nContent-Length: {}\r\n\r\n",
14+
data.len()
15+
))
16+
.chain(Bytes::from(data))
17+
.to_bytes()
18+
})
19+
.chain(futures::stream::once(async move {
20+
Bytes::from_static(b"\r\n-----\r\n")
21+
}))
22+
}

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,9 @@ pub use error::{
142142
};
143143
pub use look_ahead::Lookahead;
144144
pub use parser::{Pos, Positioned, Value};
145-
pub use query::{IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse};
145+
pub use query::{
146+
IntoQueryBuilder, IntoQueryBuilderOpts, QueryBuilder, QueryResponse, StreamResponse,
147+
};
146148
pub use registry::CacheControl;
147149
pub use scalars::{Any, Json, ID};
148150
pub use schema::{Schema, SchemaBuilder, SchemaEnv};

src/query.rs

Lines changed: 61 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use futures::{Stream, StreamExt};
1313
use itertools::Itertools;
1414
use std::any::Any;
1515
use std::fs::File;
16+
use std::pin::Pin;
1617
use std::sync::atomic::AtomicUsize;
1718
use std::sync::Arc;
1819

@@ -97,6 +98,25 @@ impl QueryResponse {
9798
}
9899
}
99100

101+
/// Response for `Schema::execute_stream` and `QueryBuilder::execute_stream`
102+
pub enum StreamResponse {
103+
/// There is no `@defer` or `@stream` directive in the query, this is the final result.
104+
Single(Result<QueryResponse>),
105+
106+
/// Streaming responses.
107+
Stream(Pin<Box<dyn Stream<Item = Result<QueryResponse>> + Send + 'static>>),
108+
}
109+
110+
impl StreamResponse {
111+
/// Convert to a stream.
112+
pub fn into_stream(self) -> impl Stream<Item = Result<QueryResponse>> + Send + 'static {
113+
match self {
114+
StreamResponse::Single(resp) => Box::pin(futures::stream::once(async move { resp })),
115+
StreamResponse::Stream(stream) => stream,
116+
}
117+
}
118+
}
119+
100120
/// Query builder
101121
pub struct QueryBuilder {
102122
pub(crate) query_source: String,
@@ -158,43 +178,50 @@ impl QueryBuilder {
158178
/// Execute the query, returns a stream, the first result being the query result,
159179
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
160180
/// in the query will there be subsequent incremental results.
161-
pub fn execute_stream<Query, Mutation, Subscription>(
181+
pub async fn execute_stream<Query, Mutation, Subscription>(
162182
self,
163183
schema: &Schema<Query, Mutation, Subscription>,
164-
) -> impl Stream<Item = Result<QueryResponse>>
184+
) -> StreamResponse
165185
where
166186
Query: ObjectType + Send + Sync + 'static,
167187
Mutation: ObjectType + Send + Sync + 'static,
168188
Subscription: SubscriptionType + Send + Sync + 'static,
169189
{
170190
let schema = schema.clone();
171-
let stream = async_stream::try_stream! {
172-
let (first_resp, defer_list) = self.execute_first(&schema).await?;
173-
yield first_resp;
174-
175-
let mut current_defer_list = Vec::new();
176-
for fut in defer_list.futures.into_inner() {
177-
current_defer_list.push((defer_list.path_prefix.clone(), fut));
191+
match self.execute_first(&schema).await {
192+
Ok((first_resp, defer_list)) if defer_list.futures.lock().is_empty() => {
193+
return StreamResponse::Single(Ok(first_resp));
178194
}
195+
Err(err) => StreamResponse::Single(Err(err)),
196+
Ok((first_resp, defer_list)) => {
197+
let stream = async_stream::try_stream! {
198+
yield first_resp;
179199

180-
loop {
181-
let mut next_defer_list = Vec::new();
182-
for (path_prefix, defer) in current_defer_list {
183-
let (res, mut defer_list) = defer.await?;
200+
let mut current_defer_list = Vec::new();
184201
for fut in defer_list.futures.into_inner() {
185-
let mut next_path_prefix = path_prefix.clone();
186-
next_path_prefix.extend(defer_list.path_prefix.clone());
187-
next_defer_list.push((next_path_prefix, fut));
202+
current_defer_list.push((defer_list.path_prefix.clone(), fut));
188203
}
189-
yield res.apply_path_prefix(path_prefix);
190-
}
191-
if next_defer_list.is_empty() {
192-
break;
193-
}
194-
current_defer_list = next_defer_list;
204+
205+
loop {
206+
let mut next_defer_list = Vec::new();
207+
for (path_prefix, defer) in current_defer_list {
208+
let (res, mut defer_list) = defer.await?;
209+
for fut in defer_list.futures.into_inner() {
210+
let mut next_path_prefix = path_prefix.clone();
211+
next_path_prefix.extend(defer_list.path_prefix.clone());
212+
next_defer_list.push((next_path_prefix, fut));
213+
}
214+
yield res.apply_path_prefix(path_prefix);
215+
}
216+
if next_defer_list.is_empty() {
217+
break;
218+
}
219+
current_defer_list = next_defer_list;
220+
}
221+
};
222+
StreamResponse::Stream(Box::pin(stream))
195223
}
196-
};
197-
Box::pin(stream)
224+
}
198225
}
199226

200227
async fn execute_first<'a, Query, Mutation, Subscription>(
@@ -332,11 +359,16 @@ impl QueryBuilder {
332359
Mutation: ObjectType + Send + Sync + 'static,
333360
Subscription: SubscriptionType + Send + Sync + 'static,
334361
{
335-
let mut stream = self.execute_stream(schema);
336-
let mut resp = stream.next().await.unwrap()?;
337-
while let Some(resp_part) = stream.next().await.transpose()? {
338-
resp.merge(resp_part);
362+
let resp = self.execute_stream(schema).await;
363+
match resp {
364+
StreamResponse::Single(res) => res,
365+
StreamResponse::Stream(mut stream) => {
366+
let mut resp = stream.next().await.unwrap()?;
367+
while let Some(resp_part) = stream.next().await.transpose()? {
368+
resp.merge(resp_part);
369+
}
370+
Ok(resp)
371+
}
339372
}
340-
Ok(resp)
341373
}
342374
}

src/schema.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::context::Data;
22
use crate::extensions::{BoxExtension, Extension};
33
use crate::model::__DirectiveLocation;
44
use crate::parser::parse_query;
5-
use crate::query::QueryBuilder;
5+
use crate::query::{QueryBuilder, StreamResponse};
66
use crate::registry::{MetaDirective, MetaInputValue, Registry};
77
use crate::subscription::{create_connection, create_subscription_stream, SubscriptionTransport};
88
use crate::types::QueryRoot;
@@ -293,8 +293,8 @@ where
293293
/// Execute the query without create the `QueryBuilder`, returns a stream, the first result being the query result,
294294
/// followed by the incremental result. Only when there are `@defer` and `@stream` directives
295295
/// in the query will there be subsequent incremental results.
296-
pub fn execute_stream(&self, query_source: &str) -> impl Stream<Item = Result<QueryResponse>> {
297-
QueryBuilder::new(query_source).execute_stream(self)
296+
pub async fn execute_stream(&self, query_source: &str) -> StreamResponse {
297+
QueryBuilder::new(query_source).execute_stream(self).await
298298
}
299299

300300
/// Create subscription stream, typically called inside the `SubscriptionTransport::handle_request` method

tests/defer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ pub async fn test_defer() {
6363
})
6464
);
6565

66-
let mut stream = schema.execute_stream(&query);
66+
let mut stream = schema.execute_stream(&query).await.into_stream();
6767
assert_eq!(
6868
stream.next().await.unwrap().unwrap().data,
6969
serde_json::json!({
@@ -133,7 +133,7 @@ pub async fn test_stream() {
133133
})
134134
);
135135

136-
let mut stream = schema.execute_stream(&query);
136+
let mut stream = schema.execute_stream(&query).await.into_stream();
137137
assert_eq!(
138138
stream.next().await.unwrap().unwrap().data,
139139
serde_json::json!({

0 commit comments

Comments
 (0)