Skip to content

Commit dc309b8

Browse files
authored
Simplify SubscriptionConnection (#719)
* simplify SubscriptionConnection * fmt * update pre-existing juniper_warp::subscriptions * use struct instead of tuple * fmt * update juniper_warp
1 parent 59419f1 commit dc309b8

File tree

3 files changed

+88
-93
lines changed

3 files changed

+88
-93
lines changed

juniper/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ pub use crate::{
186186
marker::{self, GraphQLUnion},
187187
scalars::{EmptyMutation, EmptySubscription, ID},
188188
subscriptions::{
189-
GraphQLSubscriptionType, GraphQLSubscriptionValue, SubscriptionConnection,
190-
SubscriptionCoordinator,
189+
ExecutionOutput, GraphQLSubscriptionType, GraphQLSubscriptionValue,
190+
SubscriptionConnection, SubscriptionCoordinator,
191191
},
192192
},
193193
validation::RuleError,

juniper/src/types/subscriptions.rs

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,37 @@
11
use futures::{future, stream};
2+
use serde::Serialize;
23

34
use crate::{
4-
http::{GraphQLRequest, GraphQLResponse},
5+
http::GraphQLRequest,
56
parser::Spanning,
67
types::base::{is_excluded, merge_key_into, GraphQLType, GraphQLValue},
7-
Arguments, BoxFuture, DefaultScalarValue, Executor, FieldError, Object, ScalarValue, Selection,
8-
Value, ValuesStream,
8+
Arguments, BoxFuture, DefaultScalarValue, ExecutionError, Executor, FieldError, Object,
9+
ScalarValue, Selection, Value, ValuesStream,
910
};
1011

12+
/// Represents the result of executing a GraphQL operation (after parsing and validating has been
13+
/// done).
14+
#[derive(Debug, Serialize)]
15+
pub struct ExecutionOutput<S> {
16+
/// The output data.
17+
pub data: Value<S>,
18+
19+
/// The errors that occurred. Note that the presence of errors does not mean there is no data.
20+
/// The output can have both data and errors.
21+
#[serde(bound(serialize = "S: ScalarValue"))]
22+
pub errors: Vec<ExecutionError<S>>,
23+
}
24+
25+
impl<S> ExecutionOutput<S> {
26+
/// Creates execution output from data, with no errors.
27+
pub fn from_data(data: Value<S>) -> Self {
28+
Self {
29+
data,
30+
errors: vec![],
31+
}
32+
}
33+
}
34+
1135
/// Global subscription coordinator trait.
1236
///
1337
/// With regular queries we could get away with not having some in-between
@@ -33,7 +57,7 @@ where
3357
{
3458
/// Type of [`SubscriptionConnection`]s this [`SubscriptionCoordinator`]
3559
/// returns
36-
type Connection: SubscriptionConnection<'a, S>;
60+
type Connection: SubscriptionConnection<S>;
3761

3862
/// Type of error while trying to spawn [`SubscriptionConnection`]
3963
type Error;
@@ -58,7 +82,7 @@ where
5882
///
5983
/// It can be treated as [`futures::Stream`] yielding [`GraphQLResponse`]s in
6084
/// server integration crates.
61-
pub trait SubscriptionConnection<'a, S>: futures::Stream<Item = GraphQLResponse<'a, S>> {}
85+
pub trait SubscriptionConnection<S>: futures::Stream<Item = ExecutionOutput<S>> {}
6286

6387
/// Extension of [`GraphQLValue`] trait with asynchronous [subscription][1] execution logic.
6488
/// It should be used with [`GraphQLValue`] in order to implement [subscription][1] resolvers on

juniper_subscriptions/src/lib.rs

Lines changed: 57 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ use std::{
1919

2020
use futures::{future, stream, FutureExt as _, Stream, StreamExt as _, TryFutureExt as _};
2121
use juniper::{
22-
http::{GraphQLRequest, GraphQLResponse},
23-
BoxFuture, ExecutionError, GraphQLError, GraphQLSubscriptionType, GraphQLTypeAsync, Object,
24-
ScalarValue, SubscriptionConnection, SubscriptionCoordinator, Value, ValuesStream,
22+
http::GraphQLRequest, BoxFuture, ExecutionError, ExecutionOutput, GraphQLError,
23+
GraphQLSubscriptionType, GraphQLTypeAsync, Object, ScalarValue, SubscriptionConnection,
24+
SubscriptionCoordinator, Value, ValuesStream,
2525
};
2626

2727
/// Simple [`SubscriptionCoordinator`] implementation:
@@ -88,8 +88,8 @@ where
8888

8989
/// Simple [`SubscriptionConnection`] implementation.
9090
///
91-
/// Resolves `Value<ValuesStream>` into `Stream<Item = GraphQLResponse>` using the following
92-
/// logic:
91+
/// Resolves `Value<ValuesStream>` into `Stream<Item = ExecutionOutput<S>>` using
92+
/// the following logic:
9393
///
9494
/// [`Value::Null`] - returns [`Value::Null`] once
9595
/// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector
@@ -98,7 +98,7 @@ where
9898
/// [`Value::Object`] - waits while each field of the [`Object`] is returned, then yields the whole object
9999
/// `Value::Object<Value::Object<_>>` - returns [`Value::Null`] if [`Value::Object`] consists of sub-objects
100100
pub struct Connection<'a, S> {
101-
stream: Pin<Box<dyn Stream<Item = GraphQLResponse<'a, S>> + Send + 'a>>,
101+
stream: Pin<Box<dyn Stream<Item = ExecutionOutput<S>> + Send + 'a>>,
102102
}
103103

104104
impl<'a, S> Connection<'a, S>
@@ -113,16 +113,13 @@ where
113113
}
114114
}
115115

