Skip to content

Commit bd6453e

Browse files
committed
update pre-existing juniper_warp::subscriptions
1 parent 7e01f74 commit bd6453e

File tree

1 file changed

+19
-6
lines changed

1 file changed

+19
-6
lines changed

juniper_warp/src/lib.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -419,11 +419,20 @@ pub mod subscriptions {
419419

420420
use anyhow::anyhow;
421421
use futures::{channel::mpsc, Future, StreamExt as _, TryFutureExt as _, TryStreamExt as _};
422-
use juniper::{http::GraphQLRequest, InputValue, ScalarValue, SubscriptionCoordinator as _};
422+
use juniper::{
423+
http::GraphQLRequest, ExecutionError, InputValue, ScalarValue,
424+
SubscriptionCoordinator as _, Value,
425+
};
423426
use juniper_subscriptions::Coordinator;
424427
use serde::{Deserialize, Serialize};
425428
use warp::ws::Message;
426429

430+
#[derive(Serialize)]
431+
struct DataPayload<'a, S: ScalarValue> {
432+
data: &'a Value<S>,
433+
errors: &'a Vec<ExecutionError<S>>,
434+
}
435+
427436
/// Listen to incoming messages and do one of the following:
428437
/// - execute subscription and return values from stream
429438
/// - stop stream and close ws connection
@@ -547,15 +556,19 @@ pub mod subscriptions {
547556
};
548557

549558
values_stream
550-
.take_while(move |response| {
559+
.take_while(move |(data, errors)| {
551560
let request_id = request_id.clone();
552561
let should_stop = state.should_stop.load(Ordering::Relaxed)
553562
|| got_close_signal.load(Ordering::Relaxed);
554563
if !should_stop {
555-
let mut response_text = serde_json::to_string(
556-
&response,
557-
)
558-
.unwrap_or("Error deserializing response".to_owned());
564+
let mut response_text =
565+
serde_json::to_string(&DataPayload {
566+
data,
567+
errors,
568+
})
569+
.unwrap_or(
570+
"Error deserializing response".to_owned(),
571+
);
559572

560573
response_text = format!(
561574
r#"{{"type":"data","id":"{}","payload":{} }}"#,

0 commit comments

Comments
 (0)