Skip to content

feat: Implement backpressure handling #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Aug 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ React Native TCP socket API for Android, iOS & macOS with **client SSL/TLS suppo
- [Server](#server)
- [SSL Client](#ssl-client)
- [API](#api)
- [TcpSocket](#tcpsocket)
- [Socket](#socket)
- [`createConnection()`](#createconnection)
- [Server](#server-1)
- [`listen()`](#listen)
Expand Down Expand Up @@ -218,7 +218,7 @@ _Note: In order to use self-signed certificates make sure to [update your metro.
## API
Here are listed all methods implemented in `react-native-tcp-socket`, their functionalities are equivalent to those provided by Node's [net](https://nodejs.org/api/net.html) (more info on [#41](https://github.com/Rapsssito/react-native-tcp-socket/issues/41)). However, the **methods whose interface differs from Node are marked in bold**.

### TcpSocket
### Socket
* **Methods:**
* **[`TcpSocket.createConnection(options[, callback])`](#createconnection)**
* [`address()`](https://nodejs.org/api/net.html#net_socket_address)
Expand All @@ -229,17 +229,28 @@ Here are listed all methods implemented in `react-native-tcp-socket`, their func
* [`setNoDelay([noDelay])`](https://nodejs.org/api/net.html#net_socket_setnodelay_nodelay)
* [`setTimeout(timeout[, callback])`](https://nodejs.org/api/net.html#net_socket_settimeout_timeout_callback)
* [`write(data[, encoding][, callback])`](https://nodejs.org/api/net.html#net_socket_write_data_encoding_callback)
* [`pause()`](https://nodejs.org/api/net.html#net_socket_pause)
* `ref()` - _Will not have any effect_
* [`resume()`](https://nodejs.org/api/net.html#net_socket_resume)
* `unref()` - _Will not have any effect_
* **Properties:**
* Inherited from [`Stream.Writable`](https://nodejs.org/api/stream.html#stream_class_stream_writable):
* [`writableNeedDrain`](https://nodejs.org/api/stream.html#stream_writable_writableneeddrain)
* [`remoteAddress`](https://nodejs.org/api/net.html#net_socket_remoteaddress)
* [`remoteFamily`](https://nodejs.org/api/net.html#net_socket_remotefamily)
* [`remotePort`](https://nodejs.org/api/net.html#net_socket_remoteport)
* [`localAddress`](https://nodejs.org/api/net.html#net_socket_localaddress)
* [`localPort`](https://nodejs.org/api/net.html#net_socket_localport)
* **Events:**
* Inherited from [`Stream.Readable`](https://nodejs.org/api/stream.html#stream_class_stream_readable):
* [`'pause'`](https://nodejs.org/api/stream.html#stream_event_pause)
* [`'resume'`](https://nodejs.org/api/stream.html#stream_event_resume)
* [`'close'`](https://nodejs.org/api/net.html#net_event_close_1)
* [`'connect'`](https://nodejs.org/api/net.html#net_event_connect)
* [`'data'`](https://nodejs.org/api/net.html#net_event_data)
* [`'drain'`](https://nodejs.org/api/net.html#net_event_drain)
* [`'error'`](https://nodejs.org/api/net.html#net_event_error_1)
* [`'timeout'`](https://nodejs.org/api/net.html#net_event_timeout)

#### `createConnection()`
`createConnection(options[, callback])` creates a TCP connection using the given [`options`](#createconnection-options).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.asterinet.react.tcpsocket;

import android.util.Base64;

import com.facebook.react.bridge.Arguments;
import com.facebook.react.bridge.ReactContext;
import com.facebook.react.bridge.WritableMap;
import com.facebook.react.modules.core.DeviceEventManagerModule;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

import javax.annotation.Nullable;

public class TcpEventListener {

private final DeviceEventManagerModule.RCTDeviceEventEmitter rctEvtEmitter;

public TcpEventListener(final ReactContext reactContext) {
rctEvtEmitter = reactContext.getJSModule(DeviceEventManagerModule.RCTDeviceEventEmitter.class);
}

public void onConnection(int serverId, int clientId, Socket socket) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", serverId);

WritableMap infoParams = Arguments.createMap();
infoParams.putInt("id", clientId);

WritableMap connectionParams = Arguments.createMap();
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();

connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress());
connectionParams.putInt("localPort", socket.getLocalPort());
connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress());
connectionParams.putInt("remotePort", socket.getPort());
connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4");

infoParams.putMap("connection", connectionParams);
eventParams.putMap("info", infoParams);

sendEvent("connection", eventParams);
}

public void onConnect(int id, TcpSocketClient client) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
WritableMap connectionParams = Arguments.createMap();
Socket socket = client.getSocket();
InetSocketAddress remoteAddress = (InetSocketAddress) socket.getRemoteSocketAddress();

connectionParams.putString("localAddress", socket.getLocalAddress().getHostAddress());
connectionParams.putInt("localPort", socket.getLocalPort());
connectionParams.putString("remoteAddress", remoteAddress.getAddress().getHostAddress());
connectionParams.putInt("remotePort", socket.getPort());
connectionParams.putString("remoteFamily", remoteAddress.getAddress() instanceof Inet6Address ? "IPv6" : "IPv4");
eventParams.putMap("connection", connectionParams);
sendEvent("connect", eventParams);
}

public void onListen(int id, TcpSocketServer server) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
WritableMap connectionParams = Arguments.createMap();
ServerSocket serverSocket = server.getServerSocket();
InetAddress address = serverSocket.getInetAddress();

connectionParams.putString("localAddress", serverSocket.getInetAddress().getHostAddress());
connectionParams.putInt("localPort", serverSocket.getLocalPort());
connectionParams.putString("localFamily", address instanceof Inet6Address ? "IPv6" : "IPv4");
eventParams.putMap("connection", connectionParams);
sendEvent("listening", eventParams);
}

public void onData(int id, byte[] data) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putString("data", Base64.encodeToString(data, Base64.NO_WRAP));

sendEvent("data", eventParams);
}

public void onWritten(int id, int msgId, @Nullable String error) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putInt("msgId", msgId);
eventParams.putString("err", error);

sendEvent("written", eventParams);
}

public void onClose(int id, String error) {
if (error != null) {
onError(id, error);
}
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putBoolean("hadError", error != null);

sendEvent("close", eventParams);
}

public void onError(int id, String error) {
WritableMap eventParams = Arguments.createMap();
eventParams.putInt("id", id);
eventParams.putString("error", error);

sendEvent("error", eventParams);
}

private void sendEvent(String eventName, WritableMap params) {
rctEvtEmitter.emit(eventName, params);
}
}
Original file line number Diff line number Diff line change
@@ -1,70 +1,64 @@
package com.asterinet.react.tcpsocket;

import android.os.AsyncTask;
import android.util.Pair;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.net.Socket;
import java.util.Arrays;

/**
* This is a specialized AsyncTask that receives data from a socket in the background, and
* This is a specialized Runnable that receives data from a socket in the background, and
* notifies it's listener when data is received. This is not threadsafe, the listener
* should handle synchronicity.
*/
class TcpReceiverTask extends AsyncTask<Pair<TcpSocketClient, TcpReceiverTask.OnDataReceivedListener>, Void, Void> {
public class TcpReceiverTask implements Runnable {

private final TcpSocketClient clientSocket;
private final TcpEventListener receiverListener;
private boolean paused = false;

public TcpReceiverTask(TcpSocketClient clientSocket, TcpEventListener receiverListener) {
this.clientSocket = clientSocket;
this.receiverListener = receiverListener;
}

/**
* An infinite loop to block and read data from the socket.
*/
@SafeVarargs
@Override
protected final Void doInBackground(Pair<TcpSocketClient, TcpReceiverTask.OnDataReceivedListener>... params) {
if (params.length > 1) {
throw new IllegalArgumentException("This task is only for a single socket/listener pair.");
}

TcpSocketClient clientSocket = params[0].first;
OnDataReceivedListener receiverListener = params[0].second;
public void run() {
int socketId = clientSocket.getId();
Socket socket = clientSocket.getSocket();
byte[] buffer = new byte[8192];
int bufferCount;
byte[] buffer = new byte[16384];
try {
BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
while (!isCancelled() && !socket.isClosed()) {
bufferCount = in.read(buffer);
while (!socket.isClosed()) {
int bufferCount = in.read(buffer);
waitIfPaused();
if (bufferCount > 0) {
receiverListener.onData(socketId, Arrays.copyOfRange(buffer, 0, bufferCount));
} else if (bufferCount == -1) {
clientSocket.destroy();
}
}
} catch (IOException ioe) {
} catch (IOException | InterruptedException ioe) {
if (receiverListener != null && !socket.isClosed()) {
receiverListener.onError(socketId, ioe.getMessage());
}
this.cancel(false);
}
return null;
}

/**
* Listener interface for receive events.
*/
@SuppressWarnings("WeakerAccess")
public interface OnDataReceivedListener {
void onConnection(Integer serverId, Integer clientId, Socket socket);

void onConnect(Integer id, TcpSocketClient client);

void onListen(Integer id, TcpSocketServer server);

void onData(Integer id, byte[] data);
public synchronized void pause() {
paused = true;
}

void onClose(Integer id, String error);
public synchronized void resume() {
paused = false;
notify();
}

void onError(Integer id, String error);
private synchronized void waitIfPaused() throws InterruptedException {
while (paused) {
wait();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@

import android.content.Context;
import android.net.Network;
import android.util.Pair;

import com.facebook.react.bridge.ReadableMap;

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
Expand All @@ -23,21 +21,19 @@
import androidx.annotation.Nullable;

class TcpSocketClient extends TcpSocket {
private final ExecutorService executorService;
private TcpReceiverTask receiverTask;
private final ExecutorService listenExecutor;
private final ExecutorService writeExecutor;
private final TcpEventListener receiverListener;
private final TcpReceiverTask receiverTask;
private Socket socket;
private TcpReceiverTask.OnDataReceivedListener mReceiverListener;

TcpSocketClient(@NonNull final TcpReceiverTask.OnDataReceivedListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) {
TcpSocketClient(@NonNull final TcpEventListener receiverListener, @NonNull final Integer id, @Nullable final Socket socket) {
super(id);
this.executorService = Executors.newFixedThreadPool(1);
listenExecutor = Executors.newSingleThreadExecutor();
writeExecutor = Executors.newSingleThreadExecutor();
receiverTask = new TcpReceiverTask(this, receiverListener);
this.socket = socket;
receiverTask = new TcpReceiverTask();
mReceiverListener = receiverListener;
}

ExecutorService getExecutorService() {
return this.executorService;
this.receiverListener = receiverListener;
}

public Socket getSocket() {
Expand Down Expand Up @@ -83,43 +79,43 @@ public void connect(@NonNull final Context context, @NonNull final String addres
startListening();
}

@SuppressWarnings("WeakerAccess")
public void startListening() {
//noinspection unchecked
receiverTask.executeOnExecutor(getExecutorService(), new Pair<>(this, mReceiverListener));
listenExecutor.execute(receiverTask);
}

/**
* Sends data from the socket
*
* @param data data to be sent
*/
public void write(final byte[] data) throws IOException {
if (socket == null) {
throw new IOException("Socket is not connected.");
}
OutputStream output = socket.getOutputStream();
output.write(data);
public void write(final int msgId, final byte[] data) {
writeExecutor.execute(new Runnable() {
@Override
public void run() {
try {
socket.getOutputStream().write(data);
receiverListener.onWritten(getId(), msgId, null);
} catch (IOException e) {
receiverListener.onWritten(getId(), msgId, e.toString());
receiverListener.onError(getId(), e.toString());
}
}
});
}

/**
* Shuts down the receiver task, closing the socket.
*/
public void destroy() {
try {
if (receiverTask != null && !receiverTask.isCancelled()) {
// stop the receiving task
receiverTask.cancel(true);
getExecutorService().shutdown();
}
// close the socket
if (socket != null && !socket.isClosed()) {
socket.close();
mReceiverListener.onClose(getId(), null);
receiverListener.onClose(getId(), null);
socket = null;
}
} catch (IOException e) {
mReceiverListener.onClose(getId(), e.getMessage());
receiverListener.onClose(getId(), e.getMessage());
}
}

Expand All @@ -143,4 +139,12 @@ public void setKeepAlive(final boolean enable, final int initialDelay) throws IO
// `initialDelay` is ignored
socket.setKeepAlive(enable);
}

public void pause() {
receiverTask.pause();
}

public void resume() {
receiverTask.resume();
}
}
Loading