Skip to content

Commit ff7bb60

Browse files
committed
Logging of client reconnects and transient errors
Add logging waring messages when client tries establishing connection to Tarantool instance. All the request that cannot be processed because of some transient errors are also traced. Disable JUL logging for test classes by default. Follows on: #194
1 parent f8ee727 commit ff7bb60

9 files changed

+151
-32
lines changed

pom.xml

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23
<modelVersion>4.0.0</modelVersion>
34

45
<groupId>org.tarantool</groupId>
@@ -12,6 +13,7 @@
1213
<mockito.version>1.10.19</mockito.version>
1314
<!-- logger dependency versions -->
1415
<sfl4j.version>1.7.26</sfl4j.version>
16+
<project.testResources>${project.basedir}/src/test/resources</project.testResources>
1517
</properties>
1618
<name>Tarantool Connector for Java</name>
1719
<url>https://github.com/tarantool/tarantool-java</url>
@@ -73,17 +75,24 @@
7375
<artifactId>templating-maven-plugin</artifactId>
7476
<version>1.0.0</version>
7577
<executions>
76-
<execution>
77-
<id>filter-src</id>
78-
<goals>
79-
<goal>filter-sources</goal>
80-
</goals>
81-
</execution>
78+
<execution>
79+
<id>filter-src</id>
80+
<goals>
81+
<goal>filter-sources</goal>
82+
</goals>
83+
</execution>
8284
</executions>
8385
</plugin>
8486
<plugin>
8587
<artifactId>maven-surefire-plugin</artifactId>
8688
<version>3.0.0-M3</version>
89+
<configuration>
90+
<systemPropertyVariables>
91+
<java.util.logging.config.file>
92+
${project.testResources}/jul-silent.properties
93+
</java.util.logging.config.file>
94+
</systemPropertyVariables>
95+
</configuration>
8796
</plugin>
8897
<plugin>
8998
<groupId>org.apache.maven.plugins</groupId>
@@ -99,6 +108,12 @@
99108
</executions>
100109
<configuration>
101110
<trimStackTrace>false</trimStackTrace>
111+
<systemPropertyVariables>
112+
<java.util.logging.config.file>
113+
${project.testResources}/jul-silent.properties
114+
</java.util.logging.config.file>
115+
</systemPropertyVariables>
116+
102117
</configuration>
103118
</plugin>
104119
<plugin>

src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.concurrent.locks.Lock;
1414
import java.util.concurrent.locks.ReadWriteLock;
1515
import java.util.concurrent.locks.ReentrantReadWriteLock;
16+
import java.util.function.Supplier;
1617
import java.util.stream.Collectors;
1718

1819
/**
@@ -93,13 +94,7 @@ private void updateAddressList(Collection<String> addresses) {
9394
* @return socket addresses
9495
*/
9596
public List<SocketAddress> getAddresses() {
96-
Lock readLock = addressListLock.readLock();
97-
readLock.lock();
98-
try {
99-
return Collections.unmodifiableList(this.socketAddresses);
100-
} finally {
101-
readLock.unlock();
102-
}
97+
return readGuard(() -> Collections.unmodifiableList(this.socketAddresses));
10398
}
10499

105100
/**
@@ -109,14 +104,10 @@ public List<SocketAddress> getAddresses() {
109104
* if {@link #currentPosition} has {@link #UNSET_POSITION} value
110105
*/
111106
protected InetSocketAddress getLastObtainedAddress() {
112-
Lock readLock = addressListLock.readLock();
113-
readLock.lock();
114-
try {
107+
return readGuard(() -> {
115108
int index = currentPosition.get();
116109
return index != UNSET_POSITION ? socketAddresses.get(index) : null;
117-
} finally {
118-
readLock.unlock();
119-
}
110+
});
120111
}
121112

122113
/**
@@ -176,13 +167,7 @@ public void setRetriesLimit(int retriesLimit) {
176167
* @return Number of configured addresses.
177168
*/
178169
protected int getAddressCount() {
179-
Lock readLock = addressListLock.readLock();
180-
readLock.lock();
181-
try {
182-
return socketAddresses.size();
183-
} finally {
184-
readLock.unlock();
185-
}
170+
return readGuard(socketAddresses::size);
186171
}
187172

