Skip to content

Add basic SocketChannelProvider implementation #144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/main/java/org/tarantool/ReconnectingSocketProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package org.tarantool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.channels.SocketChannel;

/**
* Basic SocketChannelProvider implementation with the ability of reconnecting after failure.
* To be used with {@link TarantoolClientImpl}.
*/
public abstract class ReconnectingSocketProvider implements SocketChannelProvider {
/** Timeout to establish socket connection with an individual server. */
private int timeout; // 0 is infinite.
/** Limit of retries. */
private int retriesLimit = -1; // No-limit.

/**
* @return Maximum amount of time to wait for a socket connection establishment
* with an individual server.
*/
public int getTimeout() {
return timeout;
}

/**
* Sets maximum amount of time to wait for a socket connection establishment
* with an individual server.
*
* Zero means infinite timeout.
*
* @param timeout Timeout value, ms.
* @return {@code this}.
* @throws IllegalArgumentException If timeout is negative.
*/
public ReconnectingSocketProvider setTimeout(int timeout) {
if (timeout < 0) {
throw new IllegalArgumentException("timeout is negative");
}
this.timeout = timeout;
return this;
}

/**
* @return Maximum reconnect attempts to make before raising exception.
*/
public int getRetriesLimit() {
return retriesLimit;
}

/**
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
* The retry count is maintained by a {@link #get(int, Throwable)} caller
* when a socket level connection was established.
*
* Negative value means unlimited.
*
* @param retriesLimit Limit of retries to use.
* @return {@code this}.
*/
public ReconnectingSocketProvider setRetriesLimit(int retriesLimit) {
this.retriesLimit = retriesLimit;
return this;
}

/**
* Provides a decision on whether retries limit is hit.
*
* @param retries Current count of retries.
* @return {@code true} if retries are exhausted.
*/
private boolean areRetriesExhausted(int retries) {
int limit = getRetriesLimit();
if (limit < 0)
return false;
return retries >= limit;
}

/**
* Return a configured socket address where a Tarantool instance is listening to
* @return {@link java.net.InetSocketAddress}
*/
abstract InetSocketAddress getSocketAddress();

/** {@inheritDoc} */
@Override
public SocketChannel get(int retryNumber, Throwable lastError) {
if (areRetriesExhausted(retryNumber)) {
throw new CommunicationException("Connection retries exceeded.", lastError);
}
try (SocketChannel channel = SocketChannel.open()) {
InetSocketAddress addr = getSocketAddress();
channel.socket().connect(addr, timeout);
return channel;
} catch (SocketTimeoutException e) {
throw new CommunicationException("Connection timed out", e);
} catch (IOException e) {
throw new CommunicationException("Failed to establish a connection", e);
}
}
}
59 changes: 59 additions & 0 deletions src/main/java/org/tarantool/ReconnectingSocketProviderImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.tarantool;

import java.net.InetSocketAddress;

public class ReconnectingSocketProviderImpl extends ReconnectingSocketProvider {

private String host;
private int port;

/**
* Returns the IP address or hostname of a Tarantool server
* @return {@link java.lang.String}
*/
public String getHost() {
return host;
}

public ReconnectingSocketProviderImpl setHost(String host) {
if (host == null || host.isEmpty()) {
throw new IllegalArgumentException("Tarantool server host is empty");
}
this.host = host;
return this;
}

/**
* Returns the Tarantool server port
* @return {@code int}
*/
public int getPort() {
return port;
}

public void setPort(int port) {
if (port <= 0) {
throw new IllegalArgumentException("Tarantool server port is less or equal to 0");
}
this.port = port;
}

public ReconnectingSocketProviderImpl(String host, int port) {
if (host == null || host.isEmpty()) {
throw new IllegalArgumentException("Tarantool server host is empty");
}
if (port <= 0) {
throw new IllegalArgumentException("Tarantool server port is less or equal to 0");
}
this.host = host;
this.port = port;
}

/**
* {@inheritDoc}
*/
@Override
public InetSocketAddress getSocketAddress() {
return new InetSocketAddress(this.host, this.port);
}
}
118 changes: 8 additions & 110 deletions src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package org.tarantool;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Arrays;

