Skip to content

Commit cab2ca2

Browse files
tvolkertcommit-bot@chromium.org
authored andcommitted
Update Socket to be a Stream<Uint8List>
Bug: #36900 Change-Id: I600c28aebbe35f9e650f969adf356dda4eb0cacd Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/104524 Commit-Queue: Todd Volkert <[email protected]> Reviewed-by: Lasse R.H. Nielsen <[email protected]>
1 parent a85f6ff commit cab2ca2

19 files changed

+77
-59
lines changed

CHANGELOG.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,28 @@
2828
* `File.openRead()`
2929
* `HttpRequest`
3030
* `HttpClientResponse`
31+
* `Socket`
32+
33+
**Possible errors and how to fix them**
34+
35+
* > The argument type 'Utf8Decoder' can't be assigned to the parameter type 'StreamTransformer<Uint8List, dynamic>'
36+
37+
> type 'Utf8Decoder' is not a subtype of type 'StreamTransformer' of 'streamTransformer'"
38+
39+
You can fix these call sites by updating your code to use
40+
`StreamTransformer.bind()` instead of `Stream.transform()`, like so:
41+
42+
*Before:* `stream.transform(utf8.decoder)`
43+
*After:* `utf8.decoder.bind(stream)`
44+
45+
* > The argument type 'IOSink' can't be assigned to the parameter type 'StreamConsumer<Uint8List>'
46+
47+
> type '_IOSinkImpl' is not a subtype of type 'StreamConsumer<Uint8List>' of 'streamConsumer'
48+
49+
You can fix these call sites by casting your stream instance to a `Stream<List<int>>` before calling `.pipe()` on the stream, like so:
50+
51+
*Before:* `stream.pipe(consumer)`
52+
*After:* `stream.cast<List<int>>().pipe(consumer)`
3153

3254
Finally, the following typed lists were updated to have their `sublist()`
3355
methods declare a return type that is the same as the source list:

DEPS

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ vars = {
8585
"fixnum_tag": "0.10.9",
8686
"glob_tag": "1.1.7",
8787
"html_tag" : "0.14.0+1",
88-
"http_io_rev": "0b05781c273a040ef521b5f7771dbc0356305872",
88+
"http_io_rev": "2fa188caf7937e313026557713f7feffedd4978b",
8989
"http_multi_server_tag" : "2.0.5",
9090
"http_parser_tag" : "3.1.3",
9191
"http_retry_tag": "0.1.1",

runtime/bin/socket_patch.dart

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1606,18 +1606,18 @@ class _SocketStreamConsumer extends StreamConsumer<List<int>> {
16061606
}
16071607
}
16081608