116-
impl<'a, S> SubscriptionConnection<'a, S> for Connection<'a, S> where
117-
S: ScalarValue + Send + Sync + 'a
118-
{
119-
}
116+
impl<'a, S> SubscriptionConnection<S> for Connection<'a, S> where S: ScalarValue + Send + Sync + 'a {}
120117

121118
impl<'a, S> Stream for Connection<'a, S>
122119
where
123120
S: ScalarValue + Send + Sync + 'a,
124121
{
125-
type Item = GraphQLResponse<'a, S>;
122+
type Item = ExecutionOutput<S>;
126123

127124
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
128125
// this is safe as stream is only mutated here and is not moved anywhere
@@ -132,7 +129,7 @@ where
132129
}
133130
}
134131

135-
/// Creates [`futures::Stream`] that yields [`GraphQLResponse`]s depending on the given [`Value`]:
132+
/// Creates [`futures::Stream`] that yields `ExecutionOutput<S>`s depending on the given [`Value`]:
136133
///
137134
/// [`Value::Null`] - returns [`Value::Null`] once
138135
/// [`Value::Scalar`] - returns `Ok` value or [`Value::Null`] and errors vector
@@ -143,23 +140,28 @@ where
143140
fn whole_responses_stream<'a, S>(
144141
stream: Value<ValuesStream<'a, S>>,
145142
errors: Vec<ExecutionError<S>>,
146-
) -> Pin<Box<dyn Stream<Item = GraphQLResponse<'a, S>> + Send + 'a>>
143+
) -> Pin<Box<dyn Stream<Item = ExecutionOutput<S>> + Send + 'a>>
147144
where
148145
S: ScalarValue + Send + Sync + 'a,
149146
{
150147
if !errors.is_empty() {
151-
return Box::pin(stream::once(future::ready(GraphQLResponse::from_result(
152-
Ok((Value::Null, errors)),
153-
))));
148+
return stream::once(future::ready(ExecutionOutput {
149+
data: Value::null(),
150+
errors,
151+
}))
152+
.boxed();
154153
}
155154

156155
match stream {
157-
Value::Null => Box::pin(stream::once(future::ready(GraphQLResponse::from_result(
158-
Ok((Value::Null, vec![])),
156+
Value::Null => Box::pin(stream::once(future::ready(ExecutionOutput::from_data(
157+
Value::null(),
159158
)))),
160159
Value::Scalar(s) => Box::pin(s.map(|res| match res {
161-
Ok(val) => GraphQLResponse::from_result(Ok((val, vec![]))),
162-
Err(err) => GraphQLResponse::from_result(Ok((Value::Null, vec![err]))),
160+
Ok(val) => ExecutionOutput::from_data(val),
161+
Err(err) => ExecutionOutput {
162+
data: Value::null(),
163+
errors: vec![err],
164+
},
163165
})),
164166
Value::List(list) => {
165167
let mut streams = vec![];
@@ -171,9 +173,8 @@ where
171173
Value::Object(mut object) => {
172174
let obj_len = object.field_count();
173175
if obj_len == 0 {
174-
return Box::pin(stream::once(future::ready(GraphQLResponse::from_result(
175-
Ok((Value::Null, vec![])),
176-
))));
176+
return stream::once(future::ready(ExecutionOutput::from_data(Value::null())))
177+
.boxed();
177178
}
178179

179180
let mut filled_count = 0;
@@ -182,7 +183,7 @@ where
182183
ready_vec.push(None);
183184
}
184185

185-
let stream = stream::poll_fn(move |mut ctx| -> Poll<Option<GraphQLResponse<'a, S>>> {
186+
let stream = stream::poll_fn(move |mut ctx| -> Poll<Option<ExecutionOutput<S>>> {
186187
let mut obj_iterator = object.iter_mut();
187188

188189
// Due to having to modify `ready_vec` contents (by-move pattern)
@@ -233,10 +234,7 @@ where
233234
}
234235
});
235236
let obj = Object::from_iter(ready_vec_iterator);
236-
Poll::Ready(Some(GraphQLResponse::from_result(Ok((
237-
Value::Object(obj),
238-
vec![],
239-
)))))
237+
Poll::Ready(Some(ExecutionOutput::from_data(Value::Object(obj))))
240238
} else {
241239
Poll::Pending
242240
}
@@ -256,9 +254,13 @@ mod whole_responses_stream {
256254

257255
#[tokio::test]
258256
async fn with_error() {
259-
let expected = vec![GraphQLResponse::<DefaultScalarValue>::error(
260-
FieldError::new("field error", Value::Null),
261-
)];
257+
let expected = vec![ExecutionOutput {
258+
data: Value::<DefaultScalarValue>::Null,
259+
errors: vec![ExecutionError::at_origin(FieldError::new(
260+
"field error",
261+
Value::Null,
262+
))],
263+
}];
262264
let expected = serde_json::to_string(&expected).unwrap();
263265

264266
let result = whole_responses_stream::<DefaultScalarValue>(
@@ -277,10 +279,9 @@ mod whole_responses_stream {
277279

278280
#[tokio::test]
279281
async fn value_null() {
280-
let expected = vec![GraphQLResponse::<DefaultScalarValue>::from_result(Ok((
281-
Value::Null,
282-
vec![],
283-
)))];
282+
let expected = vec![ExecutionOutput::from_data(
283+
Value::<DefaultScalarValue>::Null,
284+
)];
284285
let expected = serde_json::to_string(&expected).unwrap();
285286

286287
let result = whole_responses_stream::<DefaultScalarValue>(Value::Null, vec![])
@@ -296,26 +297,11 @@ mod whole_responses_stream {
296297
#[tokio::test]
297298
async fn value_scalar() {
298299
let expected = vec![
299-
GraphQLResponse::from_result(Ok((
300-
Value::Scalar(DefaultScalarValue::Int(1i32)),
301-
vec![],
302-
))),
303-
GraphQLResponse::from_result(Ok((
304-
Value::Scalar(DefaultScalarValue::Int(2i32)),
305-
vec![],
306-
))),
307-
GraphQLResponse::from_result(Ok((
308-
Value::Scalar(DefaultScalarValue::Int(3i32)),
309-
vec![],
310-
))),
311-
GraphQLResponse::from_result(Ok((
312-
Value::Scalar(DefaultScalarValue::Int(4i32)),
313-
vec![],
314-
))),
315-
GraphQLResponse::from_result(Ok((
316-
Value::Scalar(DefaultScalarValue::Int(5i32)),
317-
vec![],
318-
))),
300+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))),
301+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))),
302+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(3i32))),
303+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))),
304+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(5i32))),
319305
];
320306
let expected = serde_json::to_string(&expected).unwrap();
321307