/**
* Basic reconnection strategy that changes addresses in a round-robin fashion.
* To be used with {@link TarantoolClientImpl}.
*/
public class RoundRobinSocketProviderImpl implements SocketChannelProvider {
/** Timeout to establish socket connection with an individual server. */
private int timeout; // 0 is infinite.
/** Limit of retries. */
private int retriesLimit = -1; // No-limit.
public class RoundRobinSocketProviderImpl extends ReconnectingSocketProvider {
/** Server addresses as configured. */
private final String[] addrs;
/** Socket addresses. */
Expand Down Expand Up @@ -47,107 +41,24 @@ public String[] getAddresses() {
}

/**
* Sets maximum amount of time to wait for a socket connection establishment
* with an individual server.
*
* Zero means infinite timeout.
*
* @param timeout Timeout value, ms.
* @return {@code this}.
* @throws IllegalArgumentException If timeout is negative.
*/
public RoundRobinSocketProviderImpl setTimeout(int timeout) {
if (timeout < 0)
throw new IllegalArgumentException("timeout is negative.");

this.timeout = timeout;

return this;
}

/**
* @return Maximum amount of time to wait for a socket connection establishment
* with an individual server.
*/
public int getTimeout() {
return timeout;
}

/**
* Sets maximum amount of reconnect attempts to be made before an exception is raised.
* The retry count is maintained by a {@link #get(int, Throwable)} caller
* when a socket level connection was established.
*
* Negative value means unlimited.
*
* @param retriesLimit Limit of retries to use.
* @return {@code this}.
* @return Number of configured addresses.
*/
public RoundRobinSocketProviderImpl setRetriesLimit(int retriesLimit) {
this.retriesLimit = retriesLimit;

return this;
protected int getAddressCount() {
return sockAddrs.length;
}

/**
* @return Maximum reconnect attempts to make before raising exception.
* {@inheritDoc}
*/
public int getRetriesLimit() {
return retriesLimit;
}

/** {@inheritDoc} */
@Override
public SocketChannel get(int retryNumber, Throwable lastError) {
if (areRetriesExhausted(retryNumber)) {
throw new CommunicationException("Connection retries exceeded.", lastError);
}
int attempts = getAddressCount();
long deadline = System.currentTimeMillis() + timeout * attempts;
while (!Thread.currentThread().isInterrupted()) {
SocketChannel channel = null;
try {
channel = SocketChannel.open();
InetSocketAddress addr = getNextSocketAddress();
channel.socket().connect(addr, timeout);
return channel;
} catch (IOException e) {
if (channel != null) {
try {
channel.close();
} catch (IOException ignored) {
// No-op.
}
}
long now = System.currentTimeMillis();
if (deadline <= now) {
throw new CommunicationException("Connection time out.", e);
}
if (--attempts == 0) {
// Tried all addresses without any lack, but still have time.
attempts = getAddressCount();
try {
Thread.sleep((deadline - now) / attempts);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
}
}
}
throw new CommunicationException("Thread interrupted.", new InterruptedException());
}

/**
* @return Number of configured addresses.
*/
protected int getAddressCount() {
return sockAddrs.length;
public int getRetriesLimit() {
return super.getRetriesLimit() * getAddressCount();
}

/**
* @return Socket address to use for the next reconnection attempt.
*/
protected InetSocketAddress getNextSocketAddress() {
protected InetSocketAddress getSocketAddress() {
InetSocketAddress res = sockAddrs[pos];
pos = (pos + 1) % sockAddrs.length;
return res;
Expand All @@ -166,17 +77,4 @@ protected InetSocketAddress parseAddress(String addr) {
int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1));
return new InetSocketAddress(host, port);
}

/**
* Provides a decision on whether retries limit is hit.
*
* @param retries Current count of retries.
* @return {@code true} if retries are exhausted.
*/
private boolean areRetriesExhausted(int retries) {
int limit = getRetriesLimit();
if (limit < 0)
return false;
return retries >= limit;
}
}