1609-
class _Socket extends Stream<List<int>> implements Socket {
1609+
class _Socket extends Stream<Uint8List> implements Socket {
16101610
RawSocket _raw; // Set to null when the raw socket is closed.
16111611
bool _closed = false; // Set to true when the raw socket is closed.
1612-
StreamController<List<int>> _controller;
1612+
StreamController<Uint8List> _controller;
16131613
bool _controllerClosed = false;
16141614
_SocketStreamConsumer _consumer;
16151615
IOSink _sink;
16161616
var _subscription;
16171617
var _detachReady;
16181618

16191619
_Socket(this._raw) {
1620-
_controller = new StreamController<List<int>>(
1620+
_controller = new StreamController<Uint8List>(
16211621
sync: true,
16221622
onListen: _onSubscriptionStateChange,
16231623
onCancel: _onSubscriptionStateChange,
@@ -1647,7 +1647,7 @@ class _Socket extends Stream<List<int>> implements Socket {
16471647
// is Socket and not _NativeSocket.
16481648
_NativeSocket get _nativeSocket => (_raw as _RawSocket)._socket;
16491649

1650-
StreamSubscription<List<int>> listen(void onData(List<int> event),
1650+
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
16511651
{Function onError, void onDone(), bool cancelOnError}) {
16521652
return _controller.stream.listen(onData,
16531653
onError: onError, onDone: onDone, cancelOnError: cancelOnError);

runtime/observatory/tests/service/tcp_socket_service_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Future setupTCP() async {
1414
// to allow us to query them from the other isolate.
1515
var serverSocket = await io.ServerSocket.bind('127.0.0.1', 0);
1616
serverSocket.listen((s) {
17-
s.transform(utf8.decoder).listen(print);
17+
utf8.decoder.bind(s).listen(print);
1818
s.close();
1919
});
2020
var socket = await io.Socket.connect("127.0.0.1", serverSocket.port);

sdk/lib/_http/http_impl.dart

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2892,13 +2892,13 @@ class _HttpConnectionInfo implements HttpConnectionInfo {
28922892
}
28932893
}
28942894

2895-
class _DetachedSocket extends Stream<List<int>> implements Socket {
2896-
final Stream<List<int>> _incoming;
2895+
class _DetachedSocket extends Stream<Uint8List> implements Socket {
2896+
final Stream<Uint8List> _incoming;
28972897
final Socket _socket;
28982898

28992899
_DetachedSocket(this._socket, this._incoming);
29002900

2901-
StreamSubscription<List<int>> listen(void onData(List<int> event),
2901+
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
29022902
{Function onError, void onDone(), bool cancelOnError}) {
29032903
return _incoming.listen(onData,
29042904
onError: onError, onDone: onDone, cancelOnError: cancelOnError);

sdk/lib/_http/http_parser.dart

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -108,9 +108,9 @@ class _MessageType {
108108
* _HttpDetachedStreamSubscription is resumed, it'll deliver the data before
109109
* resuming the underlaying subscription.
110110
*/
111-
class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
112-
StreamSubscription<List<int>> _subscription;
113-
List<int> _injectData;
111+
class _HttpDetachedStreamSubscription implements StreamSubscription<Uint8List> {
112+
StreamSubscription<Uint8List> _subscription;
113+
Uint8List _injectData;
114114
bool _isCanceled = false;
115115
int _pauseCount = 1;
116116
Function _userOnData;
@@ -130,7 +130,7 @@ class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
130130
return _subscription.cancel();
131131
}
132132

133-
void onData(void handleData(List<int> data)) {
133+
void onData(void handleData(Uint8List data)) {
134134
_userOnData = handleData;
135135
_subscription.onData(handleData);
136136
}
@@ -182,13 +182,13 @@ class _HttpDetachedStreamSubscription implements StreamSubscription<List<int>> {
182182
}
183183
}
184184

185-
class _HttpDetachedIncoming extends Stream<List<int>> {
186-
final StreamSubscription<List<int>> subscription;
187-
final List<int> bufferedData;
185+
class _HttpDetachedIncoming extends Stream<Uint8List> {
186+
final StreamSubscription<Uint8List> subscription;
187+
final Uint8List bufferedData;
188188

189189
_HttpDetachedIncoming(this.subscription, this.bufferedData);
190190

191-
StreamSubscription<List<int>> listen(void onData(List<int> event),
191+
StreamSubscription<Uint8List> listen(void onData(Uint8List event),
192192
{Function onError, void onDone(), bool cancelOnError}) {
193193
if (subscription != null) {
194194
subscription
@@ -203,7 +203,7 @@ class _HttpDetachedIncoming extends Stream<List<int>> {
203203
..resume();
204204
} else {
205205
// TODO(26379): add test for this branch.
206-
return new Stream<List<int>>.fromIterable([bufferedData]).listen(onData,
206+
return new Stream<Uint8List>.fromIterable([bufferedData]).listen(onData,
207207
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
208208
}
209209
}
@@ -261,7 +261,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
261261

262262
// The current incoming connection.
263263
_HttpIncoming _incoming;
264-
StreamSubscription<List<int>> _socketSubscription;
264+
StreamSubscription<Uint8List> _socketSubscription;
265265
bool _paused = true;
266266
bool _bodyPaused = false;
267267
StreamController<_HttpIncoming> _controller;
@@ -303,7 +303,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
303303
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
304304
}
305305

306-
void listenToStream(Stream<List<int>> stream) {
306+
void listenToStream(Stream<Uint8List> stream) {
307307
// Listen to the stream and handle data accordingly. When a
308308
// _HttpIncoming is created, _dataPause, _dataResume, _dataDone is
309309
// given to provide a way of controlling the parser.
@@ -802,7 +802,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
802802
}
803803
}
804804

805-
void _onData(List<int> buffer) {
805+
void _onData(Uint8List buffer) {
806806
_socketSubscription.pause();
807807
assert(_buffer == null);
808808
_buffer = buffer;
@@ -888,7 +888,7 @@ class _HttpParser extends Stream<_HttpIncoming> {
888888
return new _HttpDetachedIncoming(_socketSubscription, readUnparsedData());
889889
}
890890

891-
List<int> readUnparsedData() {
891+
Uint8List readUnparsedData() {
892892
if (_buffer == null) return null;
893893
if (_index == _buffer.length) return null;
894894
var result = _buffer.sublist(_index);

sdk/lib/_http/websocket_impl.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,7 @@ class _WebSocketImpl extends Stream with _ServiceObject implements WebSocket {
11281128
_deflate = deflate;
11291129

11301130
var transformer = new _WebSocketProtocolTransformer(_serverSide, _deflate);
1131-
_subscription = _socket.transform(transformer).listen((data) {
1131+
_subscription = transformer.bind(_socket).listen((data) {
11321132
if (data is _WebSocketPing) {
11331133
if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
11341134
} else if (data is _WebSocketPong) {

sdk/lib/io/socket.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -710,7 +710,7 @@ abstract class RawSocket implements Stream<RawSocketEvent> {
710710
* The [Socket] exposes both a [Stream] and a [IOSink] interface, making it
711711
* ideal for using together with other [Stream]s.
712712
*/
713-
abstract class Socket implements Stream<List<int>>, IOSink {
713+
abstract class Socket implements Stream<Uint8List>, IOSink {
714714
/**
715715
* Creates a new socket connection to the host and port and returns a [Future]
716716
* that will complete with either a [Socket] once connected or an error

tests/compiler/dart2js/analyses/api_allowed.json

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,8 +237,6 @@
237237
"Dynamic invocation of 'close'.": 1
238238
},
239239
"org-dartlang-sdk:///sdk/lib/_http/websocket_impl.dart": {
240-
"Dynamic invocation of 'transform'.": 1,
241-
"Dynamic invocation of 'listen'.": 1,
242240
"Dynamic access of 'address'.": 1,
243241
"Dynamic access of 'host'.": 1,
244242
"Dynamic access of 'port'.": 1,

tests/standalone_2/io/client_socket_add_close_no_error_test.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ void clientSocketAddCloseNoErrorTest() {
1515
var completer = new Completer();
1616
server.listen((socket) {
1717
// The socket is 'paused' until the future completes.
18-
completer.future.then((_) => socket.pipe(socket));
18+
completer.future.then((_) => socket.cast<List<int>>().pipe(socket));
1919
});
2020
Socket.connect("127.0.0.1", server.port).then((client) {
2121
const int SIZE = 1024 * 1024;

0 commit comments

Comments
 (0)