Skip to content

Commit b4f1d6d

Browse files
committed
Propagate a client thumbstone when a client creation fails
Closes: #30
1 parent 2b32e80 commit b4f1d6d

File tree

6 files changed

+251
-16
lines changed

6 files changed

+251
-16
lines changed

src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ protected InetSocketAddress getLastObtainedAddress() {
140140
@Override
141141
protected SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws IOException {
142142
if (retryNumber > getAddressCount()) {
143-
throwFatalError("No more connection addresses are left.");
143+
throwFatalError("No more connection addresses are left.", lastError);
144144
}
145145

146146
int retriesLimit = getRetriesLimit();
@@ -165,7 +165,7 @@ protected SocketChannel makeAttempt(int retryNumber, Throwable lastError) throws
165165
@Override
166166
public void setRetriesLimit(int retriesLimit) {
167167
if (retriesLimit == 0) {
168-
throwFatalError("Retries count should be at least 1 or more");
168+
throwFatalError("Retries count should be at least 1 or more", null);
169169
}
170170
super.setRetriesLimit(retriesLimit);
171171
}
@@ -212,8 +212,8 @@ public void refreshAddresses(Collection<String> addresses) {
212212
updateAddressList(addresses);
213213
}
214214

215-
private void throwFatalError(String message) {
216-
throw new CommunicationException(message);
215+
private void throwFatalError(String message, Throwable lastError) {
216+
throw new CommunicationException(message, lastError);
217217
}
218218

219219
}

src/main/java/org/tarantool/TarantoolClientImpl.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828

2929
public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {
3030

31-
public static final CommunicationException NOT_INIT_EXCEPTION
32-
= new CommunicationException("Not connected, initializing connection");
33-
3431
protected TarantoolClientConfig config;
3532
protected long operationTimeout;
3633

@@ -101,7 +98,6 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
10198
}
10299

103100
private void initClient(SocketChannelProvider socketProvider, TarantoolClientConfig config) {
104-
this.thumbstone = NOT_INIT_EXCEPTION;
105101
this.config = config;
106102
this.initialRequestSize = config.defaultRequestSize;
107103
this.operationTimeout = config.operationExpiryTimeMillis;
@@ -130,8 +126,8 @@ private void startConnector(long initTimeoutMillis) {
130126
CommunicationException e = new CommunicationException(
131127
initTimeoutMillis +
132128
"ms is exceeded when waiting for client initialization. " +
133-
"You could configure init timeout in TarantoolConfig"
134-
);
129+
"You could configure init timeout in TarantoolConfig",
130+
thumbstone);
135131

136132
close(e);
137133
throw e;
@@ -147,7 +143,7 @@ protected void reconnect(Throwable lastError) {
147143
int retryNumber = 0;
148144
while (!Thread.currentThread().isInterrupted()) {
149145
try {
150-
channel = socketProvider.get(retryNumber++, lastError == NOT_INIT_EXCEPTION ? null : lastError);
146+
channel = socketProvider.get(retryNumber++, lastError);
151147
} catch (Exception e) {
152148
closeChannel(channel);
153149
lastError = e;

src/main/java/org/tarantool/protocol/ReadableViaSelectorChannel.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ public int read(ByteBuffer buffer) throws IOException {
3737
count = n = channel.read(buffer);
3838

3939
if (n < 0) {
40-
throw new CommunicationException("Channel read failed " + n);
40+
throw new CommunicationException("Channel read failed " + formatReadBytes(n));
4141
}
4242

4343
while (buffer.remaining() > 0) {
4444
selector.select();
4545
n = channel.read(buffer);
4646
if (n < 0) {
47-
throw new CommunicationException("Channel read failed: " + n);
47+
throw new CommunicationException("Channel read failed: " + formatReadBytes(n));
4848
}
4949
count += n;
5050
}
@@ -61,4 +61,15 @@ public void close() throws IOException {
6161
selector.close();
6262
channel.close();
6363
}
64+
65+
/**
66+
* Formats the bytes count to a human readable message.
67+
*
68+
* @param bytes number of bytes
69+
*
70+
* @return formatted message
71+
*/
72+
private String formatReadBytes(int bytes) {
73+
return bytes < 0 ? "EOF" : bytes + " bytes";
74+
}
6475
}

src/test/java/org/tarantool/ClientReconnectClusterIT.java

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
44
import static org.junit.jupiter.api.Assertions.assertThrows;
5+
import static org.junit.jupiter.api.Assertions.assertTrue;
6+
import static org.tarantool.TestUtils.findCause;
57
import static org.tarantool.TestUtils.makeDefaultClusterClientConfig;
68
import static org.tarantool.TestUtils.makeDiscoveryFunction;
79

@@ -13,6 +15,7 @@
1315
import org.junit.jupiter.api.DisplayName;
1416
import org.junit.jupiter.api.Test;
1517

18+
import java.net.ConnectException;
1619
import java.time.Duration;
1720
import java.util.Arrays;
1821
import java.util.Collections;
@@ -393,6 +396,73 @@ void testDelayFunctionResultFetch() {
393396
expectDisconnected(client, spaceId, pkId);
394397
}
395398

399+
@Test
400+
void testRoundRobinSocketProviderRefusedByFakeReason() {
401+
stopInstancesAndAwait(SRV1);
402+
stopInstancesAndAwait(SRV2);
403+
stopInstancesAndAwait(SRV3);
404+
405+
RuntimeException error = new RuntimeException("Fake error");
406+
TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
407+
config.initTimeoutMillis = 1000;
408+
Throwable exception = assertThrows(
409+
CommunicationException.class,
410+
() -> {
411+
new TarantoolClusterClient(
412+
config,
413+
TestUtils.wrapByErroredProvider(new RoundRobinSocketProviderImpl(
414+
"localhost:" + PORTS[0],
415+
"localhost:" + PORTS[1],
416+
"localhost:" + PORTS[2]
417+
), error)
418+
);
419+
}
420+
);
421+
assertTrue(findCause(exception, error));
422+
}
423+
424+
@Test
425+
void testRoundRobinSocketProviderRefused() {
426+
stopInstancesAndAwait(SRV1);
427+
stopInstancesAndAwait(SRV2);
428+
stopInstancesAndAwait(SRV3);
429+
430+
TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
431+
config.initTimeoutMillis = 1000;
432+
Throwable exception = assertThrows(
433+
CommunicationException.class,
434+
() -> {
435+
new TarantoolClusterClient(
436+
config,
437+
new RoundRobinSocketProviderImpl("localhost:" + PORTS[0])
438+
);
439+
}
440+
);
441+
assertTrue(findCause(exception, ConnectException.class));
442+
}
443+
444+
@Test
445+
void testRoundRobinSocketProviderRefusedAfterConnect() {
446+
final TarantoolClientImpl client = makeClusterClient(
447+
"localhost:" + PORTS[0],
448+
"localhost:" + PORTS[1],
449+
"localhost:" + PORTS[2]
450+
);
451+
452+
client.ping();
453+
stopInstancesAndAwait(SRV1);
454+
455+
client.ping();
456+
stopInstancesAndAwait(SRV2);
457+
458+
client.ping();
459+
stopInstancesAndAwait(SRV3);
460+
461+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
462+
Throwable origin = exception.getCause();
463+
assertEquals(origin, client.getThumbstone());
464+
}
465+
396466
private void tryAwait(CyclicBarrier barrier) {
397467
try {
398468
barrier.await(6000, TimeUnit.MILLISECONDS);

src/test/java/org/tarantool/ClientReconnectIT.java

Lines changed: 96 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
88
import static org.junit.jupiter.api.Assertions.assertTrue;
99
import static org.junit.jupiter.api.Assertions.fail;
10+
import static org.tarantool.TestUtils.findCause;
1011
import static org.tarantool.TestUtils.makeDefaultClientConfig;
1112
import static org.tarantool.TestUtils.makeTestClient;
1213

@@ -16,6 +17,7 @@
1617
import org.junit.jupiter.api.Test;
1718
import org.junit.jupiter.api.function.Executable;
1819

20+
import java.net.ConnectException;
1921
import java.nio.channels.SocketChannel;
2022
import java.time.Duration;
2123
import java.util.Collections;
@@ -27,6 +29,7 @@
2729
import java.util.concurrent.TimeUnit;
2830
import java.util.concurrent.atomic.AtomicBoolean;
2931
import java.util.concurrent.atomic.AtomicInteger;
32+
import java.util.concurrent.atomic.AtomicReference;
3033
import java.util.concurrent.atomic.AtomicReferenceArray;
3134
import java.util.concurrent.locks.LockSupport;
3235

@@ -377,7 +380,7 @@ public void run() {
377380
/**
378381
* Verify that we don't exceed a file descriptor limit (and so likely don't
379382
* leak file descriptors) when trying to connect to an existing node with
380-
* wrong authentification credentials.
383+
* wrong authentication credentials.
381384
* <p>
382385
* The test sets SO_LINGER to 0 for outgoing connections to avoid producing
383386
* many TIME_WAIT sockets, because an available port range can be
@@ -412,13 +415,103 @@ public void testReconnectWrongAuth() throws Exception {
412415
client.close();
413416
}
414417

415-
private TestSocketChannelProvider makeZeroLingerProvider() {
418+
@Test
419+
void testFirstConnectionRefused() {
420+
RuntimeException error = new RuntimeException("Fake error");
421+
TarantoolClientConfig config = makeDefaultClientConfig();
422+
config.initTimeoutMillis = 100;
423+
Throwable exception = assertThrows(
424+
CommunicationException.class,
425+
() -> new TarantoolClientImpl(makeErroredProvider(error), config)
426+
);
427+
assertTrue(findCause(exception, error));
428+
}
429+
430+
@Test
431+
void testConnectionRefusedAfterConnect() {
432+
TarantoolClientImpl client = new TarantoolClientImpl(makeErroredProvider(null), makeDefaultClientConfig());
433+
client.ping();
434+
435+
testHelper.stopInstance();
436+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
437+
438+
Throwable origin = exception.getCause();
439+
assertEquals(origin, client.getThumbstone());
440+
441+
testHelper.startInstance();
442+
}
443+
444+
@Test
445+
void testSocketProviderRefusedByFakeReason() {
446+
TarantoolClientConfig config = makeDefaultClientConfig();
447+
RuntimeException error = new RuntimeException("Fake error");
448+
config.initTimeoutMillis = 1000;
449+
450+
SingleSocketChannelProviderImpl socketProvider = new SingleSocketChannelProviderImpl("localhost:3301");
451+
452+
testHelper.stopInstance();
453+
Throwable exception = assertThrows(
454+
CommunicationException.class,
455+
() -> new TarantoolClientImpl(TestUtils.wrapByErroredProvider(socketProvider, error), config)
456+
);
457+
testHelper.startInstance();
458+
assertTrue(findCause(exception, error));
459+
}
460+
461+
@Test
462+
void testSingleSocketProviderRefused() {
463+
testHelper.stopInstance();
464+
465+
TarantoolClientConfig config = makeDefaultClientConfig();
466+
config.initTimeoutMillis = 1000;
467+
468+
SingleSocketChannelProviderImpl socketProvider = new SingleSocketChannelProviderImpl("localhost:3301");
469+
470+
Throwable exception = assertThrows(
471+
CommunicationException.class,
472+
() -> new TarantoolClientImpl(socketProvider, config)
473+
);
474+
testHelper.startInstance();
475+
assertTrue(findCause(exception, ConnectException.class));
476+
}
477+
478+
@Test
479+
void testSingleSocketProviderRefusedAfterConnect() {
480+
TarantoolClientImpl client = new TarantoolClientImpl(socketChannelProvider, makeDefaultClientConfig());
481+
482+
client.ping();
483+
testHelper.stopInstance();
484+
485+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
486+
Throwable origin = exception.getCause();
487+
assertEquals(origin, client.getThumbstone());
488+
489+
testHelper.startInstance();
490+
}
491+
492+
private SocketChannelProvider makeZeroLingerProvider() {
416493
return new TestSocketChannelProvider(
417494
TarantoolTestHelper.HOST, TarantoolTestHelper.PORT, RESTART_TIMEOUT
418495
).setSoLinger(0);
419496
}
420497

421-
TarantoolClient makeClient(SocketChannelProvider provider) {
498+
private SocketChannelProvider makeErroredProvider(RuntimeException error) {
499+
return new SocketChannelProvider() {
500+
private final SocketChannelProvider delegate = makeZeroLingerProvider();
501+
private AtomicReference<RuntimeException> errorReference = new AtomicReference<>(error);
502+
503+
@Override
504+
public SocketChannel get(int retryNumber, Throwable lastError) {
505+
RuntimeException rawError = errorReference.get();
506+
if (rawError != null) {
507+
throw rawError;
508+
}
509+
return delegate.get(retryNumber, lastError);
510+
}
511+
};
512+
}
513+
514+
private TarantoolClient makeClient(SocketChannelProvider provider) {
422515
return new TarantoolClientImpl(provider, makeDefaultClientConfig());
423516
}
424517

0 commit comments

Comments
 (0)