From c3a58b334debac912c8165f332d62727223a64ce Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Sun, 27 Jun 2021 12:21:44 +0200 Subject: [PATCH 01/15] Add examples --- examples/tcpsockets/App.js | 6 ++- examples/tcpsockets/examples/drain-event.js | 50 +++++++++++++++++++++ examples/tcpsockets/examples/main.js | 8 +++- examples/tcpsockets/ios/Podfile.lock | 4 +- src/Socket.js | 1 + 5 files changed, 64 insertions(+), 5 deletions(-) create mode 100644 examples/tcpsockets/examples/drain-event.js diff --git a/examples/tcpsockets/App.js b/examples/tcpsockets/App.js index f5aa808..344fb1d 100644 --- a/examples/tcpsockets/App.js +++ b/examples/tcpsockets/App.js @@ -7,7 +7,7 @@ import React from 'react'; import { ScrollView, StyleSheet, Text, View } from 'react-native'; -import { init, server, client } from './examples/echo'; +import { init, server, client } from './examples/drain-event'; class App extends React.Component { /** @@ -60,6 +60,10 @@ class App extends React.Component { this.updateChatter('Opened client on ' + JSON.stringify(client.address())); }); + client.on('drain', () => { + this.updateChatter('Client drained'); + }); + client.on('data', (data) => { this.updateChatter('Client received: ' + data); }); diff --git a/examples/tcpsockets/examples/drain-event.js b/examples/tcpsockets/examples/drain-event.js new file mode 100644 index 0000000..4df0415 --- /dev/null +++ b/examples/tcpsockets/examples/drain-event.js @@ -0,0 +1,50 @@ +const net = require('net'); +const PORT = Number(9 + (Math.random() * 999).toFixed(0)); + +const server = new net.Server(); +const client = new net.Socket(); + +function init() { + server.listen({ port: PORT, host: '127.0.0.1', reuseAddress: true }); + + client.connect( + // @ts-ignore + { + port: PORT, + host: '127.0.0.1', + localAddress: '127.0.0.1', + reuseAddress: true, + // localPort: 20000, + // interface: "wifi", + // tls: true + }, + () => { + let i = 0; + const MAX_ITER = 1000000; + write(); + function write() { + let ok = true; + do { + i++; + if (i === 0) { + // Last time! + client.write(''+i+','); + } else { + // See if we should continue, or wait. + // Don't pass the callback, because we're not done yet. + ok = client.write(''+i+','); + } + } while (i < MAX_ITER && ok); + if (!ok) { + // Had to stop early! + // Write some more once it drains. + client.once('drain', write); + } else { + client.destroy(); + } + } + } + ); +} + +module.exports = { init, server, client }; diff --git a/examples/tcpsockets/examples/main.js b/examples/tcpsockets/examples/main.js index 5609389..2d10761 100644 --- a/examples/tcpsockets/examples/main.js +++ b/examples/tcpsockets/examples/main.js @@ -1,11 +1,11 @@ // Execute this file using NodeJS -const { init, server, client } = require('./echo'); +const { init, server, client } = require('./drain-event'); server.on('connection', (socket) => { console.log('Client connected to server on ' + JSON.stringify(socket.address())); socket.on('data', (data) => { - console.log('Server client received: ' + data); + // console.log('Server client received: ' + data); }); socket.on('error', (error) => { @@ -29,6 +29,10 @@ client.on('connect', () => { console.log('Opened client on ' + JSON.stringify(client.address())); }); +client.on('drain', () => { + console.log('Client drained'); +}); + client.on('data', (data) => { console.log('Client received: ' + data); }); diff --git a/examples/tcpsockets/ios/Podfile.lock b/examples/tcpsockets/ios/Podfile.lock index 3942a33..8e0972a 100644 --- a/examples/tcpsockets/ios/Podfile.lock +++ b/examples/tcpsockets/ios/Podfile.lock @@ -186,7 +186,7 @@ PODS: - React-cxxreact (= 0.63.2) - React-jsi (= 0.63.2) - React-jsinspector (0.63.2) - - react-native-tcp-socket (5.1.0): + - react-native-tcp-socket (5.2.1): - CocoaAsyncSocket - React-Core - React-RCTActionSheet (0.63.2): @@ -361,7 +361,7 @@ SPEC CHECKSUMS: React-jsi: 54245e1d5f4b690dec614a73a3795964eeef13a8 React-jsiexecutor: 8ca588cc921e70590820ce72b8789b02c67cce38 React-jsinspector: b14e62ebe7a66e9231e9581279909f2fc3db6606 - react-native-tcp-socket: 334094926111f66bd6b305859aac4d07745b8d81 + react-native-tcp-socket: d05b8acbf1c045ff5b6a92ec6ca91960c94387d1 React-RCTActionSheet: 910163b6b09685a35c4ebbc52b66d1bfbbe39fc5 React-RCTAnimation: 9a883bbe1e9d2e158d4fb53765ed64c8dc2200c6 React-RCTBlob: 39cf0ece1927996c4466510e25d2105f67010e13 diff --git a/src/Socket.js b/src/Socket.js index ab89e4f..a1780ed 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -280,6 +280,7 @@ export default class Socket extends EventEmitter { } } ); + return true; } ref() { From a045bb546a9e05f5814703d893affdd27d25931d Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Thu, 8 Jul 2021 17:25:30 +0200 Subject: [PATCH 02/15] Switch to event style TCP writes --- ios/TcpSocketClient.h | 6 +++--- ios/TcpSocketClient.m | 21 +++++++++------------ ios/TcpSockets.m | 17 +++++++++++++---- src/Socket.js | 29 +++++++++++++++++++---------- 4 files changed, 44 insertions(+), 29 deletions(-) diff --git a/ios/TcpSocketClient.h b/ios/TcpSocketClient.h index 17825be..fa61915 100644 --- a/ios/TcpSocketClient.h +++ b/ios/TcpSocketClient.h @@ -27,6 +27,7 @@ typedef enum RCTTCPError RCTTCPError; - (void)onData:(NSNumber *)clientID data:(NSData *)data; - (void)onClose:(TcpSocketClient*)client withError:(NSError *)err; - (void)onError:(TcpSocketClient*)client withError:(NSError *)err; +- (void)onWrittenData:(TcpSocketClient*)client msgId:(NSNumber *)msgId; - (NSNumber*)getNextId; @end @@ -56,8 +57,7 @@ typedef enum RCTTCPError RCTTCPError; ///--------------------------------------------------------------------------------------- /** * Connects to a host and port - * - * @param port + * @param port port * @param host ip address * @param options NSDictionary which can have @"localAddress" and @"localPort" to specify the local interface * @return true if connected, false if there was an error @@ -83,7 +83,7 @@ typedef enum RCTTCPError RCTTCPError; * write data * */ -- (void)writeData:(NSData*) data callback:(RCTResponseSenderBlock) callback; +- (void)writeData:(NSData*) data msgId:(NSNumber*)msgId; /** * end client diff --git a/ios/TcpSocketClient.m b/ios/TcpSocketClient.m index a3d9b15..39c48d2 100644 --- a/ios/TcpSocketClient.m +++ b/ios/TcpSocketClient.m @@ -14,7 +14,7 @@ @interface TcpSocketClient() BOOL _checkValidity; NSString *_certPath; GCDAsyncSocket *_tcpSocket; - NSMutableDictionary *_pendingSends; + NSMutableDictionary *_pendingSends; NSLock *_lock; long _sendTag; } @@ -188,18 +188,18 @@ - (BOOL)listen:(NSDictionary *)options error:(NSError **)error return isListening; } -- (void)setPendingSend:(RCTResponseSenderBlock)callback forKey:(NSNumber *)key +- (void)setPendingSend:(NSNumber *)msgId forKey:(NSNumber *)key { [_lock lock]; @try { - [_pendingSends setObject:callback forKey:key]; + [_pendingSends setObject:msgId forKey:key]; } @finally { [_lock unlock]; } } -- (RCTResponseSenderBlock)getPendingSend:(NSNumber *)key +- (NSNumber*)getPendingSend:(NSNumber *)key { [_lock lock]; @try { @@ -224,19 +224,16 @@ - (void)dropPendingSend:(NSNumber *)key - (void)socket:(GCDAsyncSocket *)sock didWriteDataWithTag:(long)msgTag { NSNumber* tagNum = [NSNumber numberWithLong:msgTag]; - RCTResponseSenderBlock callback = [self getPendingSend:tagNum]; - if (callback) { - callback(@[]); + NSNumber* msgId = [self getPendingSend:tagNum]; + if (msgId) { + [_clientDelegate onWrittenData:self msgId:msgId]; [self dropPendingSend:tagNum]; } } -- (void) writeData:(NSData *)data - callback:(RCTResponseSenderBlock)callback +- (void) writeData:(NSData *)data msgId:(NSNumber*)msgId { - if (callback) { - [self setPendingSend:callback forKey:@(_sendTag)]; - } + [self setPendingSend:msgId forKey:@(_sendTag)]; [_tcpSocket writeData:data withTimeout:-1 tag:_sendTag]; _sendTag++; diff --git a/ios/TcpSockets.m b/ios/TcpSockets.m index 5295841..2c508e0 100644 --- a/ios/TcpSockets.m +++ b/ios/TcpSockets.m @@ -24,7 +24,8 @@ @implementation TcpSockets @"connection", @"data", @"close", - @"error"]; + @"error", + @"written"]; } - (void)startObserving { @@ -82,15 +83,15 @@ - (TcpSocketClient *)createSocket:(nonnull NSNumber*)cId } RCT_EXPORT_METHOD(write:(nonnull NSNumber*)cId - string:(NSString *)base64String - callback:(RCTResponseSenderBlock)callback) { + string:(nonnull NSString*)base64String + callback:(nonnull NSNumber*)msgId) { TcpSocketClient* client = [self findClient:cId]; if (!client) return; // iOS7+ // TODO: use https://github.com/nicklockwood/Base64 for compatibility with earlier iOS versions NSData *data = [[NSData alloc] initWithBase64EncodedString:base64String options:0]; - [client writeData:data callback:callback]; + [client writeData:data msgId:msgId]; } RCT_EXPORT_METHOD(end:(nonnull NSNumber*)cId) { @@ -135,6 +136,14 @@ - (TcpSocketClient *)createSocket:(nonnull NSNumber*)cId [client setKeepAlive:enable initialDelay:initialDelay]; } +- (void)onWrittenData:(TcpSocketClient*) client msgId:(NSNumber *)msgId +{ + [self sendEventWithName:@"written" body:@{ + @"id": client.id, + @"msgId": msgId, + }]; +} + - (void)onConnect:(TcpSocketClient*) client { GCDAsyncSocket * socket = [client getSocket]; diff --git a/src/Socket.js b/src/Socket.js index a1780ed..4cdf918 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -54,6 +54,8 @@ export default class Socket extends EventEmitter { this._state = STATE.DISCONNECTED; /** @private */ this._encoding = undefined; + /** @private */ + this._msgId = 0; this.localAddress = undefined; this.localPort = undefined; this.remoteAddress = undefined; @@ -264,22 +266,23 @@ export default class Socket extends EventEmitter { const self = this; if (this._state === STATE.DISCONNECTED) throw new Error('Socket is not connected.'); - callback = callback || (() => {}); const generatedBuffer = this._generateSendBuffer(buffer, encoding); - Sockets.write( - this._id, - generatedBuffer.toString('base64'), - /** - * @param {string} err - */ - function(err) { + const currentMsgId = this._msgId++ % Number.MAX_SAFE_INTEGER; + // @ts-ignore + const writtenListener = this.on('written', (msgId) => { + // Callback equivalent + if (msgId === currentMsgId) { if (self._timeout) self._activateTimer(); if (callback) { - if (err) return callback(err); + // TODO + // if (evt.err) return callback(evt.err); callback(null); } + // @ts-ignore + this.removeListener(writtenListener); } - ); + }); + Sockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId); return true; } @@ -317,6 +320,11 @@ export default class Socket extends EventEmitter { this._setConnected(evt.connection); this.emit('connect'); }); + this._writtenListener = this._eventEmitter.addListener('written', (evt) => { + if (evt.id !== this._id) return; + // @ts-ignore + this.emit('written', evt.msgId); + }); } /** @@ -327,6 +335,7 @@ export default class Socket extends EventEmitter { this._errorListener?.remove(); this._closeListener?.remove(); this._connectListener?.remove(); + this._writtenListener?.remove(); } /** From b98f3d25e137414eab4c756ccf3a94d3e5352654 Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Thu, 8 Jul 2021 18:22:10 +0200 Subject: [PATCH 03/15] Add drain event --- examples/tcpsockets/examples/main.js | 2 +- src/Socket.js | 50 +++++++++++++++++++--------- 2 files changed, 35 insertions(+), 17 deletions(-) diff --git a/examples/tcpsockets/examples/main.js b/examples/tcpsockets/examples/main.js index 2d10761..3dca5b1 100644 --- a/examples/tcpsockets/examples/main.js +++ b/examples/tcpsockets/examples/main.js @@ -5,7 +5,7 @@ server.on('connection', (socket) => { console.log('Client connected to server on ' + JSON.stringify(socket.address())); socket.on('data', (data) => { - // console.log('Server client received: ' + data); + console.log('Server client received: ' + data); }); socket.on('error', (error) => { diff --git a/src/Socket.js b/src/Socket.js index 4cdf918..78a388d 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -34,7 +34,7 @@ const STATE = { * tlsCert?: any, * }} ConnectionOptions * - * @extends {EventEmitter<'connect' | 'timeout' | 'data' | 'error' | 'close', any>} + * @extends {EventEmitter<'connect' | 'timeout' | 'data' | 'error' | 'close' | 'drain', any>} */ export default class Socket extends EventEmitter { /** @@ -56,6 +56,11 @@ export default class Socket extends EventEmitter { this._encoding = undefined; /** @private */ this._msgId = 0; + /** @private */ + this._lastRcvMsgId = Number.MAX_SAFE_INTEGER - 1; + /** @private */ + this._lastSentMsgId = 0; + this.writableNeedDrain = false; this.localAddress = undefined; this.localPort = undefined; this.remoteAddress = undefined; @@ -267,23 +272,36 @@ export default class Socket extends EventEmitter { if (this._state === STATE.DISCONNECTED) throw new Error('Socket is not connected.'); const generatedBuffer = this._generateSendBuffer(buffer, encoding); - const currentMsgId = this._msgId++ % Number.MAX_SAFE_INTEGER; - // @ts-ignore - const writtenListener = this.on('written', (msgId) => { - // Callback equivalent - if (msgId === currentMsgId) { - if (self._timeout) self._activateTimer(); - if (callback) { - // TODO - // if (evt.err) return callback(evt.err); - callback(null); + const currentMsgId = this._msgId; + this._msgId = (this._msgId + 1) % Number.MAX_SAFE_INTEGER; + const writtenListener = this.on( + // @ts-ignore + 'written', + (msgId) => { + // Callback equivalent + if (msgId === currentMsgId) { + // @ts-ignore + this.removeListener(writtenListener); + this._lastRcvMsgId = msgId; + if (self._timeout) self._activateTimer(); + if (this.writableNeedDrain && this._lastSentMsgId == msgId) { + this.writableNeedDrain = true; + this.emit('drain'); + } + if (callback) { + // TODO + // if (evt.err) return callback(evt.err); + callback(null); + } } - // @ts-ignore - this.removeListener(writtenListener); - } - }); + }, + this + ); + const ok = (this._lastRcvMsgId + 1) % Number.MAX_SAFE_INTEGER == currentMsgId; + if (!ok) this.writableNeedDrain = true; + this._lastSentMsgId = currentMsgId; Sockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId); - return true; + return ok; } ref() { From f0948139cec7915290919703df91e0a8a5e88837 Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Thu, 8 Jul 2021 18:39:35 +0200 Subject: [PATCH 04/15] Fix this.writableNeedDrain --- src/Socket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Socket.js b/src/Socket.js index 78a388d..597a4cc 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -285,7 +285,7 @@ export default class Socket extends EventEmitter { this._lastRcvMsgId = msgId; if (self._timeout) self._activateTimer(); if (this.writableNeedDrain && this._lastSentMsgId == msgId) { - this.writableNeedDrain = true; + this.writableNeedDrain = false; this.emit('drain'); } if (callback) { From 99e5b165459aeb50e837356cd593ee4fe913f7cc Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Mon, 23 Aug 2021 15:09:17 +0200 Subject: [PATCH 05/15] Solve ts incompatibility --- src/Socket.js | 44 ++++++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 24 deletions(-) diff --git a/src/Socket.js b/src/Socket.js index 597a4cc..95f1d4b 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -46,6 +46,8 @@ export default class Socket extends EventEmitter { this._id = undefined; /** @private */ this._eventEmitter = nativeEventEmitter; + /** @type {EventEmitter<'written', any>} @private */ + this._msgEvtEmitter = new EventEmitter(); /** @type {number} @private */ this._timeoutMsecs = 0; /** @private */ @@ -274,29 +276,24 @@ export default class Socket extends EventEmitter { const generatedBuffer = this._generateSendBuffer(buffer, encoding); const currentMsgId = this._msgId; this._msgId = (this._msgId + 1) % Number.MAX_SAFE_INTEGER; - const writtenListener = this.on( - // @ts-ignore - 'written', - (msgId) => { - // Callback equivalent - if (msgId === currentMsgId) { - // @ts-ignore - this.removeListener(writtenListener); - this._lastRcvMsgId = msgId; - if (self._timeout) self._activateTimer(); - if (this.writableNeedDrain && this._lastSentMsgId == msgId) { - this.writableNeedDrain = false; - this.emit('drain'); - } - if (callback) { - // TODO - // if (evt.err) return callback(evt.err); - callback(null); - } + const msgEvtHandler = (/** @type {number} */ msgId) => { + // Callback equivalent + if (msgId === currentMsgId) { + this._msgEvtEmitter.removeListener('written', msgEvtHandler); + this._lastRcvMsgId = msgId; + if (self._timeout) self._activateTimer(); + if (this.writableNeedDrain && this._lastSentMsgId == msgId) { + this.writableNeedDrain = false; + this.emit('drain'); } - }, - this - ); + if (callback) { + // TODO + // if (evt.err) return callback(evt.err); + callback(null); + } + } + }; + this._msgEvtEmitter.on('written', msgEvtHandler, this); const ok = (this._lastRcvMsgId + 1) % Number.MAX_SAFE_INTEGER == currentMsgId; if (!ok) this.writableNeedDrain = true; this._lastSentMsgId = currentMsgId; @@ -340,8 +337,7 @@ export default class Socket extends EventEmitter { }); this._writtenListener = this._eventEmitter.addListener('written', (evt) => { if (evt.id !== this._id) return; - // @ts-ignore - this.emit('written', evt.msgId); + this._msgEvtEmitter.emit('written', evt.msgId); }); } From c92889948492f41776891358cb17d7b9cfcfb01d Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Mon, 23 Aug 2021 15:56:59 +0200 Subject: [PATCH 06/15] Add Android --- .../react/tcpsocket/TcpEventListener.java | 106 ++++++++++++++ .../react/tcpsocket/TcpReceiverTask.java | 28 +--- .../react/tcpsocket/TcpSocketClient.java | 6 +- .../react/tcpsocket/TcpSocketModule.java | 130 +++--------------- .../react/tcpsocket/TcpSocketServer.java | 9 +- 5 files changed, 133 insertions(+), 146 deletions(-) create mode 100644 android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java new file mode 100644 index 0000000..0981a3c --- /dev/null +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java @@ -0,0 +1,106 @@ +package com.asterinet.react.tcpsocket; + +import android.util.Base64; + +import com.facebook.react.bridge.Arguments; +import com.facebook.react.bridge.ReactContext; +import com.facebook.react.bridge.WritableMap; +import com.facebook.react.modules.core.DeviceEventManagerModule; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; + +public class TcpEventListener { + + private final DeviceEventManagerModule.RCTDeviceEventEmitter rctEvtEmitter; + + public TcpEventListener(final ReactContext reactContext) { + rctEvtEmitter = reactContext.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class); + } + + public void onConnection(Integer serverId, Integer clientId, Socket socket) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", serverId); + + WritableMap infoParams = Arguments.createMap(); + infoParams.putInt("id", clientId); + + WritableMap connectionParams = Arguments.createMap(); + final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + + connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); + connectionParams.putInt("localPort", socket.getLocalPort()); + connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); + connectionParams.putInt("remotePort", socket.getPort()); + connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); + + infoParams.putMap("connection", connectionParams); + eventParams.putMap("info", infoParams); + + sendEvent("connection", eventParams); + } + + public void onConnect(Integer id, TcpSocketClient client) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + WritableMap connectionParams = Arguments.createMap(); + final Socket socket = client.getSocket(); + final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + + connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); + connectionParams.putInt("localPort", socket.getLocalPort()); + connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); + connectionParams.putInt("remotePort", socket.getPort()); + connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); + eventParams.putMap("connection", connectionParams); + sendEvent("connect", eventParams); + } + + public void onListen(Integer id, TcpSocketServer server) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + WritableMap connectionParams = Arguments.createMap(); + final ServerSocket serverSocket = server.getServerSocket(); + final InetAddress address = serverSocket.getInetAddress(); + + connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress()); + connectionParams.putInt("localPort", serverSocket.getLocalPort()); + connectionParams.putString("localFamily", address instanceof Inet6Address ? "IPv6" : "IPv4"); + eventParams.putMap("connection", connectionParams); + sendEvent("listening", eventParams); + } + + public void onData(Integer id, byte[] data) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP)); + + sendEvent("data", eventParams); + } + + public void onClose(Integer id, String error) { + if (error != null) { + onError(id, error); + } + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putBoolean("hadError", error != null); + + sendEvent("close", eventParams); + } + + public void onError(Integer id, String error) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putString("error", error); + + sendEvent("error", eventParams); + } + + private void sendEvent(String eventName, WritableMap params) { + rctEvtEmitter.emit(eventName, params); + } +} diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java index 764c879..7bdee33 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java @@ -5,7 +5,6 @@ import java.io.BufferedInputStream; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.Arrays; import java.net.Socket; @@ -14,27 +13,26 @@ * notifies it's listener when data is received. This is not threadsafe, the listener * should handle synchronicity. */ -class TcpReceiverTask extends AsyncTask, Void, Void> { +class TcpReceiverTask extends AsyncTask, Void, Void> { /** * An infinite loop to block and read data from the socket. */ @SafeVarargs @Override - protected final Void doInBackground(Pair... params) { + protected final Void doInBackground(Pair... params) { if (params.length > 1) { throw new IllegalArgumentException("This task is only for a single socket/listener pair."); } TcpSocketClient clientSocket = params[0].first; - OnDataReceivedListener receiverListener = params[0].second; + TcpEventListener receiverListener = params[0].second; int socketId = clientSocket.getId(); Socket socket = clientSocket.getSocket(); byte[] buffer = new byte[8192]; - int bufferCount; try { BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); while (!isCancelled() && !socket.isClosed()) { - bufferCount = in.read(buffer); + int bufferCount = in.read(buffer); if (bufferCount > 0) { receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount)); } else if (bufferCount == -1) { @@ -49,22 +47,4 @@ protected final Void doInBackground(Pair mNetworkMap = new ConcurrentHashMap<>(); private final CurrentNetwork currentNetwork = new CurrentNetwork(); private final ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS); + private TcpEventListener tcpEvtListener; public TcpSocketModule(ReactApplicationContext reactContext) { super(reactContext); mReactContext = reactContext; } + @Override + public void initialize() { + super.initialize(); + tcpEvtListener = new TcpEventListener(mReactContext); + } + @Override public @NonNull String getName() { return TAG; } - private void sendEvent(String eventName, WritableMap params) { - mReactContext - .getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class) - .emit(eventName, params); - } - /** * Creates a TCP Socket and establish a connection with the given host * @@ -76,7 +69,7 @@ public void connect(@NonNull final Integer cId, @NonNull final String host, @Non @Override public void run() { if (socketMap.get(cId) != null) { - onError(cId, TAG + "createSocket called twice with the same id."); + tcpEvtListener.onError(cId, TAG + "createSocket called twice with the same id."); return; } try { @@ -84,12 +77,12 @@ public void run() { final String localAddress = options.hasKey("localAddress") ? options.getString("localAddress") : null; final String iface = options.hasKey("interface") ? options.getString("interface") : null; selectNetwork(iface, localAddress); - TcpSocketClient client = new TcpSocketClient(TcpSocketModule.this, cId, null); + TcpSocketClient client = new TcpSocketClient(tcpEvtListener, cId, null); socketMap.put(cId, client); client.connect(mReactContext, host, port, options, currentNetwork.getNetwork()); - onConnect(cId, client); + tcpEvtListener.onConnect(cId, client); } catch (Exception e) { - onError(cId, e.getMessage()); + tcpEvtListener.onError(cId, e.getMessage()); } } })); @@ -112,7 +105,7 @@ public void run() { if (callback != null) { callback.invoke(e.toString()); } - onError(cId, e.toString()); + tcpEvtListener.onError(cId, e.toString()); } } })); @@ -159,11 +152,11 @@ public void listen(final Integer cId, final ReadableMap options) { @Override public void run() { try { - TcpSocketServer server = new TcpSocketServer(socketMap, TcpSocketModule.this, cId, options); + TcpSocketServer server = new TcpSocketServer(socketMap, tcpEvtListener, cId, options); socketMap.put(cId, server); - onListen(cId, server); + tcpEvtListener.onListen(cId, server); } catch (Exception uhe) { - onError(cId, uhe.getMessage()); + tcpEvtListener.onError(cId, uhe.getMessage()); } } })); @@ -176,7 +169,7 @@ public void setNoDelay(@NonNull final Integer cId, final boolean noDelay) { try { client.setNoDelay(noDelay); } catch (IOException e) { - onError(cId, e.getMessage()); + tcpEvtListener.onError(cId, e.getMessage()); } } @@ -187,7 +180,7 @@ public void setKeepAlive(@NonNull final Integer cId, final boolean enable, final try { client.setKeepAlive(enable, initialDelay); } catch (IOException e) { - onError(cId, e.getMessage()); + tcpEvtListener.onError(cId, e.getMessage()); } } @@ -253,93 +246,6 @@ private void selectNetwork(@Nullable final String iface, @Nullable final String mNetworkMap.put(iface + ipAddress, currentNetwork.getNetwork()); } - // TcpReceiverTask.OnDataReceivedListener - - @Override - public void onConnect(Integer id, TcpSocketClient client) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - WritableMap connectionParams = Arguments.createMap(); - final Socket socket = client.getSocket(); - final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - - connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); - connectionParams.putInt("localPort", socket.getLocalPort()); - connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); - connectionParams.putInt("remotePort", socket.getPort()); - connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); - eventParams.putMap("connection", connectionParams); - sendEvent("connect", eventParams); - } - - @Override - public void onListen(Integer id, TcpSocketServer server) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - WritableMap connectionParams = Arguments.createMap(); - final ServerSocket serverSocket = server.getServerSocket(); - final InetAddress address = serverSocket.getInetAddress(); - - connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress()); - connectionParams.putInt("localPort", serverSocket.getLocalPort()); - connectionParams.putString("localFamily", address instanceof Inet6Address ? "IPv6" : "IPv4"); - eventParams.putMap("connection", connectionParams); - sendEvent("listening", eventParams); - } - - @Override - public void onData(Integer id, byte[] data) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP)); - - sendEvent("data", eventParams); - } - - @Override - public void onClose(Integer id, String error) { - if (error != null) { - onError(id, error); - } - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - eventParams.putBoolean("hadError", error != null); - - sendEvent("close", eventParams); - } - - @Override - public void onError(Integer id, String error) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - eventParams.putString("error", error); - - sendEvent("error", eventParams); - } - - @Override - public void onConnection(Integer serverId, Integer clientId, Socket socket) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", serverId); - - WritableMap infoParams = Arguments.createMap(); - infoParams.putInt("id", clientId); - - WritableMap connectionParams = Arguments.createMap(); - final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - - connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); - connectionParams.putInt("localPort", socket.getLocalPort()); - connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); - connectionParams.putInt("remotePort", socket.getPort()); - connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); - - infoParams.putMap("connection", connectionParams); - eventParams.putMap("info", infoParams); - - sendEvent("connection", eventParams); - } - private TcpSocketClient getTcpClient(final int id) { TcpSocket socket = socketMap.get(id); if (socket == null) { diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java index 5e955a7..9793d0b 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java @@ -7,7 +7,6 @@ import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; @@ -16,7 +15,7 @@ public final class TcpSocketServer extends TcpSocket { private ServerSocket serverSocket; - private TcpReceiverTask.OnDataReceivedListener mReceiverListener; + private final TcpEventListener mReceiverListener; private int clientSocketIds; private final ExecutorService executorService; private final ConcurrentHashMap socketClients; @@ -44,7 +43,7 @@ protected Void doInBackground(Object[] objects) { }; - public TcpSocketServer(final ConcurrentHashMap socketClients, final TcpReceiverTask.OnDataReceivedListener receiverListener, final Integer id, + public TcpSocketServer(final ConcurrentHashMap socketClients, final TcpEventListener receiverListener, final Integer id, final ReadableMap options) throws IOException { super(id); this.executorService = Executors.newFixedThreadPool(1); @@ -106,8 +105,4 @@ public void close() { mReceiverListener.onClose(getId(), e.getMessage()); } } - - public int getListeningPort() { - return (serverSocket == null) ? -1 : serverSocket.getLocalPort(); - } } From d2c5936e51b2acf9c18c5a90ef72dae0b4e4521d Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Tue, 24 Aug 2021 11:54:30 +0200 Subject: [PATCH 07/15] Add Java event --- .../react/tcpsocket/TcpEventListener.java | 30 ++++++++++++------- .../react/tcpsocket/TcpSocketModule.java | 14 ++++----- examples/tcpsockets/android/gradlew | 0 examples/tcpsockets/android/gradlew.bat | 0 4 files changed, 25 insertions(+), 19 deletions(-) mode change 100644 => 100755 examples/tcpsockets/android/gradlew mode change 100644 => 100755 examples/tcpsockets/android/gradlew.bat diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java index 0981a3c..1462e3a 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java @@ -21,7 +21,7 @@ public TcpEventListener(final ReactContext reactContext) { rctEvtEmitter = reactContext.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class); } - public void onConnection(Integer serverId, Integer clientId, Socket socket) { + public void onConnection(int serverId, int clientId, Socket socket) { WritableMap eventParams = Arguments.createMap(); eventParams.putInt("id", serverId); @@ -29,7 +29,7 @@ public void onConnection(Integer serverId, Integer clientId, Socket socket) { infoParams.putInt("id", clientId); WritableMap connectionParams = Arguments.createMap(); - final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); connectionParams.putInt("localPort", socket.getLocalPort()); @@ -43,12 +43,12 @@ public void onConnection(Integer serverId, Integer clientId, Socket socket) { sendEvent("connection", eventParams); } - public void onConnect(Integer id, TcpSocketClient client) { + public void onConnect(int id, TcpSocketClient client) { WritableMap eventParams = Arguments.createMap(); eventParams.putInt("id", id); WritableMap connectionParams = Arguments.createMap(); - final Socket socket = client.getSocket(); - final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + Socket socket = client.getSocket(); + InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); connectionParams.putInt("localPort", socket.getLocalPort()); @@ -59,12 +59,12 @@ public void onConnect(Integer id, TcpSocketClient client) { sendEvent("connect", eventParams); } - public void onListen(Integer id, TcpSocketServer server) { + public void onListen(int id, TcpSocketServer server) { WritableMap eventParams = Arguments.createMap(); eventParams.putInt("id", id); WritableMap connectionParams = Arguments.createMap(); - final ServerSocket serverSocket = server.getServerSocket(); - final InetAddress address = serverSocket.getInetAddress(); + ServerSocket serverSocket = server.getServerSocket(); + InetAddress address = serverSocket.getInetAddress(); connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress()); connectionParams.putInt("localPort", serverSocket.getLocalPort()); @@ -73,7 +73,7 @@ public void onListen(Integer id, TcpSocketServer server) { sendEvent("listening", eventParams); } - public void onData(Integer id, byte[] data) { + public void onData(int id, byte[] data) { WritableMap eventParams = Arguments.createMap(); eventParams.putInt("id", id); eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP)); @@ -81,7 +81,15 @@ public void onData(Integer id, byte[] data) { sendEvent("data", eventParams); } - public void onClose(Integer id, String error) { + public void onWritten(int id, int msgId) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putInt("msgId", msgId); + + sendEvent("written", eventParams); + } + + public void onClose(int id, String error) { if (error != null) { onError(id, error); } @@ -92,7 +100,7 @@ public void onClose(Integer id, String error) { sendEvent("close", eventParams); } - public void onError(Integer id, String error) { + public void onError(int id, String error) { WritableMap eventParams = Arguments.createMap(); eventParams.putInt("id", id); eventParams.putString("error", error); diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java index 9ccd17b..d87000a 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java @@ -13,7 +13,6 @@ import com.facebook.react.bridge.ReactContextBaseJavaModule; import com.facebook.react.bridge.ReactMethod; import com.facebook.react.bridge.ReadableMap; -import com.facebook.react.bridge.Callback; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; @@ -91,20 +90,19 @@ public void run() { @SuppressLint("StaticFieldLeak") @SuppressWarnings("unused") @ReactMethod - public void write(@NonNull final Integer cId, @NonNull final String base64String, @Nullable final Callback callback) { + public void write(final int cId, @NonNull final String base64String, final int msgId) { executorService.execute(new Thread(new Runnable() { @Override public void run() { TcpSocketClient socketClient = getTcpClient(cId); try { socketClient.write(Base64.decode(base64String, Base64.NO_WRAP)); - if (callback != null) { - callback.invoke(); - } + tcpEvtListener.onWritten(cId, msgId); } catch (IOException e) { - if (callback != null) { - callback.invoke(e.toString()); - } + // TODO + // if (callback != null) { + // callback.invoke(e.toString()); + //} tcpEvtListener.onError(cId, e.toString()); } } diff --git a/examples/tcpsockets/android/gradlew b/examples/tcpsockets/android/gradlew old mode 100644 new mode 100755 diff --git a/examples/tcpsockets/android/gradlew.bat b/examples/tcpsockets/android/gradlew.bat old mode 100644 new mode 100755 From d1bb026bb254633a1d75fdb70d9c3c34b3398c99 Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Tue, 24 Aug 2021 12:55:13 +0200 Subject: [PATCH 08/15] Add error to callback --- .../react/tcpsocket/TcpEventListener.java | 5 ++- .../react/tcpsocket/TcpSocketModule.java | 7 ++--- examples/tcpsockets/examples/drain-event.js | 1 - examples/tcpsockets/examples/echo.js | 1 - src/Server.js | 8 ++++- src/Socket.js | 31 +++++++++++++------ 6 files changed, 34 insertions(+), 19 deletions(-) diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java index 1462e3a..dc28a1d 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java @@ -13,6 +13,8 @@ import java.net.ServerSocket; import java.net.Socket; +import javax.annotation.Nullable; + public class TcpEventListener { private final DeviceEventManagerModule.RCTDeviceEventEmitter rctEvtEmitter; @@ -81,10 +83,11 @@ public void onData(int id, byte[] data) { sendEvent("data", eventParams); } - public void onWritten(int id, int msgId) { + public void onWritten(int id, int msgId, @Nullable String error) { WritableMap eventParams = Arguments.createMap(); eventParams.putInt("id", id); eventParams.putInt("msgId", msgId); + eventParams.putString("err", error); sendEvent("written", eventParams); } diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java index d87000a..64dbc90 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java @@ -97,12 +97,9 @@ public void run() { TcpSocketClient socketClient = getTcpClient(cId); try { socketClient.write(Base64.decode(base64String, Base64.NO_WRAP)); - tcpEvtListener.onWritten(cId, msgId); + tcpEvtListener.onWritten(cId, msgId, null); } catch (IOException e) { - // TODO - // if (callback != null) { - // callback.invoke(e.toString()); - //} + tcpEvtListener.onWritten(cId, msgId, e.toString()); tcpEvtListener.onError(cId, e.toString()); } } diff --git a/examples/tcpsockets/examples/drain-event.js b/examples/tcpsockets/examples/drain-event.js index 4df0415..d3a47c0 100644 --- a/examples/tcpsockets/examples/drain-event.js +++ b/examples/tcpsockets/examples/drain-event.js @@ -8,7 +8,6 @@ function init() { server.listen({ port: PORT, host: '127.0.0.1', reuseAddress: true }); client.connect( - // @ts-ignore { port: PORT, host: '127.0.0.1', diff --git a/examples/tcpsockets/examples/echo.js b/examples/tcpsockets/examples/echo.js index 2d61064..56868ed 100644 --- a/examples/tcpsockets/examples/echo.js +++ b/examples/tcpsockets/examples/echo.js @@ -12,7 +12,6 @@ function init() { server.listen({ port: PORT, host: '127.0.0.1', reuseAddress: true }); client.connect( - // @ts-ignore { port: PORT, host: '127.0.0.1', diff --git a/src/Server.js b/src/Server.js index 89cd608..0a5bf3b 100644 --- a/src/Server.js +++ b/src/Server.js @@ -7,7 +7,13 @@ import Socket from './Socket'; import { nativeEventEmitter, getNextId } from './Globals'; /** - * @extends {EventEmitter<'connection' | 'listening' | 'error' | 'close', any>} + * @typedef {object} ServerEvents + * @property {() => void} close + * @property {(socket: Socket) => void} connection + * @property {() => void} listening + * @property {(err: Error) => void} error + * + * @extends {EventEmitter} */ export default class Server extends EventEmitter { /** diff --git a/src/Socket.js b/src/Socket.js index 95f1d4b..5600987 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -34,7 +34,15 @@ const STATE = { * tlsCert?: any, * }} ConnectionOptions * - * @extends {EventEmitter<'connect' | 'timeout' | 'data' | 'error' | 'close' | 'drain', any>} + * @typedef {object} SocketEvents + * @property {(had_error: boolean) => void} close + * @property {() => void} connect + * @property {(data: Buffer | string) => void} data + * @property {() => void} drain + * @property {(err: Error) => void} error + * @property {() => void} timeout + * + * @extends {EventEmitter} */ export default class Socket extends EventEmitter { /** @@ -263,21 +271,24 @@ export default class Socket extends EventEmitter { /** * Sends data on the socket. The second parameter specifies the encoding in the case of a string — it defaults to UTF8 encoding. * + * Returns `true` if the entire data was flushed successfully to the kernel buffer. Returns `false` if all or part of the data + * was queued in user memory. `'drain'` will be emitted when the buffer is again free. + * * The optional callback parameter will be executed when the data is finally written out, which may not be immediately. * * @param {string | Buffer | Uint8Array} buffer * @param {BufferEncoding} [encoding] - * @param {(error: string | null) => void} [callback] + * @param {(err?: Error) => void} [cb] */ - write(buffer, encoding, callback) { + write(buffer, encoding, cb) { const self = this; if (this._state === STATE.DISCONNECTED) throw new Error('Socket is not connected.'); const generatedBuffer = this._generateSendBuffer(buffer, encoding); const currentMsgId = this._msgId; this._msgId = (this._msgId + 1) % Number.MAX_SAFE_INTEGER; - const msgEvtHandler = (/** @type {number} */ msgId) => { - // Callback equivalent + const msgEvtHandler = (/** @type {{id: number, msgId: number, err?: string}} */ evt) => { + const { msgId, err } = evt; if (msgId === currentMsgId) { this._msgEvtEmitter.removeListener('written', msgEvtHandler); this._lastRcvMsgId = msgId; @@ -286,13 +297,13 @@ export default class Socket extends EventEmitter { this.writableNeedDrain = false; this.emit('drain'); } - if (callback) { - // TODO - // if (evt.err) return callback(evt.err); - callback(null); + if (cb) { + if (err) cb(new Error(err)); + else cb(); } } }; + // Callback equivalent with better performance this._msgEvtEmitter.on('written', msgEvtHandler, this); const ok = (this._lastRcvMsgId + 1) % Number.MAX_SAFE_INTEGER == currentMsgId; if (!ok) this.writableNeedDrain = true; @@ -337,7 +348,7 @@ export default class Socket extends EventEmitter { }); this._writtenListener = this._eventEmitter.addListener('written', (evt) => { if (evt.id !== this._id) return; - this._msgEvtEmitter.emit('written', evt.msgId); + this._msgEvtEmitter.emit('written', evt); }); } From 5c334c9148803f9ee5b8c658ac8bf59e5c880a6a Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Tue, 24 Aug 2021 13:11:00 +0200 Subject: [PATCH 09/15] Update documentation --- README.md | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 31c09e6..86978c7 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ React Native TCP socket API for Android, iOS & macOS with **client SSL/TLS suppo - [Server](#server) - [SSL Client](#ssl-client) - [API](#api) - - [TcpSocket](#tcpsocket) + - [Socket](#socket) - [`createConnection()`](#createconnection) - [Server](#server-1) - [`listen()`](#listen) @@ -218,7 +218,7 @@ _Note: In order to use self-signed certificates make sure to [update your metro. ## API Here are listed all methods implemented in `react-native-tcp-socket`, their functionalities are equivalent to those provided by Node's [net](https://nodejs.org/api/net.html) (more info on [#41](https://github.com/Rapsssito/react-native-tcp-socket/issues/41)). However, the **methods whose interface differs from Node are marked in bold**. -### TcpSocket +### Socket * **Methods:** * **[`TcpSocket.createConnection(options[, callback])`](#createconnection)** * [`address()`](https://nodejs.org/api/net.html#net_socket_address) @@ -229,7 +229,11 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func * [`setNoDelay([noDelay])`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay) * [`setTimeout(timeout[, callback])`](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback) * [`write(data[, encoding][, callback])`](https://nodejs.org/api/net.html#net_socket_write_data_encoding_callback) + * `ref()` - _Will not have any effect_ + * `unref()` - _Will not have any effect_ * **Properties:** + * Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) + * [`writableNeedDrain`](https://nodejs.org/api/stream.html#stream_writable_writableneeddrain) * [`remoteAddress`](https://nodejs.org/api/net.html#net_socket_remoteaddress) * [`remoteFamily`](https://nodejs.org/api/net.html#net_socket_remotefamily) * [`remotePort`](https://nodejs.org/api/net.html#net_socket_remoteport) @@ -239,7 +243,9 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func * [`'close'`](https://nodejs.org/api/net.html#net_event_close_1) * [`'connect'`](https://nodejs.org/api/net.html#net_event_connect) * [`'data'`](https://nodejs.org/api/net.html#net_event_data) + * [`'drain'`](https://nodejs.org/api/net.html#net_event_drain) * [`'error'`](https://nodejs.org/api/net.html#net_event_error_1) + * [`'timeout'`](https://nodejs.org/api/net.html#net_event_timeout) #### `createConnection()` `createConnection(options[, callback])` creates a TCP connection using the given [`options`](#createconnection-options). From 9dc0772a4b2eb40aa2ff8abc5f75f90a02ae9302 Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Wed, 25 Aug 2021 10:27:24 +0200 Subject: [PATCH 10/15] Switch to Java Runnable --- .../react/tcpsocket/TcpReceiverTask.java | 26 ++++++------- .../react/tcpsocket/TcpSocketClient.java | 39 ++++++++++--------- .../react/tcpsocket/TcpSocketModule.java | 14 +++++++ 3 files changed, 48 insertions(+), 31 deletions(-) diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java index 7bdee33..dd12a3e 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java @@ -9,29 +9,31 @@ import java.net.Socket; /** - * This is a specialized AsyncTask that receives data from a socket in the background, and + * This is a specialized Runnable that receives data from a socket in the background, and * notifies it's listener when data is received. This is not threadsafe, the listener * should handle synchronicity. */ -class TcpReceiverTask extends AsyncTask, Void, Void> { +public class TcpReceiverTask implements Runnable { + + private final TcpSocketClient clientSocket; + private final TcpEventListener receiverListener; + + public TcpReceiverTask(TcpSocketClient clientSocket, TcpEventListener receiverListener) { + this.clientSocket = clientSocket; + this.receiverListener = receiverListener; + } + /** * An infinite loop to block and read data from the socket. */ - @SafeVarargs @Override - protected final Void doInBackground(Pair... params) { - if (params.length > 1) { - throw new IllegalArgumentException("This task is only for a single socket/listener pair."); - } - - TcpSocketClient clientSocket = params[0].first; - TcpEventListener receiverListener = params[0].second; + public void run() { int socketId = clientSocket.getId(); Socket socket = clientSocket.getSocket(); byte[] buffer = new byte[8192]; try { BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); - while (!isCancelled() && !socket.isClosed()) { + while (!socket.isClosed()) { int bufferCount = in.read(buffer); if (bufferCount > 0) { receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount)); @@ -43,8 +45,6 @@ protected final Void doInBackground(Pair... p if (receiverListener != null && !socket.isClosed()) { receiverListener.onError(socketId, ioe.getMessage()); } - this.cancel(false); } - return null; } } \ No newline at end of file diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java index b04d999..bcbfe88 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java @@ -2,7 +2,6 @@ import android.content.Context; import android.net.Network; -import android.util.Pair; import com.facebook.react.bridge.ReadableMap; @@ -14,6 +13,7 @@ import java.security.GeneralSecurityException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import javax.net.SocketFactory; import javax.net.ssl.SSLSocket; @@ -24,20 +24,15 @@ class TcpSocketClient extends TcpSocket { private final ExecutorService executorService; - private final TcpReceiverTask receiverTask; + private final TcpEventListener receiverListener; + private Future receiverFuture; private Socket socket; - private final TcpEventListener mReceiverListener; TcpSocketClient(@NonNull final TcpEventListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) { super(id); - this.executorService = Executors.newFixedThreadPool(1); + executorService = Executors.newFixedThreadPool(1); this.socket = socket; - receiverTask = new TcpReceiverTask(); - mReceiverListener = receiverListener; - } - - ExecutorService getExecutorService() { - return this.executorService; + this.receiverListener = receiverListener; } public Socket getSocket() { @@ -83,10 +78,9 @@ public void connect(@NonNull final Context context, @NonNull final String addres startListening(); } - @SuppressWarnings("WeakerAccess") public void startListening() { - //noinspection unchecked - receiverTask.executeOnExecutor(getExecutorService(), new Pair<>(this, mReceiverListener)); + Runnable receiverTask = new TcpReceiverTask(this, receiverListener); + receiverFuture = executorService.submit(receiverTask); } /** @@ -107,19 +101,20 @@ public void write(final byte[] data) throws IOException { */ public void destroy() { try { - if (receiverTask != null && !receiverTask.isCancelled()) { + if (receiverFuture != null && !receiverFuture.isCancelled()) { // stop the receiving task - receiverTask.cancel(true); - getExecutorService().shutdown(); + receiverFuture.cancel(true); + executorService.shutdown(); + receiverFuture = null; } // close the socket if (socket != null && !socket.isClosed()) { socket.close(); - mReceiverListener.onClose(getId(), null); + receiverListener.onClose(getId(), null); socket = null; } } catch (IOException e) { - mReceiverListener.onClose(getId(), e.getMessage()); + receiverListener.onClose(getId(), e.getMessage()); } } @@ -143,4 +138,12 @@ public void setKeepAlive(final boolean enable, final int initialDelay) throws IO // `initialDelay` is ignored socket.setKeepAlive(enable); } + + public void pause() { + + } + + public void resume() { + + } } diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java index 64dbc90..53913d1 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java @@ -179,6 +179,20 @@ public void setKeepAlive(@NonNull final Integer cId, final boolean enable, final } } + @SuppressWarnings("unused") + @ReactMethod + public void pause(final int cId) { + TcpSocketClient client = getTcpClient(cId); + client.pause(); + } + + @SuppressWarnings("unused") + @ReactMethod + public void resume(final int cId) { + TcpSocketClient client = getTcpClient(cId); + client.resume(); + } + private void requestNetwork(final int transportType) throws InterruptedException { final NetworkRequest.Builder requestBuilder = new NetworkRequest.Builder(); requestBuilder.addTransportType(transportType); From 231eac5a8e0b3ac7986cafb27d5ad160bbeaadf0 Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Fri, 27 Aug 2021 11:04:28 +0200 Subject: [PATCH 11/15] Fix event handling --- .../react/tcpsocket/TcpReceiverTask.java | 26 +++-- .../react/tcpsocket/TcpSocketClient.java | 43 ++++---- .../react/tcpsocket/TcpSocketModule.java | 32 +++--- examples/tcpsockets/examples/drain-event.js | 73 +++++++------- examples/tcpsockets/examples/echo.js | 35 +++---- src/Socket.js | 98 +++++++++++++++++-- 6 files changed, 197 insertions(+), 110 deletions(-) diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java index dd12a3e..2954407 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java @@ -1,12 +1,9 @@ package com.asterinet.react.tcpsocket; -import android.os.AsyncTask; -import android.util.Pair; - import java.io.BufferedInputStream; import java.io.IOException; -import java.util.Arrays; import java.net.Socket; +import java.util.Arrays; /** * This is a specialized Runnable that receives data from a socket in the background, and @@ -17,6 +14,7 @@ public class TcpReceiverTask implements Runnable { private final TcpSocketClient clientSocket; private final TcpEventListener receiverListener; + private boolean paused = false; public TcpReceiverTask(TcpSocketClient clientSocket, TcpEventListener receiverListener) { this.clientSocket = clientSocket; @@ -30,21 +28,37 @@ public TcpReceiverTask(TcpSocketClient clientSocket, TcpEventListener receiverLi public void run() { int socketId = clientSocket.getId(); Socket socket = clientSocket.getSocket(); - byte[] buffer = new byte[8192]; + byte[] buffer = new byte[16384]; try { BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); while (!socket.isClosed()) { int bufferCount = in.read(buffer); + waitIfPaused(); if (bufferCount > 0) { receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount)); } else if (bufferCount == -1) { clientSocket.destroy(); } } - } catch (IOException ioe) { + } catch (IOException | InterruptedException ioe) { if (receiverListener != null && !socket.isClosed()) { receiverListener.onError(socketId, ioe.getMessage()); } } } + + public synchronized void pause() { + paused = true; + } + + public synchronized void resume() { + paused = false; + notify(); + } + + private synchronized void waitIfPaused() throws InterruptedException { + while (paused) { + wait(); + } + } } \ No newline at end of file diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java index bcbfe88..d0c86f0 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java @@ -6,14 +6,12 @@ import com.facebook.react.bridge.ReadableMap; import java.io.IOException; -import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.security.GeneralSecurityException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import javax.net.SocketFactory; import javax.net.ssl.SSLSocket; @@ -23,14 +21,17 @@ import androidx.annotation.Nullable; class TcpSocketClient extends TcpSocket { - private final ExecutorService executorService; + private final ExecutorService listenExecutor; + private final ExecutorService writeExecutor; private final TcpEventListener receiverListener; - private Future receiverFuture; + private final TcpReceiverTask receiverTask; private Socket socket; TcpSocketClient(@NonNull final TcpEventListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) { super(id); - executorService = Executors.newFixedThreadPool(1); + listenExecutor = Executors.newSingleThreadExecutor(); + writeExecutor = Executors.newSingleThreadExecutor(); + receiverTask = new TcpReceiverTask(this, receiverListener); this.socket = socket; this.receiverListener = receiverListener; } @@ -79,8 +80,7 @@ public void connect(@NonNull final Context context, @NonNull final String addres } public void startListening() { - Runnable receiverTask = new TcpReceiverTask(this, receiverListener); - receiverFuture = executorService.submit(receiverTask); + listenExecutor.execute(receiverTask); } /** @@ -88,12 +88,19 @@ public void startListening() { * * @param data data to be sent */ - public void write(final byte[] data) throws IOException { - if (socket == null) { - throw new IOException("Socket is not connected."); - } - OutputStream output = socket.getOutputStream(); - output.write(data); + public void write(final int msgId, final byte[] data) { + writeExecutor.execute(new Runnable() { + @Override + public void run() { + try { + socket.getOutputStream().write(data); + receiverListener.onWritten(getId(), msgId, null); + } catch (IOException e) { + receiverListener.onWritten(getId(), msgId, e.toString()); + receiverListener.onError(getId(), e.toString()); + } + } + }); } /** @@ -101,12 +108,6 @@ public void write(final byte[] data) throws IOException { */ public void destroy() { try { - if (receiverFuture != null && !receiverFuture.isCancelled()) { - // stop the receiving task - receiverFuture.cancel(true); - executorService.shutdown(); - receiverFuture = null; - } // close the socket if (socket != null && !socket.isClosed()) { socket.close(); @@ -140,10 +141,10 @@ public void setKeepAlive(final boolean enable, final int initialDelay) throws IO } public void pause() { - + receiverTask.pause(); } public void resume() { - + receiverTask.resume(); } } diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java index 53913d1..54b022c 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java @@ -64,7 +64,7 @@ String getName() { @SuppressWarnings("unused") @ReactMethod public void connect(@NonNull final Integer cId, @NonNull final String host, @NonNull final Integer port, @NonNull final ReadableMap options) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { if (socketMap.get(cId) != null) { @@ -84,40 +84,30 @@ public void run() { tcpEvtListener.onError(cId, e.getMessage()); } } - })); + }); } @SuppressLint("StaticFieldLeak") @SuppressWarnings("unused") @ReactMethod public void write(final int cId, @NonNull final String base64String, final int msgId) { - executorService.execute(new Thread(new Runnable() { - @Override - public void run() { - TcpSocketClient socketClient = getTcpClient(cId); - try { - socketClient.write(Base64.decode(base64String, Base64.NO_WRAP)); - tcpEvtListener.onWritten(cId, msgId, null); - } catch (IOException e) { - tcpEvtListener.onWritten(cId, msgId, e.toString()); - tcpEvtListener.onError(cId, e.toString()); - } - } - })); + TcpSocketClient socketClient = getTcpClient(cId); + byte[] data = Base64.decode(base64String, Base64.NO_WRAP); + socketClient.write(msgId, data); } @SuppressLint("StaticFieldLeak") @SuppressWarnings("unused") @ReactMethod public void end(final Integer cId) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { TcpSocketClient socketClient = getTcpClient(cId); socketClient.destroy(); socketMap.remove(cId); } - })); + }); } @SuppressWarnings("unused") @@ -129,21 +119,21 @@ public void destroy(final Integer cId) { @SuppressWarnings("unused") @ReactMethod public void close(final Integer cId) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { TcpSocketServer socketServer = getTcpServer(cId); socketServer.close(); socketMap.remove(cId); } - })); + }); } @SuppressLint("StaticFieldLeak") @SuppressWarnings("unused") @ReactMethod public void listen(final Integer cId, final ReadableMap options) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { try { @@ -154,7 +144,7 @@ public void run() { tcpEvtListener.onError(cId, uhe.getMessage()); } } - })); + }); } @SuppressWarnings("unused") diff --git a/examples/tcpsockets/examples/drain-event.js b/examples/tcpsockets/examples/drain-event.js index d3a47c0..45f832b 100644 --- a/examples/tcpsockets/examples/drain-event.js +++ b/examples/tcpsockets/examples/drain-event.js @@ -1,49 +1,50 @@ const net = require('net'); -const PORT = Number(9 + (Math.random() * 999).toFixed(0)); const server = new net.Server(); const client = new net.Socket(); function init() { - server.listen({ port: PORT, host: '127.0.0.1', reuseAddress: true }); - - client.connect( - { - port: PORT, - host: '127.0.0.1', - localAddress: '127.0.0.1', - reuseAddress: true, - // localPort: 20000, - // interface: "wifi", - // tls: true - }, - () => { - let i = 0; - const MAX_ITER = 1000000; - write(); - function write() { - let ok = true; - do { - i++; - if (i === 0) { - // Last time! - client.write(''+i+','); + server.listen({ port: 0, host: '127.0.0.1', reuseAddress: true }, () => { + const port = server.address()?.port; + if (!port) throw new Error('Server port not found'); + client.connect( + { + port: port, + host: '127.0.0.1', + localAddress: '127.0.0.1', + reuseAddress: true, + // localPort: 20000, + // interface: "wifi", + // tls: true + }, + () => { + let i = 0; + const MAX_ITER = 1000000; + write(); + function write() { + let ok = true; + do { + i++; + if (i === 0) { + // Last time! + client.write('' + i + ','); + } else { + // See if we should continue, or wait. + // Don't pass the callback, because we're not done yet. + ok = client.write('' + i + ','); + } + } while (i < MAX_ITER && ok); + if (!ok) { + // Had to stop early! + // Write some more once it drains. + client.once('drain', write); } else { - // See if we should continue, or wait. - // Don't pass the callback, because we're not done yet. - ok = client.write(''+i+','); + client.destroy(); } - } while (i < MAX_ITER && ok); - if (!ok) { - // Had to stop early! - // Write some more once it drains. - client.once('drain', write); - } else { - client.destroy(); } } - } - ); + ); + }); } module.exports = { init, server, client }; diff --git a/examples/tcpsockets/examples/echo.js b/examples/tcpsockets/examples/echo.js index 56868ed..09c8671 100644 --- a/examples/tcpsockets/examples/echo.js +++ b/examples/tcpsockets/examples/echo.js @@ -1,5 +1,4 @@ const net = require('net'); -const PORT = Number(9 + (Math.random() * 999).toFixed(0)); const server = new net.Server(); const client = new net.Socket(); @@ -9,22 +8,24 @@ function init() { socket.write('Echo server\r\n'); }); - server.listen({ port: PORT, host: '127.0.0.1', reuseAddress: true }); - - client.connect( - { - port: PORT, - host: '127.0.0.1', - localAddress: '127.0.0.1', - reuseAddress: true, - // localPort: 20000, - // interface: "wifi", - // tls: true - }, - () => { - client.write('Hello, server! Love, Client.'); - } - ); + server.listen({ port: 0, host: '127.0.0.1', reuseAddress: true }, () => { + const port = server.address()?.port; + if (!port) throw new Error('Server port not found'); + client.connect( + { + port: port, + host: '127.0.0.1', + localAddress: '127.0.0.1', + reuseAddress: true, + // localPort: 20000, + // interface: "wifi", + // tls: true + }, + () => { + client.write('Hello, server! Love, Client.'); + } + ); + }); client.on('data', () => { client.destroy(); // kill client after server's response diff --git a/src/Socket.js b/src/Socket.js index 5600987..436a2ca 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -34,6 +34,10 @@ const STATE = { * tlsCert?: any, * }} ConnectionOptions * + * @typedef {object} ReadableEvents + * @property {() => void} pause + * @property {() => void} resume + * * @typedef {object} SocketEvents * @property {(had_error: boolean) => void} close * @property {() => void} connect @@ -42,7 +46,7 @@ const STATE = { * @property {(err: Error) => void} error * @property {() => void} timeout * - * @extends {EventEmitter} + * @extends {EventEmitter} */ export default class Socket extends EventEmitter { /** @@ -70,7 +74,18 @@ export default class Socket extends EventEmitter { this._lastRcvMsgId = Number.MAX_SAFE_INTEGER - 1; /** @private */ this._lastSentMsgId = 0; + /** @private */ + this._paused = false; + /** @private */ + this._resuming = false; + /** @private */ + this._writeBufferSize = 0; + /** @type {{ id: number; data: string; }[]} @private */ + this._pausedDataEvents = []; + this.readableHighWaterMark = 16384; + this.writableHighWaterMark = 16384; this.writableNeedDrain = false; + this.bytesSent = 0; this.localAddress = undefined; this.localPort = undefined; this.remoteAddress = undefined; @@ -279,18 +294,22 @@ export default class Socket extends EventEmitter { * @param {string | Buffer | Uint8Array} buffer * @param {BufferEncoding} [encoding] * @param {(err?: Error) => void} [cb] + * + * @return {boolean} */ write(buffer, encoding, cb) { const self = this; if (this._state === STATE.DISCONNECTED) throw new Error('Socket is not connected.'); const generatedBuffer = this._generateSendBuffer(buffer, encoding); + this._writeBufferSize += generatedBuffer.byteLength; const currentMsgId = this._msgId; this._msgId = (this._msgId + 1) % Number.MAX_SAFE_INTEGER; const msgEvtHandler = (/** @type {{id: number, msgId: number, err?: string}} */ evt) => { const { msgId, err } = evt; if (msgId === currentMsgId) { this._msgEvtEmitter.removeListener('written', msgEvtHandler); + this._writeBufferSize -= generatedBuffer.byteLength; this._lastRcvMsgId = msgId; if (self._timeout) self._activateTimer(); if (this.writableNeedDrain && this._lastSentMsgId == msgId) { @@ -305,32 +324,93 @@ export default class Socket extends EventEmitter { }; // Callback equivalent with better performance this._msgEvtEmitter.on('written', msgEvtHandler, this); - const ok = (this._lastRcvMsgId + 1) % Number.MAX_SAFE_INTEGER == currentMsgId; + const ok = this._writeBufferSize < this.writableHighWaterMark; if (!ok) this.writableNeedDrain = true; this._lastSentMsgId = currentMsgId; Sockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId); return ok; } + pause() { + this._paused = true; + Sockets.pause(this._id); + this.emit('pause'); + } + + resume() { + this._paused = false; + this.emit('resume'); + this._recoverDataEventsAfterPause(); + } + ref() { - console.warn('react-native-tcp-socket: TcpSocket.ref() method will have no effect.'); + console.warn('react-native-tcp-socket: Socket.ref() method will have no effect.'); } unref() { - console.warn('react-native-tcp-socket: TcpSocket.unref() method will have no effect.'); + console.warn('react-native-tcp-socket: Socket.unref() method will have no effect.'); } /** * @private */ - _registerEvents() { - this._unregisterEvents(); - this._dataListener = this._eventEmitter.addListener('data', (evt) => { - if (evt.id !== this._id) return; + async _recoverDataEventsAfterPause() { + if (this._resuming) return; + this._resuming = true; + while (this._pausedDataEvents.length > 0) { + // Concat all buffered events for better performance + const buffArray = []; + let readBytes = 0; + let i = 0; + for (; i < this._pausedDataEvents.length; i++) { + const evtData = Buffer.from(this._pausedDataEvents[i].data, 'base64'); + readBytes += evtData.byteLength; + if (readBytes <= this.readableHighWaterMark) { + buffArray.push(evtData); + } else { + const buffOffset = this.readableHighWaterMark - readBytes; + this._pausedDataEvents[i].data = evtData.slice(buffOffset).toString('base64'); + break; + } + } + // Generate new event with the concatenated events + const evt = { + id: this._pausedDataEvents[0].id, + data: Buffer.concat(buffArray).toString('base64'), + }; + // Clean the old events + this._pausedDataEvents = this._pausedDataEvents.slice(i); + this._onDeviceDataEvt(evt); + if (this._paused) { + this._resuming = false; + return; + } + } + this._resuming = false; + Sockets.resume(this._id); + } + + /** + * @private + */ + _onDeviceDataEvt = (/** @type {{ id: number; data: string; }} */ evt) => { + if (evt.id !== this._id) return; + if (!this._paused) { const bufferTest = Buffer.from(evt.data, 'base64'); const finalData = this._encoding ? bufferTest.toString(this._encoding) : bufferTest; this.emit('data', finalData); - }); + } else { + // If the socket is paused, save the data events for later + this._pausedDataEvents.push(evt); + } + }; + + /** + * @private + */ + _registerEvents() { + this._unregisterEvents(); + this._dataListener = this._eventEmitter.addListener('data', this._onDeviceDataEvt); this._errorListener = this._eventEmitter.addListener('error', (evt) => { if (evt.id !== this._id) return; this.destroy(); From d3efe002d3514c3849eb6ae865757d0c7cf8f62e Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Fri, 27 Aug 2021 12:26:50 +0200 Subject: [PATCH 12/15] Add iOS compatibility --- examples/tcpsockets/App.js | 2 +- examples/tcpsockets/examples/drain-event.js | 50 ---------------- examples/tcpsockets/examples/main.js | 2 +- examples/tcpsockets/examples/pause-resume.js | 60 ++++++++++++++++++++ ios/TcpSocketClient.h | 4 ++ ios/TcpSocketClient.m | 21 +++++-- ios/TcpSockets.m | 14 +++++ src/Socket.js | 4 ++ 8 files changed, 100 insertions(+), 57 deletions(-) delete mode 100644 examples/tcpsockets/examples/drain-event.js create mode 100644 examples/tcpsockets/examples/pause-resume.js diff --git a/examples/tcpsockets/App.js b/examples/tcpsockets/App.js index 344fb1d..8c6774d 100644 --- a/examples/tcpsockets/App.js +++ b/examples/tcpsockets/App.js @@ -7,7 +7,7 @@ import React from 'react'; import { ScrollView, StyleSheet, Text, View } from 'react-native'; -import { init, server, client } from './examples/drain-event'; +import { init, server, client } from './examples/pause-resume'; class App extends React.Component { /** diff --git a/examples/tcpsockets/examples/drain-event.js b/examples/tcpsockets/examples/drain-event.js deleted file mode 100644 index 45f832b..0000000 --- a/examples/tcpsockets/examples/drain-event.js +++ /dev/null @@ -1,50 +0,0 @@ -const net = require('net'); - -const server = new net.Server(); -const client = new net.Socket(); - -function init() { - server.listen({ port: 0, host: '127.0.0.1', reuseAddress: true }, () => { - const port = server.address()?.port; - if (!port) throw new Error('Server port not found'); - client.connect( - { - port: port, - host: '127.0.0.1', - localAddress: '127.0.0.1', - reuseAddress: true, - // localPort: 20000, - // interface: "wifi", - // tls: true - }, - () => { - let i = 0; - const MAX_ITER = 1000000; - write(); - function write() { - let ok = true; - do { - i++; - if (i === 0) { - // Last time! - client.write('' + i + ','); - } else { - // See if we should continue, or wait. - // Don't pass the callback, because we're not done yet. - ok = client.write('' + i + ','); - } - } while (i < MAX_ITER && ok); - if (!ok) { - // Had to stop early! - // Write some more once it drains. - client.once('drain', write); - } else { - client.destroy(); - } - } - } - ); - }); -} - -module.exports = { init, server, client }; diff --git a/examples/tcpsockets/examples/main.js b/examples/tcpsockets/examples/main.js index 3dca5b1..0322b48 100644 --- a/examples/tcpsockets/examples/main.js +++ b/examples/tcpsockets/examples/main.js @@ -1,5 +1,5 @@ // Execute this file using NodeJS -const { init, server, client } = require('./drain-event'); +const { init, server, client } = require('./pause-resume'); server.on('connection', (socket) => { console.log('Client connected to server on ' + JSON.stringify(socket.address())); diff --git a/examples/tcpsockets/examples/pause-resume.js b/examples/tcpsockets/examples/pause-resume.js new file mode 100644 index 0000000..ea0b665 --- /dev/null +++ b/examples/tcpsockets/examples/pause-resume.js @@ -0,0 +1,60 @@ +const net = require('net'); + +const server = new net.Server(); +const client = new net.Socket(); + +function init() { + server.on('connection', (socket) => { + socket.on('data', (chunk) => { + console.log(`Received ${chunk.length} bytes of data.`); + console.log('Server client chunk start: ' + chunk.slice(0, 30)); + console.log('Server client chunk end: ' + chunk.slice(chunk.length - 30, chunk.length)); + socket.pause(); + console.log('There will be no additional data for 1 second.'); + setTimeout(() => { + console.log('Now data will start flowing again.'); + socket.resume(); + }, 1000); + }); + }); + + server.listen({ port: 0, host: '127.0.0.1', reuseAddress: true }, () => { + const port = server.address()?.port; + if (!port) throw new Error('Server port not found'); + client.connect( + { + port: port, + host: '127.0.0.1', + localAddress: '127.0.0.1', + reuseAddress: true, + // localPort: 20000, + // interface: "wifi", + // tls: true + }, + () => { + let i = 0; + const MAX_ITER = 300000; + write(); + async function write() { + let ok = true; + while (i < MAX_ITER && ok) { + i++; + const buff = ' ->' + i + '<- '; + ok = client.write(buff); + // await new Promise((resolve) => setTimeout(resolve, 50)); + // console.log('Bytes sent', ok, buff, client.bytesSent); + } + if (i >= MAX_ITER) { + client.destroy(); + } else if (!ok) { + // Had to stop early! + // Write some more once it drains. + client.once('drain', write); + } + } + } + ); + }); +} + +module.exports = { init, server, client }; diff --git a/ios/TcpSocketClient.h b/ios/TcpSocketClient.h index fa61915..109fcb2 100644 --- a/ios/TcpSocketClient.h +++ b/ios/TcpSocketClient.h @@ -99,4 +99,8 @@ typedef enum RCTTCPError RCTTCPError; - (void)setKeepAlive:(BOOL)enable initialDelay:(int)initialDelay; +- (void) pause; + +- (void) resume; + @end diff --git a/ios/TcpSocketClient.m b/ios/TcpSocketClient.m index 39c48d2..f956f8d 100644 --- a/ios/TcpSocketClient.m +++ b/ios/TcpSocketClient.m @@ -12,6 +12,7 @@ @interface TcpSocketClient() @private BOOL _tls; BOOL _checkValidity; + BOOL _paused; NSString *_certPath; GCDAsyncSocket *_tcpSocket; NSMutableDictionary *_pendingSends; @@ -47,6 +48,7 @@ - (id)initWithClientId:(NSNumber *)clientID andConfig:(id) if (self) { _id = clientID; _clientDelegate = aDelegate; + _paused = false; _pendingSends = [NSMutableDictionary dictionary]; _lock = [[NSLock alloc] init]; _tcpSocket = tcpSocket; @@ -182,7 +184,6 @@ - (BOOL)listen:(NSDictionary *)options error:(NSError **)error BOOL isListening = [_tcpSocket acceptOnInterface:host port:port error:error]; if (isListening == YES) { [_clientDelegate onListen: self]; - [_tcpSocket readDataWithTimeout:-1 tag:_id.longValue]; } return isListening; @@ -237,8 +238,6 @@ - (void) writeData:(NSData *)data msgId:(NSNumber*)msgId [_tcpSocket writeData:data withTimeout:-1 tag:_sendTag]; _sendTag++; - - [_tcpSocket readDataWithTimeout:-1 tag:_id.longValue]; } - (void)end @@ -251,6 +250,17 @@ - (void)destroy [_tcpSocket disconnect]; } +- (void)pause { + _paused = true; +} + +- (void)resume { + if (_paused) { + [_tcpSocket readDataWithTimeout:-1 tag:_id.longValue]; + } + _paused = false; +} + - (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag { if (!_clientDelegate) { RCTLogWarn(@"didReadData with nil clientDelegate for %@", [sock userData]); @@ -258,8 +268,9 @@ - (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)t } [_clientDelegate onData:@(tag) data:data]; - - [sock readDataWithTimeout:-1 tag:tag]; + if (!_paused) { + [sock readDataWithTimeout:-1 tag:tag]; + } } - (void)socket:(GCDAsyncSocket *)sock didAcceptNewSocket:(GCDAsyncSocket *)newSocket diff --git a/ios/TcpSockets.m b/ios/TcpSockets.m index 2c508e0..e21ff0e 100644 --- a/ios/TcpSockets.m +++ b/ios/TcpSockets.m @@ -136,6 +136,20 @@ - (TcpSocketClient *)createSocket:(nonnull NSNumber*)cId [client setKeepAlive:enable initialDelay:initialDelay]; } +RCT_EXPORT_METHOD(pause:(nonnull NSNumber*)cId) { + TcpSocketClient* client = [self findClient:cId]; + if (!client) return; + + [client pause]; +} + +RCT_EXPORT_METHOD(resume:(nonnull NSNumber*)cId) { + TcpSocketClient* client = [self findClient:cId]; + if (!client) return; + + [client resume]; +} + - (void)onWrittenData:(TcpSocketClient*) client msgId:(NSNumber *)msgId { [self sendEventWithName:@"written" body:@{ diff --git a/src/Socket.js b/src/Socket.js index 436a2ca..fb55e0d 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -358,6 +358,7 @@ export default class Socket extends EventEmitter { if (this._resuming) return; this._resuming = true; while (this._pausedDataEvents.length > 0) { + console.log('Paused events', this._pausedDataEvents.length); // Concat all buffered events for better performance const buffArray = []; let readBytes = 0; @@ -365,10 +366,12 @@ export default class Socket extends EventEmitter { for (; i < this._pausedDataEvents.length; i++) { const evtData = Buffer.from(this._pausedDataEvents[i].data, 'base64'); readBytes += evtData.byteLength; + console.log('Concatting', this.readableHighWaterMark, readBytes); if (readBytes <= this.readableHighWaterMark) { buffArray.push(evtData); } else { const buffOffset = this.readableHighWaterMark - readBytes; + buffArray.push(evtData.slice(0, buffOffset)); this._pausedDataEvents[i].data = evtData.slice(buffOffset).toString('base64'); break; } @@ -380,6 +383,7 @@ export default class Socket extends EventEmitter { }; // Clean the old events this._pausedDataEvents = this._pausedDataEvents.slice(i); + console.log('Unpaused events', this._pausedDataEvents.length); this._onDeviceDataEvt(evt); if (this._paused) { this._resuming = false; From 6f4280e4053acfc082fc269f4a180553a346ba15 Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Fri, 27 Aug 2021 12:30:52 +0200 Subject: [PATCH 13/15] Add documentation --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 86978c7..297ffa8 100644 --- a/README.md +++ b/README.md @@ -229,7 +229,9 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func * [`setNoDelay([noDelay])`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay) * [`setTimeout(timeout[, callback])`](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback) * [`write(data[, encoding][, callback])`](https://nodejs.org/api/net.html#net_socket_write_data_encoding_callback) + * [`pause()`](https://nodejs.org/api/net.html#net_socket_pause) * `ref()` - _Will not have any effect_ + * [`resume()`](https://nodejs.org/api/net.html#net_socket_resume) * `unref()` - _Will not have any effect_ * **Properties:** * Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) @@ -240,6 +242,9 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func * [`localAddress`](https://nodejs.org/api/net.html#net_socket_localaddress) * [`localPort`](https://nodejs.org/api/net.html#net_socket_localport) * **Events:** + * Inherited from [`Stream.Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable) + * [`'pause'`](https://nodejs.org/api/stream.html#stream_event_pause) + * [`'resume'`](https://nodejs.org/api/stream.html#stream_event_resume) * [`'close'`](https://nodejs.org/api/net.html#net_event_close_1) * [`'connect'`](https://nodejs.org/api/net.html#net_event_connect) * [`'data'`](https://nodejs.org/api/net.html#net_event_data) From f472b145458eb1040593b33fb26a952c445cba00 Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Fri, 27 Aug 2021 12:48:16 +0200 Subject: [PATCH 14/15] Update documentation --- examples/tcpsockets/App.js | 2 +- examples/tcpsockets/examples/README.md | 8 +++++++- examples/tcpsockets/examples/main.js | 2 +- src/Socket.js | 13 +++++++++---- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/examples/tcpsockets/App.js b/examples/tcpsockets/App.js index 8c6774d..9655462 100644 --- a/examples/tcpsockets/App.js +++ b/examples/tcpsockets/App.js @@ -7,7 +7,7 @@ import React from 'react'; import { ScrollView, StyleSheet, Text, View } from 'react-native'; -import { init, server, client } from './examples/pause-resume'; +import { init, server, client } from './examples/echo'; class App extends React.Component { /** diff --git a/examples/tcpsockets/examples/README.md b/examples/tcpsockets/examples/README.md index e83c0b5..0b6e4ca 100644 --- a/examples/tcpsockets/examples/README.md +++ b/examples/tcpsockets/examples/README.md @@ -8,8 +8,14 @@ Let us know if you find any issues. If you want to contribute or add a new examp ## Table of Contents - [Echo server](#echo-server) +- [Pause/Resume - Backpressure](#pauseresume---backpressure) ### [Echo server](echo.js) -An echo server just reflects a message received from a client to the same client. If we send a message saying "Hello, Server!", we will receive the same message, just like an echo. This example shows some basic TCP server and client interactions. \ No newline at end of file +An echo server just reflects a message received from a client to the same client. If we send a message saying "Hello, Server!", we will receive the same message, just like an echo. This example shows some basic TCP server and client interactions. + +### [Pause/Resume - Backpressure](pause-resume.js) +There is a general problem that occurs during data handling called **backpressure** and describes a buildup of data behind a buffer during data transfer. When the receiving end of the transfer has complex operations, or is slower for whatever reason, there is a tendency for data from the incoming source to accumulate, like a clog. + +To solve this problem, there must be a delegation system in place to ensure a smooth flow of data from one source to another and is often times referred to as flow control. In Node.js, streams have been the adopted solution and `react-native-tcp-socket` mimics the same functionality. If a call to `socket.write(chunk)` returns `false`, the `'drain'` event will be emitted when it is appropriate to resume writing data to the stream. diff --git a/examples/tcpsockets/examples/main.js b/examples/tcpsockets/examples/main.js index 0322b48..595e732 100644 --- a/examples/tcpsockets/examples/main.js +++ b/examples/tcpsockets/examples/main.js @@ -1,5 +1,5 @@ // Execute this file using NodeJS -const { init, server, client } = require('./pause-resume'); +const { init, server, client } = require('./echo'); server.on('connection', (socket) => { console.log('Client connected to server on ' + JSON.stringify(socket.address())); diff --git a/src/Socket.js b/src/Socket.js index fb55e0d..3ea9d43 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -46,7 +46,7 @@ const STATE = { * @property {(err: Error) => void} error * @property {() => void} timeout * - * @extends {EventEmitter} + * @extends {EventEmitter} */ export default class Socket extends EventEmitter { /** @@ -331,13 +331,21 @@ export default class Socket extends EventEmitter { return ok; } + /** + * Pauses the reading of data. That is, `'data'` events will not be emitted. Useful to throttle back an upload. + */ pause() { + if (this._paused) return; this._paused = true; Sockets.pause(this._id); this.emit('pause'); } + /** + * Resumes reading after a call to `socket.pause()`. + */ resume() { + if (!this._paused) return; this._paused = false; this.emit('resume'); this._recoverDataEventsAfterPause(); @@ -358,7 +366,6 @@ export default class Socket extends EventEmitter { if (this._resuming) return; this._resuming = true; while (this._pausedDataEvents.length > 0) { - console.log('Paused events', this._pausedDataEvents.length); // Concat all buffered events for better performance const buffArray = []; let readBytes = 0; @@ -366,7 +373,6 @@ export default class Socket extends EventEmitter { for (; i < this._pausedDataEvents.length; i++) { const evtData = Buffer.from(this._pausedDataEvents[i].data, 'base64'); readBytes += evtData.byteLength; - console.log('Concatting', this.readableHighWaterMark, readBytes); if (readBytes <= this.readableHighWaterMark) { buffArray.push(evtData); } else { @@ -383,7 +389,6 @@ export default class Socket extends EventEmitter { }; // Clean the old events this._pausedDataEvents = this._pausedDataEvents.slice(i); - console.log('Unpaused events', this._pausedDataEvents.length); this._onDeviceDataEvt(evt); if (this._paused) { this._resuming = false; From bc208007419c46245bb406d67d1120ef9f7c2b2c Mon Sep 17 00:00:00 2001 From: Rapsssito Date: Fri, 27 Aug 2021 12:49:40 +0200 Subject: [PATCH 15/15] Typo doc --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 297ffa8..fe08a26 100644 --- a/README.md +++ b/README.md @@ -234,7 +234,7 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func * [`resume()`](https://nodejs.org/api/net.html#net_socket_resume) * `unref()` - _Will not have any effect_ * **Properties:** - * Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable) + * Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable): * [`writableNeedDrain`](https://nodejs.org/api/stream.html#stream_writable_writableneeddrain) * [`remoteAddress`](https://nodejs.org/api/net.html#net_socket_remoteaddress) * [`remoteFamily`](https://nodejs.org/api/net.html#net_socket_remotefamily) @@ -242,7 +242,7 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func * [`localAddress`](https://nodejs.org/api/net.html#net_socket_localaddress) * [`localPort`](https://nodejs.org/api/net.html#net_socket_localport) * **Events:** - * Inherited from [`Stream.Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable) + * Inherited from [`Stream.Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable): * [`'pause'`](https://nodejs.org/api/stream.html#stream_event_pause) * [`'resume'`](https://nodejs.org/api/stream.html#stream_event_resume) * [`'close'`](https://nodejs.org/api/net.html#net_event_close_1)