Skip to content

Commit 8d035c5

Browse files
committed
fix pre-existing bug
1 parent 0d3cb0c commit 8d035c5

File tree

2 files changed

+65
-6
lines changed

2 files changed

+65
-6
lines changed

juniper_graphql_ws/src/lib.rs

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ mod test {
640640
use juniper::{
641641
futures::sink::SinkExt,
642642
parser::{ParseError, Spanning, Token},
643-
DefaultScalarValue, EmptyMutation, FieldResult, InputValue, RootNode, Value,
643+
DefaultScalarValue, EmptyMutation, FieldError, FieldResult, InputValue, RootNode, Value,
644644
};
645645
use std::{convert::Infallible, io};
646646

@@ -678,6 +678,20 @@ mod test {
678678
)
679679
.boxed()
680680
}
681+
682+
/// error emits an error once, then never emits anything else.
683+
async fn error(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
684+
stream::once(future::ready(Err(FieldError::new(
685+
"field error",
686+
Value::null(),
687+
))))
688+
.chain(
689+
tokio::time::delay_for(Duration::from_secs(10000))
690+
.map(|_| unreachable!())
691+
.into_stream(),
692+
)
693+
.boxed()
694+
}
681695
}
682696

683697
type ClientMessage = super::ClientMessage<DefaultScalarValue>;
@@ -1009,4 +1023,46 @@ mod test {
10091023
conn.next().await.unwrap()
10101024
);
10111025
}
1026+
1027+
#[tokio::test]
1028+
async fn test_subscription_field_error() {
1029+
let mut conn = Connection::new(
1030+
new_test_schema(),
1031+
ConnectionConfig::new(Context(1)).with_keep_alive_interval(Duration::from_secs(0)),
1032+
);
1033+
1034+
conn.send(ClientMessage::ConnectionInit {
1035+
payload: Variables::default(),
1036+
})
1037+
.await
1038+
.unwrap();
1039+
1040+
assert_eq!(ServerMessage::ConnectionAck, conn.next().await.unwrap());
1041+
1042+
conn.send(ClientMessage::Start {
1043+
id: "foo".to_string(),
1044+
payload: StartPayload {
1045+
query: "subscription Foo {error}".to_string(),
1046+
variables: Variables::default(),
1047+
operation_name: None,
1048+
},
1049+
})
1050+
.await
1051+
.unwrap();
1052+
1053+
match conn.next().await.unwrap() {
1054+
ServerMessage::Data {
1055+
id,
1056+
payload: DataPayload { data, errors },
1057+
} => {
1058+
assert_eq!(id, "foo");
1059+
assert_eq!(
1060+
data,
1061+
Value::Object([("error", Value::null())].iter().cloned().collect())
1062+
);
1063+
assert_eq!(errors.len(), 1);
1064+
}
1065+
msg @ _ => panic!("expected data, got: {:?}", msg),
1066+
}
1067+
}
10121068
}

juniper_subscriptions/src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -213,19 +213,22 @@ where
213213
}
214214

215215
if filled_count == obj_len {
216+
let mut errors = vec![];
216217
filled_count = 0;
217218
let new_vec = (0..obj_len).map(|_| None).collect::<Vec<_>>();
218219
let ready_vec = std::mem::replace(&mut ready_vec, new_vec);
219220
let ready_vec_iterator = ready_vec.into_iter().map(|el| {
220221
let (name, val) = el.unwrap();
221-
if let Ok(value) = val {
222-
(name, value)
223-
} else {
224-
(name, Value::Null)
222+
match val {
223+
Ok(value) => (name, value),
224+
Err(e) => {
225+
errors.push(e);
226+
(name, Value::Null)
227+
}
225228
}
226229
});
227230
let obj = Object::from_iter(ready_vec_iterator);
228-
Poll::Ready(Some((Value::Object(obj), vec![])))
231+
Poll::Ready(Some((Value::Object(obj), errors)))
229232
} else {
230233
Poll::Pending
231234
}

0 commit comments

Comments
 (0)