Skip to content

Commit 6e93879

Browse files
committed
Propagate a client thumbstone when a client creation fails
Closes: #30
1 parent 08e37a2 commit 6e93879

File tree

5 files changed

+242
-29
lines changed

5 files changed

+242
-29
lines changed

src/main/java/org/tarantool/TarantoolClientImpl.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@
2828

2929
public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {
3030

31-
public static final CommunicationException NOT_INIT_EXCEPTION
32-
= new CommunicationException("Not connected, initializing connection");
33-
3431
protected TarantoolClientConfig config;
3532
protected long operationTimeout;
3633

@@ -101,7 +98,6 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
10198
}
10299

103100
private void initClient(SocketChannelProvider socketProvider, TarantoolClientConfig config) {
104-
this.thumbstone = NOT_INIT_EXCEPTION;
105101
this.config = config;
106102
this.initialRequestSize = config.defaultRequestSize;
107103
this.operationTimeout = config.operationExpiryTimeMillis;
@@ -130,8 +126,8 @@ private void startConnector(long initTimeoutMillis) {
130126
CommunicationException e = new CommunicationException(
131127
initTimeoutMillis +
132128
"ms is exceeded when waiting for client initialization. " +
133-
"You could configure init timeout in TarantoolConfig"
134-
);
129+
"You could configure init timeout in TarantoolConfig",
130+
thumbstone);
135131

136132
close(e);
137133
throw e;
@@ -147,7 +143,7 @@ protected void reconnect(Throwable lastError) {
147143
int retryNumber = 0;
148144
while (!Thread.currentThread().isInterrupted()) {
149145
try {
150-
channel = socketProvider.get(retryNumber++, lastError == NOT_INIT_EXCEPTION ? null : lastError);
146+
channel = socketProvider.get(retryNumber++, lastError);
151147
} catch (Exception e) {
152148
closeChannel(channel);
153149
lastError = e;

src/main/java/org/tarantool/protocol/ReadableViaSelectorChannel.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ public int read(ByteBuffer buffer) throws IOException {
3737
count = n = channel.read(buffer);
3838

3939
if (n < 0) {
40-
throw new CommunicationException("Channel read failed " + n);
40+
throw new CommunicationException("Channel read failed " + formatReadBytes(n));
4141
}
4242

4343
while (buffer.remaining() > 0) {
4444
selector.select();
4545
n = channel.read(buffer);
4646
if (n < 0) {
47-
throw new CommunicationException("Channel read failed: " + n);
47+
throw new CommunicationException("Channel read failed: " + formatReadBytes(n));
4848
}
4949
count += n;
5050
}
@@ -61,4 +61,15 @@ public void close() throws IOException {
6161
selector.close();
6262
channel.close();
6363
}
64+
65+
/**
66+
* Formats the bytes count to a human readable message.
67+
*
68+
* @param bytes number of bytes
69+
*
70+
* @return formatted message
71+
*/
72+
private String formatReadBytes(int bytes) {
73+
return bytes < 0 ? "EOF" : bytes + " bytes";
74+
}
6475
}

src/test/java/org/tarantool/ClientReconnectClusterIT.java

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,11 @@
11
package org.tarantool;
22

3-
import static org.junit.jupiter.api.Assertions.assertEquals;
4-
import static org.junit.jupiter.api.Assertions.assertThrows;
5-
import static org.tarantool.TestUtils.makeDefaultClusterClientConfig;
6-
import static org.tarantool.TestUtils.makeDiscoveryFunction;
7-
8-
import org.tarantool.cluster.ClusterServiceStoredFunctionDiscovererIT;
9-
103
import org.junit.jupiter.api.AfterEach;
114
import org.junit.jupiter.api.BeforeAll;
125
import org.junit.jupiter.api.BeforeEach;
136
import org.junit.jupiter.api.DisplayName;
147
import org.junit.jupiter.api.Test;
8+
import org.tarantool.cluster.ClusterServiceStoredFunctionDiscovererIT;
159

1610
import java.time.Duration;
1711
import java.util.Arrays;
@@ -26,6 +20,13 @@
2620
import java.util.stream.Collectors;
2721
import java.util.stream.Stream;
2822

23+
import static org.junit.jupiter.api.Assertions.assertEquals;
24+
import static org.junit.jupiter.api.Assertions.assertThrows;
25+
import static org.junit.jupiter.api.Assertions.assertTrue;
26+
import static org.tarantool.TestUtils.findCause;
27+
import static org.tarantool.TestUtils.makeDefaultClusterClientConfig;
28+
import static org.tarantool.TestUtils.makeDiscoveryFunction;
29+
2930
@DisplayName("A cluster client")
3031
public class ClientReconnectClusterIT {
3132

@@ -393,6 +394,53 @@ void testDelayFunctionResultFetch() {
393394
expectDisconnected(client, spaceId, pkId);
394395
}
395396

397+
@Test
398+
void testRoundRobinSocketProviderRefusedByFakeReason() {
399+
stopInstancesAndAwait(SRV1);
400+
stopInstancesAndAwait(SRV2);
401+
stopInstancesAndAwait(SRV3);
402+
403+
RuntimeException error = new RuntimeException("Fake error");
404+
TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
405+
config.initTimeoutMillis = 1000;
406+
Throwable exception = assertThrows(
407+
CommunicationException.class,
408+
() -> {
409+
new TarantoolClusterClient(
410+
config,
411+
TestUtils.wrapByErroredProvider(new RoundRobinSocketProviderImpl(
412+
"localhost:" + PORTS[0],
413+
"localhost:" + PORTS[1],
414+
"localhost:" + PORTS[2]
415+
), error)
416+
);
417+
}
418+
);
419+
assertTrue(findCause(exception, error));
420+
}
421+
422+
@Test
423+
void testRoundRobinSocketProviderRefusedAfterConnect() {
424+
final TarantoolClientImpl client = makeClusterClient(
425+
"localhost:" + PORTS[0],
426+
"localhost:" + PORTS[1],
427+
"localhost:" + PORTS[2]
428+
);
429+
430+
client.ping();
431+
stopInstancesAndAwait(SRV1);
432+
433+
client.ping();
434+
stopInstancesAndAwait(SRV2);
435+
436+
client.ping();
437+
stopInstancesAndAwait(SRV3);
438+
439+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
440+
Throwable origin = exception.getCause();
441+
assertEquals(origin, client.getThumbstone());
442+
}
443+
396444
private void tryAwait(CyclicBarrier barrier) {
397445
try {
398446
barrier.await(6000, TimeUnit.MILLISECONDS);

src/test/java/org/tarantool/ClientReconnectIT.java

Lines changed: 106 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,12 @@
11
package org.tarantool;
22

3-
import static org.junit.jupiter.api.Assertions.assertEquals;
4-
import static org.junit.jupiter.api.Assertions.assertFalse;
5-
import static org.junit.jupiter.api.Assertions.assertNotNull;
6-
import static org.junit.jupiter.api.Assertions.assertThrows;
7-
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
8-
import static org.junit.jupiter.api.Assertions.assertTrue;
9-
import static org.junit.jupiter.api.Assertions.fail;
10-
import static org.tarantool.TestUtils.makeDefaultClientConfig;
11-
import static org.tarantool.TestUtils.makeTestClient;
12-
133
import org.junit.jupiter.api.AfterEach;
144
import org.junit.jupiter.api.BeforeAll;
155
import org.junit.jupiter.api.BeforeEach;
166
import org.junit.jupiter.api.Test;
177
import org.junit.jupiter.api.function.Executable;
188

9+
import java.net.ConnectException;
1910
import java.nio.channels.SocketChannel;
2011
import java.time.Duration;
2112
import java.util.Collections;
@@ -27,9 +18,21 @@
2718
import java.util.concurrent.TimeUnit;
2819
import java.util.concurrent.atomic.AtomicBoolean;
2920
import java.util.concurrent.atomic.AtomicInteger;
21+
import java.util.concurrent.atomic.AtomicReference;
3022
import java.util.concurrent.atomic.AtomicReferenceArray;
3123
import java.util.concurrent.locks.LockSupport;
3224

25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertFalse;
27+
import static org.junit.jupiter.api.Assertions.assertNotNull;
28+
import static org.junit.jupiter.api.Assertions.assertThrows;
29+
import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
31+
import static org.junit.jupiter.api.Assertions.fail;
32+
import static org.tarantool.TestUtils.findCause;
33+
import static org.tarantool.TestUtils.makeDefaultClientConfig;
34+
import static org.tarantool.TestUtils.makeTestClient;
35+
3336
public class ClientReconnectIT {
3437

3538
private static final int RESTART_TIMEOUT = 2000;
@@ -377,7 +380,7 @@ public void run() {
377380
/**
378381
* Verify that we don't exceed a file descriptor limit (and so likely don't
379382
* leak file descriptors) when trying to connect to an existing node with
380-
* wrong authentification credentials.
383+
* wrong authentication credentials.
381384
* <p>
382385
* The test sets SO_LINGER to 0 for outgoing connections to avoid producing
383386
* many TIME_WAIT sockets, because an available port range can be
@@ -412,13 +415,103 @@ public void testReconnectWrongAuth() throws Exception {
412415
client.close();
413416
}
414417

415-
private TestSocketChannelProvider makeZeroLingerProvider() {
418+
@Test
419+
void testFirstConnectionRefused() {
420+
RuntimeException error = new RuntimeException("Fake error");
421+
TarantoolClientConfig config = makeDefaultClientConfig();
422+
config.initTimeoutMillis = 100;
423+
Throwable exception = assertThrows(
424+
CommunicationException.class,
425+
() -> new TarantoolClientImpl(makeErroredProvider(error), config)
426+
);
427+
assertTrue(findCause(exception, error));
428+
}
429+
430+
@Test
431+
void testConnectionRefusedAfterConnect() {
432+
TarantoolClientImpl client = new TarantoolClientImpl(makeErroredProvider(null), makeDefaultClientConfig());
433+
client.ping();
434+
435+
testHelper.stopInstance();
436+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
437+
438+
Throwable origin = exception.getCause();
439+
assertEquals(origin, client.getThumbstone());
440+
441+
testHelper.startInstance();
442+
}
443+
444+
@Test
445+
void testSocketProviderRefusedByFakeReason() {
446+
TarantoolClientConfig config = makeDefaultClientConfig();
447+
RuntimeException error = new RuntimeException("Fake error");
448+
config.initTimeoutMillis = 1000;
449+
450+
SingleSocketChannelProviderImpl socketProvider = new SingleSocketChannelProviderImpl("localhost:3301");
451+
452+
testHelper.stopInstance();
453+
Throwable exception = assertThrows(
454+
CommunicationException.class,
455+
() -> new TarantoolClientImpl(TestUtils.wrapByErroredProvider(socketProvider, error), config)
456+
);
457+
testHelper.startInstance();
458+
assertTrue(findCause(exception, error));
459+
}
460+
461+
@Test
462+
void testSingleSocketProviderRefused() {
463+
testHelper.stopInstance();
464+
465+
TarantoolClientConfig config = makeDefaultClientConfig();
466+
config.initTimeoutMillis = 1000;
467+
468+
SingleSocketChannelProviderImpl socketProvider = new SingleSocketChannelProviderImpl("localhost:3301");
469+
470+
Throwable exception = assertThrows(
471+
CommunicationException.class,
472+
() -> new TarantoolClientImpl(socketProvider, config)
473+
);
474+
testHelper.startInstance();
475+
assertTrue(findCause(exception, ConnectException.class));
476+
}
477+
478+
@Test
479+
void testSingleSocketProviderRefusedAfterConnect() {
480+
TarantoolClientImpl client = new TarantoolClientImpl(socketChannelProvider, makeDefaultClientConfig());
481+
482+
client.ping();
483+
testHelper.stopInstance();
484+
485+
CommunicationException exception = assertThrows(CommunicationException.class, client::ping);
486+
Throwable origin = exception.getCause();
487+
assertEquals(origin, client.getThumbstone());
488+
489+
testHelper.startInstance();
490+
}
491+
492+
private SocketChannelProvider makeZeroLingerProvider() {
416493
return new TestSocketChannelProvider(
417494
TarantoolTestHelper.HOST, TarantoolTestHelper.PORT, RESTART_TIMEOUT
418495
).setSoLinger(0);
419496
}
420497

421-
TarantoolClient makeClient(SocketChannelProvider provider) {
498+
private SocketChannelProvider makeErroredProvider(RuntimeException error) {
499+
return new SocketChannelProvider() {
500+
private final SocketChannelProvider delegate = makeZeroLingerProvider();
501+
private AtomicReference<RuntimeException> errorReference = new AtomicReference<>(error);
502+
503+
@Override
504+
public SocketChannel get(int retryNumber, Throwable lastError) {
505+
RuntimeException rawError = errorReference.get();
506+
if (rawError != null) {
507+
throw rawError;
508+
}
509+
return delegate.get(retryNumber, lastError);
510+
}
511+
};
512+
}
513+
514+
private TarantoolClient makeClient(SocketChannelProvider provider) {
422515
return new TarantoolClientImpl(provider, makeDefaultClientConfig());
423516
}
424517

src/test/java/org/tarantool/TestUtils.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.net.InetSocketAddress;
66
import java.net.Socket;
77
import java.net.SocketAddress;
8+
import java.nio.channels.SocketChannel;
89
import java.util.Collection;
910
import java.util.HashMap;
1011
import java.util.List;
@@ -290,4 +291,68 @@ public static TarantoolClientConfig makeDefaultClientConfig() {
290291
return config;
291292
}
292293

294+
/**
295+
* Wraps a socket channel provider
296+
* {@link SocketChannelProvider#get(int, Throwable)} method.
297+
* When an error is raised the wrapper substitutes
298+
* this error by the predefined one. The original value is
299+
* still accessible as a cause of the injected error.
300+
*
301+
* @param provider provider to be wrapped
302+
* @param error error to be thrown instead of original
303+
*
304+
* @return wrapped provider
305+
*/
306+
public static SocketChannelProvider wrapByErroredProvider(SocketChannelProvider provider, RuntimeException error) {
307+
return new SocketChannelProvider() {
308+
private final SocketChannelProvider delegate = provider;
309+
310+
@Override
311+
public SocketChannel get(int retryNumber, Throwable lastError) {
312+
try {
313+
return delegate.get(retryNumber, lastError);
314+
} catch (Exception e) {
315+
error.initCause(e);
316+
throw error;
317+
}
318+
}
319+
};
320+
}
321+
322+
/**
323+
* Searches recursively the given cause for a root error.
324+
*
325+
* @param error root error
326+
* @param cause cause to be found
327+
*
328+
* @return {@literal true} if cause is found within a cause chain
329+
*/
330+
public static boolean findCause(Throwable error, Throwable cause) {
331+
while (error.getCause() != null) {
332+
error = error.getCause();
333+
if (cause.equals(error)) {
334+
return true;
335+
}
336+
}
337+
return false;
338+
}
339+
340+
/**
341+
* Searches recursively the first cause being the given class type.
342+
*
343+
* @param error root error
344+
* @param type cause class to be found
345+
*
346+
* @return {@literal true} if cause is found within a cause chain
347+
*/
348+
public static boolean findCause(Throwable error, Class<?> type) {
349+
while (error.getCause() != null) {
350+
error = error.getCause();
351+
if (type == error.getClass()) {
352+
return true;
353+
}
354+
}
355+
return false;
356+
}
357+
293358
}

0 commit comments

Comments
 (0)