@@ -340,19 +326,10 @@ mod whole_responses_stream {
340326
#[tokio::test]
341327
async fn value_list() {
342328
let expected = vec![
343-
GraphQLResponse::from_result(Ok((
344-
Value::Scalar(DefaultScalarValue::Int(1i32)),
345-
vec![],
346-
))),
347-
GraphQLResponse::from_result(Ok((
348-
Value::Scalar(DefaultScalarValue::Int(2i32)),
349-
vec![],
350-
))),
351-
GraphQLResponse::from_result(Ok((Value::Null, vec![]))),
352-
GraphQLResponse::from_result(Ok((
353-
Value::Scalar(DefaultScalarValue::Int(4i32)),
354-
vec![],
355-
))),
329+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(1i32))),
330+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(2i32))),
331+
ExecutionOutput::from_data(Value::null()),
332+
ExecutionOutput::from_data(Value::Scalar(DefaultScalarValue::Int(4i32))),
356333
];
357334
let expected = serde_json::to_string(&expected).unwrap();
358335

@@ -380,25 +357,19 @@ mod whole_responses_stream {
380357
#[tokio::test]
381358
async fn value_object() {
382359
let expected = vec![
383-
GraphQLResponse::from_result(Ok((
384-
Value::Object(Object::from_iter(
385-
vec![
386-
("one", Value::Scalar(DefaultScalarValue::Int(1i32))),
387-
("two", Value::Scalar(DefaultScalarValue::Int(1i32))),
388-
]
389-
.into_iter(),
390-
)),
391-
vec![],
360+
ExecutionOutput::from_data(Value::Object(Object::from_iter(
361+
vec![
362+
("one", Value::Scalar(DefaultScalarValue::Int(1i32))),
363+
("two", Value::Scalar(DefaultScalarValue::Int(1i32))),
364+
]
365+
.into_iter(),
392366
))),
393-
GraphQLResponse::from_result(Ok((
394-
Value::Object(Object::from_iter(
395-
vec![
396-
("one", Value::Scalar(DefaultScalarValue::Int(2i32))),
397-
("two", Value::Scalar(DefaultScalarValue::Int(2i32))),
398-
]
399-
.into_iter(),
400-
)),
401-
vec![],
367+
ExecutionOutput::from_data(Value::Object(Object::from_iter(
368+
vec![
369+
("one", Value::Scalar(DefaultScalarValue::Int(2i32))),
370+
("two", Value::Scalar(DefaultScalarValue::Int(2i32))),
371+
]
372+
.into_iter(),
402373
))),
403374
];
404375
let expected = serde_json::to_string(&expected).unwrap();

0 commit comments

Comments
 (0)