diff --git a/README.md b/README.md index 31c09e6..fe08a26 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ React Native TCP socket API for Android, iOS & macOS with **client SSL/TLS suppo - [Server](#server) - [SSL Client](#ssl-client) - [API](#api) - - [TcpSocket](#tcpsocket) + - [Socket](#socket) - [`createConnection()`](#createconnection) - [Server](#server-1) - [`listen()`](#listen) @@ -218,7 +218,7 @@ _Note: In order to use self-signed certificates make sure to [update your metro. ## API Here are listed all methods implemented in `react-native-tcp-socket`, their functionalities are equivalent to those provided by Node's [net](https://nodejs.org/api/net.html) (more info on [#41](https://github.com/Rapsssito/react-native-tcp-socket/issues/41)). However, the **methods whose interface differs from Node are marked in bold**. -### TcpSocket +### Socket * **Methods:** * **[`TcpSocket.createConnection(options[, callback])`](#createconnection)** * [`address()`](https://nodejs.org/api/net.html#net_socket_address) @@ -229,17 +229,28 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func * [`setNoDelay([noDelay])`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay) * [`setTimeout(timeout[, callback])`](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback) * [`write(data[, encoding][, callback])`](https://nodejs.org/api/net.html#net_socket_write_data_encoding_callback) + * [`pause()`](https://nodejs.org/api/net.html#net_socket_pause) + * `ref()` - _Will not have any effect_ + * [`resume()`](https://nodejs.org/api/net.html#net_socket_resume) + * `unref()` - _Will not have any effect_ * **Properties:** + * Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable): + * [`writableNeedDrain`](https://nodejs.org/api/stream.html#stream_writable_writableneeddrain) * [`remoteAddress`](https://nodejs.org/api/net.html#net_socket_remoteaddress) * [`remoteFamily`](https://nodejs.org/api/net.html#net_socket_remotefamily) * [`remotePort`](https://nodejs.org/api/net.html#net_socket_remoteport) * [`localAddress`](https://nodejs.org/api/net.html#net_socket_localaddress) * [`localPort`](https://nodejs.org/api/net.html#net_socket_localport) * **Events:** + * Inherited from [`Stream.Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable): + * [`'pause'`](https://nodejs.org/api/stream.html#stream_event_pause) + * [`'resume'`](https://nodejs.org/api/stream.html#stream_event_resume) * [`'close'`](https://nodejs.org/api/net.html#net_event_close_1) * [`'connect'`](https://nodejs.org/api/net.html#net_event_connect) * [`'data'`](https://nodejs.org/api/net.html#net_event_data) + * [`'drain'`](https://nodejs.org/api/net.html#net_event_drain) * [`'error'`](https://nodejs.org/api/net.html#net_event_error_1) + * [`'timeout'`](https://nodejs.org/api/net.html#net_event_timeout) #### `createConnection()` `createConnection(options[, callback])` creates a TCP connection using the given [`options`](#createconnection-options). diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java new file mode 100644 index 0000000..dc28a1d --- /dev/null +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpEventListener.java @@ -0,0 +1,117 @@ +package com.asterinet.react.tcpsocket; + +import android.util.Base64; + +import com.facebook.react.bridge.Arguments; +import com.facebook.react.bridge.ReactContext; +import com.facebook.react.bridge.WritableMap; +import com.facebook.react.modules.core.DeviceEventManagerModule; + +import java.net.Inet6Address; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; + +import javax.annotation.Nullable; + +public class TcpEventListener { + + private final DeviceEventManagerModule.RCTDeviceEventEmitter rctEvtEmitter; + + public TcpEventListener(final ReactContext reactContext) { + rctEvtEmitter = reactContext.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class); + } + + public void onConnection(int serverId, int clientId, Socket socket) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", serverId); + + WritableMap infoParams = Arguments.createMap(); + infoParams.putInt("id", clientId); + + WritableMap connectionParams = Arguments.createMap(); + InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + + connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); + connectionParams.putInt("localPort", socket.getLocalPort()); + connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); + connectionParams.putInt("remotePort", socket.getPort()); + connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); + + infoParams.putMap("connection", connectionParams); + eventParams.putMap("info", infoParams); + + sendEvent("connection", eventParams); + } + + public void onConnect(int id, TcpSocketClient client) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + WritableMap connectionParams = Arguments.createMap(); + Socket socket = client.getSocket(); + InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); + + connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); + connectionParams.putInt("localPort", socket.getLocalPort()); + connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); + connectionParams.putInt("remotePort", socket.getPort()); + connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); + eventParams.putMap("connection", connectionParams); + sendEvent("connect", eventParams); + } + + public void onListen(int id, TcpSocketServer server) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + WritableMap connectionParams = Arguments.createMap(); + ServerSocket serverSocket = server.getServerSocket(); + InetAddress address = serverSocket.getInetAddress(); + + connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress()); + connectionParams.putInt("localPort", serverSocket.getLocalPort()); + connectionParams.putString("localFamily", address instanceof Inet6Address ? "IPv6" : "IPv4"); + eventParams.putMap("connection", connectionParams); + sendEvent("listening", eventParams); + } + + public void onData(int id, byte[] data) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP)); + + sendEvent("data", eventParams); + } + + public void onWritten(int id, int msgId, @Nullable String error) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putInt("msgId", msgId); + eventParams.putString("err", error); + + sendEvent("written", eventParams); + } + + public void onClose(int id, String error) { + if (error != null) { + onError(id, error); + } + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putBoolean("hadError", error != null); + + sendEvent("close", eventParams); + } + + public void onError(int id, String error) { + WritableMap eventParams = Arguments.createMap(); + eventParams.putInt("id", id); + eventParams.putString("error", error); + + sendEvent("error", eventParams); + } + + private void sendEvent(String eventName, WritableMap params) { + rctEvtEmitter.emit(eventName, params); + } +} diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java index 764c879..2954407 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpReceiverTask.java @@ -1,70 +1,64 @@ package com.asterinet.react.tcpsocket; -import android.os.AsyncTask; -import android.util.Pair; - import java.io.BufferedInputStream; import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Arrays; import java.net.Socket; +import java.util.Arrays; /** - * This is a specialized AsyncTask that receives data from a socket in the background, and + * This is a specialized Runnable that receives data from a socket in the background, and * notifies it's listener when data is received. This is not threadsafe, the listener * should handle synchronicity. */ -class TcpReceiverTask extends AsyncTask, Void, Void> { +public class TcpReceiverTask implements Runnable { + + private final TcpSocketClient clientSocket; + private final TcpEventListener receiverListener; + private boolean paused = false; + + public TcpReceiverTask(TcpSocketClient clientSocket, TcpEventListener receiverListener) { + this.clientSocket = clientSocket; + this.receiverListener = receiverListener; + } + /** * An infinite loop to block and read data from the socket. */ - @SafeVarargs @Override - protected final Void doInBackground(Pair... params) { - if (params.length > 1) { - throw new IllegalArgumentException("This task is only for a single socket/listener pair."); - } - - TcpSocketClient clientSocket = params[0].first; - OnDataReceivedListener receiverListener = params[0].second; + public void run() { int socketId = clientSocket.getId(); Socket socket = clientSocket.getSocket(); - byte[] buffer = new byte[8192]; - int bufferCount; + byte[] buffer = new byte[16384]; try { BufferedInputStream in = new BufferedInputStream(socket.getInputStream()); - while (!isCancelled() && !socket.isClosed()) { - bufferCount = in.read(buffer); + while (!socket.isClosed()) { + int bufferCount = in.read(buffer); + waitIfPaused(); if (bufferCount > 0) { receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount)); } else if (bufferCount == -1) { clientSocket.destroy(); } } - } catch (IOException ioe) { + } catch (IOException | InterruptedException ioe) { if (receiverListener != null && !socket.isClosed()) { receiverListener.onError(socketId, ioe.getMessage()); } - this.cancel(false); } - return null; } - /** - * Listener interface for receive events. - */ - @SuppressWarnings("WeakerAccess") - public interface OnDataReceivedListener { - void onConnection(Integer serverId, Integer clientId, Socket socket); - - void onConnect(Integer id, TcpSocketClient client); - - void onListen(Integer id, TcpSocketServer server); - - void onData(Integer id, byte[] data); + public synchronized void pause() { + paused = true; + } - void onClose(Integer id, String error); + public synchronized void resume() { + paused = false; + notify(); + } - void onError(Integer id, String error); + private synchronized void waitIfPaused() throws InterruptedException { + while (paused) { + wait(); + } } } \ No newline at end of file diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java index 81a4e09..d0c86f0 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketClient.java @@ -2,12 +2,10 @@ import android.content.Context; import android.net.Network; -import android.util.Pair; import com.facebook.react.bridge.ReadableMap; import java.io.IOException; -import java.io.OutputStream; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; @@ -23,21 +21,19 @@ import androidx.annotation.Nullable; class TcpSocketClient extends TcpSocket { - private final ExecutorService executorService; - private TcpReceiverTask receiverTask; + private final ExecutorService listenExecutor; + private final ExecutorService writeExecutor; + private final TcpEventListener receiverListener; + private final TcpReceiverTask receiverTask; private Socket socket; - private TcpReceiverTask.OnDataReceivedListener mReceiverListener; - TcpSocketClient(@NonNull final TcpReceiverTask.OnDataReceivedListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) { + TcpSocketClient(@NonNull final TcpEventListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) { super(id); - this.executorService = Executors.newFixedThreadPool(1); + listenExecutor = Executors.newSingleThreadExecutor(); + writeExecutor = Executors.newSingleThreadExecutor(); + receiverTask = new TcpReceiverTask(this, receiverListener); this.socket = socket; - receiverTask = new TcpReceiverTask(); - mReceiverListener = receiverListener; - } - - ExecutorService getExecutorService() { - return this.executorService; + this.receiverListener = receiverListener; } public Socket getSocket() { @@ -83,10 +79,8 @@ public void connect(@NonNull final Context context, @NonNull final String addres startListening(); } - @SuppressWarnings("WeakerAccess") public void startListening() { - //noinspection unchecked - receiverTask.executeOnExecutor(getExecutorService(), new Pair<>(this, mReceiverListener)); + listenExecutor.execute(receiverTask); } /** @@ -94,12 +88,19 @@ public void startListening() { * * @param data data to be sent */ - public void write(final byte[] data) throws IOException { - if (socket == null) { - throw new IOException("Socket is not connected."); - } - OutputStream output = socket.getOutputStream(); - output.write(data); + public void write(final int msgId, final byte[] data) { + writeExecutor.execute(new Runnable() { + @Override + public void run() { + try { + socket.getOutputStream().write(data); + receiverListener.onWritten(getId(), msgId, null); + } catch (IOException e) { + receiverListener.onWritten(getId(), msgId, e.toString()); + receiverListener.onError(getId(), e.toString()); + } + } + }); } /** @@ -107,19 +108,14 @@ public void write(final byte[] data) throws IOException { */ public void destroy() { try { - if (receiverTask != null && !receiverTask.isCancelled()) { - // stop the receiving task - receiverTask.cancel(true); - getExecutorService().shutdown(); - } // close the socket if (socket != null && !socket.isClosed()) { socket.close(); - mReceiverListener.onClose(getId(), null); + receiverListener.onClose(getId(), null); socket = null; } } catch (IOException e) { - mReceiverListener.onClose(getId(), e.getMessage()); + receiverListener.onClose(getId(), e.getMessage()); } } @@ -143,4 +139,12 @@ public void setKeepAlive(final boolean enable, final int initialDelay) throws IO // `initialDelay` is ignored socket.setKeepAlive(enable); } + + public void pause() { + receiverTask.pause(); + } + + public void resume() { + receiverTask.resume(); + } } diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java index 8757888..54b022c 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketModule.java @@ -9,21 +9,12 @@ import android.util.Base64; import android.net.Network; -import com.facebook.react.bridge.Arguments; import com.facebook.react.bridge.ReactApplicationContext; import com.facebook.react.bridge.ReactContextBaseJavaModule; import com.facebook.react.bridge.ReactMethod; import com.facebook.react.bridge.ReadableMap; -import com.facebook.react.bridge.WritableMap; -import com.facebook.react.bridge.Callback; -import com.facebook.react.modules.core.DeviceEventManagerModule; import java.io.IOException; -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.ServerSocket; -import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -34,7 +25,7 @@ import androidx.annotation.NonNull; import androidx.annotation.Nullable; -public class TcpSocketModule extends ReactContextBaseJavaModule implements TcpReceiverTask.OnDataReceivedListener { +public class TcpSocketModule extends ReactContextBaseJavaModule { private static final String TAG = "TcpSockets"; private static final int N_THREADS = 2; private final ReactApplicationContext mReactContext; @@ -42,24 +33,25 @@ public class TcpSocketModule extends ReactContextBaseJavaModule implements TcpRe private final ConcurrentHashMap mNetworkMap = new ConcurrentHashMap<>(); private final CurrentNetwork currentNetwork = new CurrentNetwork(); private final ExecutorService executorService = Executors.newFixedThreadPool(N_THREADS); + private TcpEventListener tcpEvtListener; public TcpSocketModule(ReactApplicationContext reactContext) { super(reactContext); mReactContext = reactContext; } + @Override + public void initialize() { + super.initialize(); + tcpEvtListener = new TcpEventListener(mReactContext); + } + @Override public @NonNull String getName() { return TAG; } - private void sendEvent(String eventName, WritableMap params) { - mReactContext - .getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class) - .emit(eventName, params); - } - /** * Creates a TCP Socket and establish a connection with the given host * @@ -72,11 +64,11 @@ private void sendEvent(String eventName, WritableMap params) { @SuppressWarnings("unused") @ReactMethod public void connect(@NonNull final Integer cId, @NonNull final String host, @NonNull final Integer port, @NonNull final ReadableMap options) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { if (socketMap.get(cId) != null) { - onError(cId, TAG + "createSocket called twice with the same id."); + tcpEvtListener.onError(cId, TAG + "createSocket called twice with the same id."); return; } try { @@ -84,52 +76,38 @@ public void run() { final String localAddress = options.hasKey("localAddress") ? options.getString("localAddress") : null; final String iface = options.hasKey("interface") ? options.getString("interface") : null; selectNetwork(iface, localAddress); - TcpSocketClient client = new TcpSocketClient(TcpSocketModule.this, cId, null); + TcpSocketClient client = new TcpSocketClient(tcpEvtListener, cId, null); socketMap.put(cId, client); client.connect(mReactContext, host, port, options, currentNetwork.getNetwork()); - onConnect(cId, client); + tcpEvtListener.onConnect(cId, client); } catch (Exception e) { - onError(cId, e.getMessage()); + tcpEvtListener.onError(cId, e.getMessage()); } } - })); + }); } @SuppressLint("StaticFieldLeak") @SuppressWarnings("unused") @ReactMethod - public void write(@NonNull final Integer cId, @NonNull final String base64String, @Nullable final Callback callback) { - executorService.execute(new Thread(new Runnable() { - @Override - public void run() { - TcpSocketClient socketClient = getTcpClient(cId); - try { - socketClient.write(Base64.decode(base64String, Base64.NO_WRAP)); - if (callback != null) { - callback.invoke(); - } - } catch (IOException e) { - if (callback != null) { - callback.invoke(e.toString()); - } - onError(cId, e.toString()); - } - } - })); + public void write(final int cId, @NonNull final String base64String, final int msgId) { + TcpSocketClient socketClient = getTcpClient(cId); + byte[] data = Base64.decode(base64String, Base64.NO_WRAP); + socketClient.write(msgId, data); } @SuppressLint("StaticFieldLeak") @SuppressWarnings("unused") @ReactMethod public void end(final Integer cId) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { TcpSocketClient socketClient = getTcpClient(cId); socketClient.destroy(); socketMap.remove(cId); } - })); + }); } @SuppressWarnings("unused") @@ -141,32 +119,32 @@ public void destroy(final Integer cId) { @SuppressWarnings("unused") @ReactMethod public void close(final Integer cId) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { TcpSocketServer socketServer = getTcpServer(cId); socketServer.close(); socketMap.remove(cId); } - })); + }); } @SuppressLint("StaticFieldLeak") @SuppressWarnings("unused") @ReactMethod public void listen(final Integer cId, final ReadableMap options) { - executorService.execute(new Thread(new Runnable() { + executorService.execute(new Runnable() { @Override public void run() { try { - TcpSocketServer server = new TcpSocketServer(socketMap, TcpSocketModule.this, cId, options); + TcpSocketServer server = new TcpSocketServer(socketMap, tcpEvtListener, cId, options); socketMap.put(cId, server); - onListen(cId, server); + tcpEvtListener.onListen(cId, server); } catch (Exception uhe) { - onError(cId, uhe.getMessage()); + tcpEvtListener.onError(cId, uhe.getMessage()); } } - })); + }); } @SuppressWarnings("unused") @@ -176,7 +154,7 @@ public void setNoDelay(@NonNull final Integer cId, final boolean noDelay) { try { client.setNoDelay(noDelay); } catch (IOException e) { - onError(cId, e.getMessage()); + tcpEvtListener.onError(cId, e.getMessage()); } } @@ -187,10 +165,24 @@ public void setKeepAlive(@NonNull final Integer cId, final boolean enable, final try { client.setKeepAlive(enable, initialDelay); } catch (IOException e) { - onError(cId, e.getMessage()); + tcpEvtListener.onError(cId, e.getMessage()); } } + @SuppressWarnings("unused") + @ReactMethod + public void pause(final int cId) { + TcpSocketClient client = getTcpClient(cId); + client.pause(); + } + + @SuppressWarnings("unused") + @ReactMethod + public void resume(final int cId) { + TcpSocketClient client = getTcpClient(cId); + client.resume(); + } + private void requestNetwork(final int transportType) throws InterruptedException { final NetworkRequest.Builder requestBuilder = new NetworkRequest.Builder(); requestBuilder.addTransportType(transportType); @@ -253,93 +245,6 @@ private void selectNetwork(@Nullable final String iface, @Nullable final String mNetworkMap.put(iface + ipAddress, currentNetwork.getNetwork()); } - // TcpReceiverTask.OnDataReceivedListener - - @Override - public void onConnect(Integer id, TcpSocketClient client) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - WritableMap connectionParams = Arguments.createMap(); - final Socket socket = client.getSocket(); - final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - - connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); - connectionParams.putInt("localPort", socket.getLocalPort()); - connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); - connectionParams.putInt("remotePort", socket.getPort()); - connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); - eventParams.putMap("connection", connectionParams); - sendEvent("connect", eventParams); - } - - @Override - public void onListen(Integer id, TcpSocketServer server) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - WritableMap connectionParams = Arguments.createMap(); - final ServerSocket serverSocket = server.getServerSocket(); - final InetAddress address = serverSocket.getInetAddress(); - - connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress()); - connectionParams.putInt("localPort", serverSocket.getLocalPort()); - connectionParams.putString("localFamily", address instanceof Inet6Address ? "IPv6" : "IPv4"); - eventParams.putMap("connection", connectionParams); - sendEvent("listening", eventParams); - } - - @Override - public void onData(Integer id, byte[] data) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP)); - - sendEvent("data", eventParams); - } - - @Override - public void onClose(Integer id, String error) { - if (error != null) { - onError(id, error); - } - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - eventParams.putBoolean("hadError", error != null); - - sendEvent("close", eventParams); - } - - @Override - public void onError(Integer id, String error) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", id); - eventParams.putString("error", error); - - sendEvent("error", eventParams); - } - - @Override - public void onConnection(Integer serverId, Integer clientId, Socket socket) { - WritableMap eventParams = Arguments.createMap(); - eventParams.putInt("id", serverId); - - WritableMap infoParams = Arguments.createMap(); - infoParams.putInt("id", clientId); - - WritableMap connectionParams = Arguments.createMap(); - final InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress(); - - connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress()); - connectionParams.putInt("localPort", socket.getLocalPort()); - connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress()); - connectionParams.putInt("remotePort", socket.getPort()); - connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4"); - - infoParams.putMap("connection", connectionParams); - eventParams.putMap("info", infoParams); - - sendEvent("connection", eventParams); - } - private TcpSocketClient getTcpClient(final int id) { TcpSocket socket = socketMap.get(id); if (socket == null) { diff --git a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java index 5e955a7..9793d0b 100644 --- a/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java +++ b/android/src/main/java/com/asterinet/react/tcpsocket/TcpSocketServer.java @@ -7,7 +7,6 @@ import java.io.IOException; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.concurrent.ConcurrentHashMap; @@ -16,7 +15,7 @@ public final class TcpSocketServer extends TcpSocket { private ServerSocket serverSocket; - private TcpReceiverTask.OnDataReceivedListener mReceiverListener; + private final TcpEventListener mReceiverListener; private int clientSocketIds; private final ExecutorService executorService; private final ConcurrentHashMap socketClients; @@ -44,7 +43,7 @@ protected Void doInBackground(Object[] objects) { }; - public TcpSocketServer(final ConcurrentHashMap socketClients, final TcpReceiverTask.OnDataReceivedListener receiverListener, final Integer id, + public TcpSocketServer(final ConcurrentHashMap socketClients, final TcpEventListener receiverListener, final Integer id, final ReadableMap options) throws IOException { super(id); this.executorService = Executors.newFixedThreadPool(1); @@ -106,8 +105,4 @@ public void close() { mReceiverListener.onClose(getId(), e.getMessage()); } } - - public int getListeningPort() { - return (serverSocket == null) ? -1 : serverSocket.getLocalPort(); - } } diff --git a/examples/tcpsockets/App.js b/examples/tcpsockets/App.js index f5aa808..9655462 100644 --- a/examples/tcpsockets/App.js +++ b/examples/tcpsockets/App.js @@ -60,6 +60,10 @@ class App extends React.Component { this.updateChatter('Opened client on ' + JSON.stringify(client.address())); }); + client.on('drain', () => { + this.updateChatter('Client drained'); + }); + client.on('data', (data) => { this.updateChatter('Client received: ' + data); }); diff --git a/examples/tcpsockets/android/gradlew b/examples/tcpsockets/android/gradlew old mode 100644 new mode 100755 diff --git a/examples/tcpsockets/android/gradlew.bat b/examples/tcpsockets/android/gradlew.bat old mode 100644 new mode 100755 diff --git a/examples/tcpsockets/examples/README.md b/examples/tcpsockets/examples/README.md index e83c0b5..0b6e4ca 100644 --- a/examples/tcpsockets/examples/README.md +++ b/examples/tcpsockets/examples/README.md @@ -8,8 +8,14 @@ Let us know if you find any issues. If you want to contribute or add a new examp ## Table of Contents - [Echo server](#echo-server) +- [Pause/Resume - Backpressure](#pauseresume---backpressure) ### [Echo server](echo.js) -An echo server just reflects a message received from a client to the same client. If we send a message saying "Hello, Server!", we will receive the same message, just like an echo. This example shows some basic TCP server and client interactions. \ No newline at end of file +An echo server just reflects a message received from a client to the same client. If we send a message saying "Hello, Server!", we will receive the same message, just like an echo. This example shows some basic TCP server and client interactions. + +### [Pause/Resume - Backpressure](pause-resume.js) +There is a general problem that occurs during data handling called **backpressure** and describes a buildup of data behind a buffer during data transfer. When the receiving end of the transfer has complex operations, or is slower for whatever reason, there is a tendency for data from the incoming source to accumulate, like a clog. + +To solve this problem, there must be a delegation system in place to ensure a smooth flow of data from one source to another and is often times referred to as flow control. In Node.js, streams have been the adopted solution and `react-native-tcp-socket` mimics the same functionality. If a call to `socket.write(chunk)` returns `false`, the `'drain'` event will be emitted when it is appropriate to resume writing data to the stream. diff --git a/examples/tcpsockets/examples/echo.js b/examples/tcpsockets/examples/echo.js index 2d61064..09c8671 100644 --- a/examples/tcpsockets/examples/echo.js +++ b/examples/tcpsockets/examples/echo.js @@ -1,5 +1,4 @@ const net = require('net'); -const PORT = Number(9 + (Math.random() * 999).toFixed(0)); const server = new net.Server(); const client = new net.Socket(); @@ -9,23 +8,24 @@ function init() { socket.write('Echo server\r\n'); }); - server.listen({ port: PORT, host: '127.0.0.1', reuseAddress: true }); - - client.connect( - // @ts-ignore - { - port: PORT, - host: '127.0.0.1', - localAddress: '127.0.0.1', - reuseAddress: true, - // localPort: 20000, - // interface: "wifi", - // tls: true - }, - () => { - client.write('Hello, server! Love, Client.'); - } - ); + server.listen({ port: 0, host: '127.0.0.1', reuseAddress: true }, () => { + const port = server.address()?.port; + if (!port) throw new Error('Server port not found'); + client.connect( + { + port: port, + host: '127.0.0.1', + localAddress: '127.0.0.1', + reuseAddress: true, + // localPort: 20000, + // interface: "wifi", + // tls: true + }, + () => { + client.write('Hello, server! Love, Client.'); + } + ); + }); client.on('data', () => { client.destroy(); // kill client after server's response diff --git a/examples/tcpsockets/examples/main.js b/examples/tcpsockets/examples/main.js index 5609389..595e732 100644 --- a/examples/tcpsockets/examples/main.js +++ b/examples/tcpsockets/examples/main.js @@ -29,6 +29,10 @@ client.on('connect', () => { console.log('Opened client on ' + JSON.stringify(client.address())); }); +client.on('drain', () => { + console.log('Client drained'); +}); + client.on('data', (data) => { console.log('Client received: ' + data); }); diff --git a/examples/tcpsockets/examples/pause-resume.js b/examples/tcpsockets/examples/pause-resume.js new file mode 100644 index 0000000..ea0b665 --- /dev/null +++ b/examples/tcpsockets/examples/pause-resume.js @@ -0,0 +1,60 @@ +const net = require('net'); + +const server = new net.Server(); +const client = new net.Socket(); + +function init() { + server.on('connection', (socket) => { + socket.on('data', (chunk) => { + console.log(`Received ${chunk.length} bytes of data.`); + console.log('Server client chunk start: ' + chunk.slice(0, 30)); + console.log('Server client chunk end: ' + chunk.slice(chunk.length - 30, chunk.length)); + socket.pause(); + console.log('There will be no additional data for 1 second.'); + setTimeout(() => { + console.log('Now data will start flowing again.'); + socket.resume(); + }, 1000); + }); + }); + + server.listen({ port: 0, host: '127.0.0.1', reuseAddress: true }, () => { + const port = server.address()?.port; + if (!port) throw new Error('Server port not found'); + client.connect( + { + port: port, + host: '127.0.0.1', + localAddress: '127.0.0.1', + reuseAddress: true, + // localPort: 20000, + // interface: "wifi", + // tls: true + }, + () => { + let i = 0; + const MAX_ITER = 300000; + write(); + async function write() { + let ok = true; + while (i < MAX_ITER && ok) { + i++; + const buff = ' ->' + i + '<- '; + ok = client.write(buff); + // await new Promise((resolve) => setTimeout(resolve, 50)); + // console.log('Bytes sent', ok, buff, client.bytesSent); + } + if (i >= MAX_ITER) { + client.destroy(); + } else if (!ok) { + // Had to stop early! + // Write some more once it drains. + client.once('drain', write); + } + } + } + ); + }); +} + +module.exports = { init, server, client }; diff --git a/examples/tcpsockets/ios/Podfile.lock b/examples/tcpsockets/ios/Podfile.lock index 3942a33..8e0972a 100644 --- a/examples/tcpsockets/ios/Podfile.lock +++ b/examples/tcpsockets/ios/Podfile.lock @@ -186,7 +186,7 @@ PODS: - React-cxxreact (= 0.63.2) - React-jsi (= 0.63.2) - React-jsinspector (0.63.2) - - react-native-tcp-socket (5.1.0): + - react-native-tcp-socket (5.2.1): - CocoaAsyncSocket - React-Core - React-RCTActionSheet (0.63.2): @@ -361,7 +361,7 @@ SPEC CHECKSUMS: React-jsi: 54245e1d5f4b690dec614a73a3795964eeef13a8 React-jsiexecutor: 8ca588cc921e70590820ce72b8789b02c67cce38 React-jsinspector: b14e62ebe7a66e9231e9581279909f2fc3db6606 - react-native-tcp-socket: 334094926111f66bd6b305859aac4d07745b8d81 + react-native-tcp-socket: d05b8acbf1c045ff5b6a92ec6ca91960c94387d1 React-RCTActionSheet: 910163b6b09685a35c4ebbc52b66d1bfbbe39fc5 React-RCTAnimation: 9a883bbe1e9d2e158d4fb53765ed64c8dc2200c6 React-RCTBlob: 39cf0ece1927996c4466510e25d2105f67010e13 diff --git a/ios/TcpSocketClient.h b/ios/TcpSocketClient.h index 17825be..109fcb2 100644 --- a/ios/TcpSocketClient.h +++ b/ios/TcpSocketClient.h @@ -27,6 +27,7 @@ typedef enum RCTTCPError RCTTCPError; - (void)onData:(NSNumber *)clientID data:(NSData *)data; - (void)onClose:(TcpSocketClient*)client withError:(NSError *)err; - (void)onError:(TcpSocketClient*)client withError:(NSError *)err; +- (void)onWrittenData:(TcpSocketClient*)client msgId:(NSNumber *)msgId; - (NSNumber*)getNextId; @end @@ -56,8 +57,7 @@ typedef enum RCTTCPError RCTTCPError; ///--------------------------------------------------------------------------------------- /** * Connects to a host and port - * - * @param port + * @param port port * @param host ip address * @param options NSDictionary which can have @"localAddress" and @"localPort" to specify the local interface * @return true if connected, false if there was an error @@ -83,7 +83,7 @@ typedef enum RCTTCPError RCTTCPError; * write data * */ -- (void)writeData:(NSData*) data callback:(RCTResponseSenderBlock) callback; +- (void)writeData:(NSData*) data msgId:(NSNumber*)msgId; /** * end client @@ -99,4 +99,8 @@ typedef enum RCTTCPError RCTTCPError; - (void)setKeepAlive:(BOOL)enable initialDelay:(int)initialDelay; +- (void) pause; + +- (void) resume; + @end diff --git a/ios/TcpSocketClient.m b/ios/TcpSocketClient.m index a3d9b15..f956f8d 100644 --- a/ios/TcpSocketClient.m +++ b/ios/TcpSocketClient.m @@ -12,9 +12,10 @@ @interface TcpSocketClient() @private BOOL _tls; BOOL _checkValidity; + BOOL _paused; NSString *_certPath; GCDAsyncSocket *_tcpSocket; - NSMutableDictionary *_pendingSends; + NSMutableDictionary *_pendingSends; NSLock *_lock; long _sendTag; } @@ -47,6 +48,7 @@ - (id)initWithClientId:(NSNumber *)clientID andConfig:(id) if (self) { _id = clientID; _clientDelegate = aDelegate; + _paused = false; _pendingSends = [NSMutableDictionary dictionary]; _lock = [[NSLock alloc] init]; _tcpSocket = tcpSocket; @@ -182,24 +184,23 @@ - (BOOL)listen:(NSDictionary *)options error:(NSError **)error BOOL isListening = [_tcpSocket acceptOnInterface:host port:port error:error]; if (isListening == YES) { [_clientDelegate onListen: self]; - [_tcpSocket readDataWithTimeout:-1 tag:_id.longValue]; } return isListening; } -- (void)setPendingSend:(RCTResponseSenderBlock)callback forKey:(NSNumber *)key +- (void)setPendingSend:(NSNumber *)msgId forKey:(NSNumber *)key { [_lock lock]; @try { - [_pendingSends setObject:callback forKey:key]; + [_pendingSends setObject:msgId forKey:key]; } @finally { [_lock unlock]; } } -- (RCTResponseSenderBlock)getPendingSend:(NSNumber *)key +- (NSNumber*)getPendingSend:(NSNumber *)key { [_lock lock]; @try { @@ -224,24 +225,19 @@ - (void)dropPendingSend:(NSNumber *)key - (void)socket:(GCDAsyncSocket *)sock didWriteDataWithTag:(long)msgTag { NSNumber* tagNum = [NSNumber numberWithLong:msgTag]; - RCTResponseSenderBlock callback = [self getPendingSend:tagNum]; - if (callback) { - callback(@[]); + NSNumber* msgId = [self getPendingSend:tagNum]; + if (msgId) { + [_clientDelegate onWrittenData:self msgId:msgId]; [self dropPendingSend:tagNum]; } } -- (void) writeData:(NSData *)data - callback:(RCTResponseSenderBlock)callback +- (void) writeData:(NSData *)data msgId:(NSNumber*)msgId { - if (callback) { - [self setPendingSend:callback forKey:@(_sendTag)]; - } + [self setPendingSend:msgId forKey:@(_sendTag)]; [_tcpSocket writeData:data withTimeout:-1 tag:_sendTag]; _sendTag++; - - [_tcpSocket readDataWithTimeout:-1 tag:_id.longValue]; } - (void)end @@ -254,6 +250,17 @@ - (void)destroy [_tcpSocket disconnect]; } +- (void)pause { + _paused = true; +} + +- (void)resume { + if (_paused) { + [_tcpSocket readDataWithTimeout:-1 tag:_id.longValue]; + } + _paused = false; +} + - (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)tag { if (!_clientDelegate) { RCTLogWarn(@"didReadData with nil clientDelegate for %@", [sock userData]); @@ -261,8 +268,9 @@ - (void)socket:(GCDAsyncSocket *)sock didReadData:(NSData *)data withTag:(long)t } [_clientDelegate onData:@(tag) data:data]; - - [sock readDataWithTimeout:-1 tag:tag]; + if (!_paused) { + [sock readDataWithTimeout:-1 tag:tag]; + } } - (void)socket:(GCDAsyncSocket *)sock didAcceptNewSocket:(GCDAsyncSocket *)newSocket diff --git a/ios/TcpSockets.m b/ios/TcpSockets.m index 5295841..e21ff0e 100644 --- a/ios/TcpSockets.m +++ b/ios/TcpSockets.m @@ -24,7 +24,8 @@ @implementation TcpSockets @"connection", @"data", @"close", - @"error"]; + @"error", + @"written"]; } - (void)startObserving { @@ -82,15 +83,15 @@ - (TcpSocketClient *)createSocket:(nonnull NSNumber*)cId } RCT_EXPORT_METHOD(write:(nonnull NSNumber*)cId - string:(NSString *)base64String - callback:(RCTResponseSenderBlock)callback) { + string:(nonnull NSString*)base64String + callback:(nonnull NSNumber*)msgId) { TcpSocketClient* client = [self findClient:cId]; if (!client) return; // iOS7+ // TODO: use https://github.com/nicklockwood/Base64 for compatibility with earlier iOS versions NSData *data = [[NSData alloc] initWithBase64EncodedString:base64String options:0]; - [client writeData:data callback:callback]; + [client writeData:data msgId:msgId]; } RCT_EXPORT_METHOD(end:(nonnull NSNumber*)cId) { @@ -135,6 +136,28 @@ - (TcpSocketClient *)createSocket:(nonnull NSNumber*)cId [client setKeepAlive:enable initialDelay:initialDelay]; } +RCT_EXPORT_METHOD(pause:(nonnull NSNumber*)cId) { + TcpSocketClient* client = [self findClient:cId]; + if (!client) return; + + [client pause]; +} + +RCT_EXPORT_METHOD(resume:(nonnull NSNumber*)cId) { + TcpSocketClient* client = [self findClient:cId]; + if (!client) return; + + [client resume]; +} + +- (void)onWrittenData:(TcpSocketClient*) client msgId:(NSNumber *)msgId +{ + [self sendEventWithName:@"written" body:@{ + @"id": client.id, + @"msgId": msgId, + }]; +} + - (void)onConnect:(TcpSocketClient*) client { GCDAsyncSocket * socket = [client getSocket]; diff --git a/src/Server.js b/src/Server.js index 89cd608..0a5bf3b 100644 --- a/src/Server.js +++ b/src/Server.js @@ -7,7 +7,13 @@ import Socket from './Socket'; import { nativeEventEmitter, getNextId } from './Globals'; /** - * @extends {EventEmitter<'connection' | 'listening' | 'error' | 'close', any>} + * @typedef {object} ServerEvents + * @property {() => void} close + * @property {(socket: Socket) => void} connection + * @property {() => void} listening + * @property {(err: Error) => void} error + * + * @extends {EventEmitter} */ export default class Server extends EventEmitter { /** diff --git a/src/Socket.js b/src/Socket.js index ab89e4f..3ea9d43 100644 --- a/src/Socket.js +++ b/src/Socket.js @@ -34,7 +34,19 @@ const STATE = { * tlsCert?: any, * }} ConnectionOptions * - * @extends {EventEmitter<'connect' | 'timeout' | 'data' | 'error' | 'close', any>} + * @typedef {object} ReadableEvents + * @property {() => void} pause + * @property {() => void} resume + * + * @typedef {object} SocketEvents + * @property {(had_error: boolean) => void} close + * @property {() => void} connect + * @property {(data: Buffer | string) => void} data + * @property {() => void} drain + * @property {(err: Error) => void} error + * @property {() => void} timeout + * + * @extends {EventEmitter} */ export default class Socket extends EventEmitter { /** @@ -46,6 +58,8 @@ export default class Socket extends EventEmitter { this._id = undefined; /** @private */ this._eventEmitter = nativeEventEmitter; + /** @type {EventEmitter<'written', any>} @private */ + this._msgEvtEmitter = new EventEmitter(); /** @type {number} @private */ this._timeoutMsecs = 0; /** @private */ @@ -54,6 +68,24 @@ export default class Socket extends EventEmitter { this._state = STATE.DISCONNECTED; /** @private */ this._encoding = undefined; + /** @private */ + this._msgId = 0; + /** @private */ + this._lastRcvMsgId = Number.MAX_SAFE_INTEGER - 1; + /** @private */ + this._lastSentMsgId = 0; + /** @private */ + this._paused = false; + /** @private */ + this._resuming = false; + /** @private */ + this._writeBufferSize = 0; + /** @type {{ id: number; data: string; }[]} @private */ + this._pausedDataEvents = []; + this.readableHighWaterMark = 16384; + this.writableHighWaterMark = 16384; + this.writableNeedDrain = false; + this.bytesSent = 0; this.localAddress = undefined; this.localPort = undefined; this.remoteAddress = undefined; @@ -254,53 +286,140 @@ export default class Socket extends EventEmitter { /** * Sends data on the socket. The second parameter specifies the encoding in the case of a string — it defaults to UTF8 encoding. * + * Returns `true` if the entire data was flushed successfully to the kernel buffer. Returns `false` if all or part of the data + * was queued in user memory. `'drain'` will be emitted when the buffer is again free. + * * The optional callback parameter will be executed when the data is finally written out, which may not be immediately. * * @param {string | Buffer | Uint8Array} buffer * @param {BufferEncoding} [encoding] - * @param {(error: string | null) => void} [callback] + * @param {(err?: Error) => void} [cb] + * + * @return {boolean} */ - write(buffer, encoding, callback) { + write(buffer, encoding, cb) { const self = this; if (this._state === STATE.DISCONNECTED) throw new Error('Socket is not connected.'); - callback = callback || (() => {}); const generatedBuffer = this._generateSendBuffer(buffer, encoding); - Sockets.write( - this._id, - generatedBuffer.toString('base64'), - /** - * @param {string} err - */ - function(err) { + this._writeBufferSize += generatedBuffer.byteLength; + const currentMsgId = this._msgId; + this._msgId = (this._msgId + 1) % Number.MAX_SAFE_INTEGER; + const msgEvtHandler = (/** @type {{id: number, msgId: number, err?: string}} */ evt) => { + const { msgId, err } = evt; + if (msgId === currentMsgId) { + this._msgEvtEmitter.removeListener('written', msgEvtHandler); + this._writeBufferSize -= generatedBuffer.byteLength; + this._lastRcvMsgId = msgId; if (self._timeout) self._activateTimer(); - if (callback) { - if (err) return callback(err); - callback(null); + if (this.writableNeedDrain && this._lastSentMsgId == msgId) { + this.writableNeedDrain = false; + this.emit('drain'); + } + if (cb) { + if (err) cb(new Error(err)); + else cb(); } } - ); + }; + // Callback equivalent with better performance + this._msgEvtEmitter.on('written', msgEvtHandler, this); + const ok = this._writeBufferSize < this.writableHighWaterMark; + if (!ok) this.writableNeedDrain = true; + this._lastSentMsgId = currentMsgId; + Sockets.write(this._id, generatedBuffer.toString('base64'), currentMsgId); + return ok; + } + + /** + * Pauses the reading of data. That is, `'data'` events will not be emitted. Useful to throttle back an upload. + */ + pause() { + if (this._paused) return; + this._paused = true; + Sockets.pause(this._id); + this.emit('pause'); + } + + /** + * Resumes reading after a call to `socket.pause()`. + */ + resume() { + if (!this._paused) return; + this._paused = false; + this.emit('resume'); + this._recoverDataEventsAfterPause(); } ref() { - console.warn('react-native-tcp-socket: TcpSocket.ref() method will have no effect.'); + console.warn('react-native-tcp-socket: Socket.ref() method will have no effect.'); } unref() { - console.warn('react-native-tcp-socket: TcpSocket.unref() method will have no effect.'); + console.warn('react-native-tcp-socket: Socket.unref() method will have no effect.'); } /** * @private */ - _registerEvents() { - this._unregisterEvents(); - this._dataListener = this._eventEmitter.addListener('data', (evt) => { - if (evt.id !== this._id) return; + async _recoverDataEventsAfterPause() { + if (this._resuming) return; + this._resuming = true; + while (this._pausedDataEvents.length > 0) { + // Concat all buffered events for better performance + const buffArray = []; + let readBytes = 0; + let i = 0; + for (; i < this._pausedDataEvents.length; i++) { + const evtData = Buffer.from(this._pausedDataEvents[i].data, 'base64'); + readBytes += evtData.byteLength; + if (readBytes <= this.readableHighWaterMark) { + buffArray.push(evtData); + } else { + const buffOffset = this.readableHighWaterMark - readBytes; + buffArray.push(evtData.slice(0, buffOffset)); + this._pausedDataEvents[i].data = evtData.slice(buffOffset).toString('base64'); + break; + } + } + // Generate new event with the concatenated events + const evt = { + id: this._pausedDataEvents[0].id, + data: Buffer.concat(buffArray).toString('base64'), + }; + // Clean the old events + this._pausedDataEvents = this._pausedDataEvents.slice(i); + this._onDeviceDataEvt(evt); + if (this._paused) { + this._resuming = false; + return; + } + } + this._resuming = false; + Sockets.resume(this._id); + } + + /** + * @private + */ + _onDeviceDataEvt = (/** @type {{ id: number; data: string; }} */ evt) => { + if (evt.id !== this._id) return; + if (!this._paused) { const bufferTest = Buffer.from(evt.data, 'base64'); const finalData = this._encoding ? bufferTest.toString(this._encoding) : bufferTest; this.emit('data', finalData); - }); + } else { + // If the socket is paused, save the data events for later + this._pausedDataEvents.push(evt); + } + }; + + /** + * @private + */ + _registerEvents() { + this._unregisterEvents(); + this._dataListener = this._eventEmitter.addListener('data', this._onDeviceDataEvt); this._errorListener = this._eventEmitter.addListener('error', (evt) => { if (evt.id !== this._id) return; this.destroy(); @@ -316,6 +435,10 @@ export default class Socket extends EventEmitter { this._setConnected(evt.connection); this.emit('connect'); }); + this._writtenListener = this._eventEmitter.addListener('written', (evt) => { + if (evt.id !== this._id) return; + this._msgEvtEmitter.emit('written', evt); + }); } /** @@ -326,6 +449,7 @@ export default class Socket extends EventEmitter { this._errorListener?.remove(); this._closeListener?.remove(); this._connectListener?.remove(); + this._writtenListener?.remove(); } /**