188173
/**
@@ -191,11 +176,25 @@ protected int getAddressCount() {
191176
* @return Socket address to use for the next reconnection attempt
192177
*/
193178
protected InetSocketAddress getNextSocketAddress() {
179+
return readGuard(() -> {
180+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
181+
return socketAddresses.get(position);
182+
});
183+
}
184+
185+
@Override
186+
public SocketAddress getAddress() {
187+
return readGuard(() -> {
188+
int position = (currentPosition.get() + 1) % socketAddresses.size();
189+
return socketAddresses.get(position);
190+
});
191+
}
192+
193+
private <R> R readGuard(Supplier<R> supplier) {
194194
Lock readLock = addressListLock.readLock();
195195
readLock.lock();
196196
try {
197-
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
198-
return socketAddresses.get(position);
197+
return supplier.get();
199198
} finally {
200199
readLock.unlock();
201200
}

src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public SingleSocketChannelProviderImpl(String address) {
2424
setAddress(address);
2525
}
2626

27+
@Override
2728
public SocketAddress getAddress() {
2829
return address;
2930
}

src/main/java/org/tarantool/SocketChannelProvider.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,15 @@ public interface SocketChannelProvider {
1919
*/
2020
SocketChannel get(int retryNumber, Throwable lastError);
2121

22+
/**
23+
* Gets an address that will be used when
24+
* {@link #get(int, Throwable)} is invoked.
25+
*
26+
* @return effective address or {@literal null}
27+
* if it cannot be calculated in advance.
28+
*/
29+
default SocketAddress getAddress() {
30+
return null;
31+
}
32+
2233
}

src/main/java/org/tarantool/TarantoolClientImpl.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
11
package org.tarantool;
22

3+
import org.tarantool.logging.Logger;
4+
import org.tarantool.logging.LoggerFactory;
35
import org.tarantool.protocol.ProtoUtils;
46
import org.tarantool.protocol.ReadableViaSelectorChannel;
57
import org.tarantool.protocol.TarantoolGreeting;
68
import org.tarantool.protocol.TarantoolPacket;
9+
import org.tarantool.util.StringUtils;
710

811
import java.io.IOException;
12+
import java.net.SocketAddress;
913
import java.nio.ByteBuffer;
1014
import java.nio.channels.SocketChannel;
1115
import java.util.Iterator;
@@ -28,6 +32,8 @@
2832

2933
public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {
3034

35+
private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClientImpl.class);
36+
3137
protected TarantoolClientConfig config;
3238
protected long operationTimeout;
3339

@@ -143,6 +149,15 @@ protected void reconnect(Throwable lastError) {
143149
int retryNumber = 0;
144150
while (!Thread.currentThread().isInterrupted()) {
145151
try {
152+
if (lastError != null) {
153+
LOGGER.warn(() -> {
154+
SocketAddress address = socketProvider.getAddress();
155+
return "Attempt to (re-)connect to Tarantool instance " +
156+
(StringUtils.isBlank(config.username) ? "" : config.username + ":*****@") +
157+
(address == null ? "unknown" : address);
158+
}, lastError
159+
);
160+
}
146161
channel = socketProvider.get(retryNumber++, lastError);
147162
} catch (Exception e) {
148163
closeChannel(channel);
@@ -710,6 +725,14 @@ public Object[] getArgs() {
710725
return args;
711726
}
712727

728+
@Override
729+
public String toString() {
730+
return "TarantoolOp{" +
731+
"id=" + id +
732+
", code=" + code +
733+
'}';
734+
}
735+
713736
/**
714737
* Missed in jdk8 CompletableFuture operator to limit execution
715738
* by time.

src/main/java/org/tarantool/TarantoolClusterClient.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import org.tarantool.cluster.TarantoolClusterDiscoverer;
44
import org.tarantool.cluster.TarantoolClusterStoredFunctionDiscoverer;
5+
import org.tarantool.logging.Logger;
6+
import org.tarantool.logging.LoggerFactory;
57
import org.tarantool.protocol.TarantoolPacket;
68
import org.tarantool.util.StringUtils;
79

@@ -27,6 +29,8 @@
2729
*/
2830
public class TarantoolClusterClient extends TarantoolClientImpl {
2931

32+
private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClusterClient.class);
33+
3034
/**
3135
* Need some execution context to retry writes.
3236
*/
@@ -153,6 +157,7 @@ protected boolean checkFail(TarantoolOp<?> future, Exception e) {
153157
} else {
154158
assert retries != null;
155159
retries.put(future.getId(), future);
160+
LOGGER.trace("Request {0} was delayed because of {1}", future, e);
156161
return false;
157162
}
158163
}
@@ -194,13 +199,18 @@ protected void onReconnect() {
194199
// First call is before the constructor finished. Skip it.
195200
return;
196201
}
197-
Collection<TarantoolOp<?>> futuresToRetry = new ArrayList<>(retries.values());
202+
Collection<TarantoolOp<?>> delayed = new ArrayList<>(retries.values());
203+
Collection<TarantoolOp<?>> reissued = new ArrayList<>(retries.size());
198204
retries.clear();
199-
for (final TarantoolOp<?> future : futuresToRetry) {
205+
for (final TarantoolOp<?> future : delayed) {
200206
if (!future.isDone()) {
201207
executor.execute(() -> registerOperation(future));
208+
reissued.add(future);
202209
}
203210
}
211+
for (final TarantoolOp<?> future : reissued) {
212+
LOGGER.trace("{0} was re-issued after reconnection", future);
213+
}
204214
}
205215

206216
@Override

src/test/java/org/tarantool/ClientReconnectClusterIT.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,16 @@
1717

1818
import java.net.ConnectException;
1919
import java.time.Duration;
20+
import java.util.ArrayList;
2021
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.HashMap;
2324
import java.util.List;
2425
import java.util.Map;
2526
import java.util.Set;
2627
import java.util.concurrent.CyclicBarrier;
28+
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.Future;
2730
import java.util.concurrent.TimeUnit;
2831
import java.util.function.Consumer;
2932
import java.util.stream.Collectors;
@@ -84,6 +87,48 @@ public void tearDownTest() {
8487
stopInstancesAndAwait(SRV1, SRV2, SRV3);
8588
}
8689

90+
@Test
91+
@DisplayName("requests were re-issued after reconnection")
92+
public void testRetriesOnReconnect() throws ExecutionException, InterruptedException {
93+
CyclicBarrier barrier = new CyclicBarrier(2);
94+
TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
95+
config.operationExpiryTimeMillis = 3_000;
96+
TarantoolClusterClient client = new TarantoolClusterClient(
97+
config,
98+
"localhost:" + PORTS[0],
99+
"127.0.0.1:" + PORTS[1],
100+
"localhost:" + PORTS[2]
101+
) {
102+
boolean notFirst;
103+
104+
@Override
105+
protected void reconnect(Throwable lastError) {
106+
if (notFirst) {
107+
tryAwait(barrier);
108+
}
109+
notFirst = true;
110+
super.reconnect(lastError);
111+
}
112+
};
113+
114+
stopInstancesAndAwait(SRV1);
115+
116+
List<Future<?>> futures = new ArrayList<>();
117+
futures.add(client.asyncOps().eval("return 1+1"));
118+
futures.add(client.asyncOps().eval("return 1+2"));
119+
futures.add(client.asyncOps().eval("return 1+3"));
120+
futures.add(client.asyncOps().eval("return 1+4"));
121+
122+
tryAwait(barrier);
123+
124+
for (Future<?> future : futures) {
125+
future.get();
126+
}
127+
128+
stopInstancesAndAwait(SRV2);
129+
stopInstancesAndAwait(SRV3);
130+
}
131+
87132
@Test
88133
@DisplayName("reconnected to another node after the current node had disappeared")
89134
public void testRoundRobinReconnect() {

src/test/java/org/tarantool/TestSocketChannelProvider.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static java.net.StandardSocketOptions.SO_LINGER;
44

55
import java.net.InetSocketAddress;
6+
import java.net.SocketAddress;
67
import java.nio.channels.SocketChannel;
78

89
/**
@@ -37,7 +38,7 @@ public SocketChannel get(int retryNumber, Throwable lastError) {
3738
* default behaviour).
3839
*/
3940
channel.setOption(SO_LINGER, soLinger);
40-
channel.connect(new InetSocketAddress(host, port));
41+
channel.connect(getAddress());
4142
return channel;
4243
} catch (Exception e) {
4344
if (budget < System.currentTimeMillis()) {
@@ -53,4 +54,9 @@ public SocketChannel get(int retryNumber, Throwable lastError) {
5354
}
5455
throw new RuntimeException(new InterruptedException());
5556
}
57+
58+
@Override
59+
public SocketAddress getAddress() {
60+
return new InetSocketAddress(host, port);
61+
}
5662
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# This configurations defines a standard console output
2+
# and disables all messages for it by default
3+
4+
handlers = java.util.logging.ConsoleHandler
5+
.level = INFO
6+
java.util.logging.ConsoleHandler.level = OFF
7+
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
8+
# SimpleFormatter uses String.format(format, date, source, logger, level, message, thrown);
9+
java.util.logging.SimpleFormatter.format = [%1$tF %1$tT] [%4$-7s] %3$s %5$s%6$s %n

0 commit comments

Comments
 (0)