Skip to content
This repository was archived by the owner on Oct 28, 2024. It is now read-only.

Avoid an open Peer with a closed Client #65

Merged
merged 1 commit into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions lib/src/peer.dart
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class Peer implements Client, Server {
/// they're responses.
final _clientIncomingForwarder = StreamController(sync: true);

final _done = Completer<void>();
Future<void> _done;
@override
Future get done => _done.future;
Future get done => _done ??= Future.wait([_client.done, _server.done]);
@override
bool get isClosed => _done.isCompleted;
bool get isClosed => _client.isClosed || _server.isClosed;

@override
ErrorCallback get onUnhandledError => _server?.onUnhandledError;
Expand Down Expand Up @@ -142,15 +142,15 @@ class Peer implements Client, Server {
_serverIncomingForwarder.add(message);
}
}, onError: (error, stackTrace) {
_done.completeError(error, stackTrace);
_channel.sink.close();
}, onDone: () {
if (!_done.isCompleted) _done.complete();
close();
});
_serverIncomingForwarder.addError(error, stackTrace);
}, onDone: close);
return done;
}

@override
Future close() => Future.wait([_client.close(), _server.close()]);
Future close() {
_client.close();
_server.close();
return done;
}
}
15 changes: 15 additions & 0 deletions test/peer_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,21 @@ void main() {
await peer.close();
});

test('considered closed with misbehaving StreamChannel', () async {
// If a StreamChannel does not enforce the guarantees stated in it's
// contract - specifically that "Closing the sink causes the stream to close
// before it emits any more events." - The `Peer` should still understand
// when it has been closed manually.
var channel = StreamChannel(
StreamController().stream,
StreamController(),
);
var peer = json_rpc.Peer.withoutJson(channel);
unawaited(peer.listen());
unawaited(peer.close());
expect(peer.isClosed, true);
});

group('like a server,', () {
test('can receive a call and return a response', () {
expect(outgoing.first,
Expand Down