Skip to content

Commit 72dc912

Browse files
committed
Logging of client reconnects and transient errors
Add logging waring messages when client tries establishing connection to Tarantool instance. All the request that cannot be processed because of some transient errors are also traced. Disable JUL logging for test classes by default. Follows on: #194
1 parent 7d1b052 commit 72dc912

9 files changed

+153
-32
lines changed

pom.xml

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23
<modelVersion>4.0.0</modelVersion>
34

45
<groupId>org.tarantool</groupId>
@@ -12,6 +13,7 @@
1213
<mockito.version>1.10.19</mockito.version>
1314
<!-- logger dependency versions -->
1415
<sfl4j.version>1.7.26</sfl4j.version>
16+
<project.testResources>${project.basedir}/src/test/resources</project.testResources>
1517
</properties>
1618
<name>Tarantool Connector for Java</name>
1719
<url>https://github.com/tarantool/tarantool-java</url>
@@ -73,17 +75,24 @@
7375
<artifactId>templating-maven-plugin</artifactId>
7476
<version>1.0.0</version>
7577
<executions>
76-
<execution>
77-
<id>filter-src</id>
78-
<goals>
79-
<goal>filter-sources</goal>
80-
</goals>
81-
</execution>
78+
<execution>
79+
<id>filter-src</id>
80+
<goals>
81+
<goal>filter-sources</goal>
82+
</goals>
83+
</execution>
8284
</executions>
8385
</plugin>
8486
<plugin>
8587
<artifactId>maven-surefire-plugin</artifactId>
8688
<version>3.0.0-M3</version>
89+
<configuration>
90+
<systemPropertyVariables>
91+
<java.util.logging.config.file>
92+
${project.testResources}/jul-silent.properties
93+
</java.util.logging.config.file>
94+
</systemPropertyVariables>
95+
</configuration>
8796
</plugin>
8897
<plugin>
8998
<groupId>org.apache.maven.plugins</groupId>
@@ -99,6 +108,12 @@
99108
</executions>
100109
<configuration>
101110
<trimStackTrace>false</trimStackTrace>
111+
<systemPropertyVariables>
112+
<java.util.logging.config.file>
113+
${project.testResources}/jul-silent.properties
114+
</java.util.logging.config.file>
115+
</systemPropertyVariables>
116+
102117
</configuration>
103118
</plugin>
104119
<plugin>

src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.util.concurrent.locks.Lock;
1414
import java.util.concurrent.locks.ReadWriteLock;
1515
import java.util.concurrent.locks.ReentrantReadWriteLock;
16+
import java.util.function.Supplier;
1617
import java.util.stream.Collectors;
1718

1819
/**
@@ -93,13 +94,7 @@ private void updateAddressList(Collection<String> addresses) {
9394
* @return socket addresses
9495
*/
9596
public List<SocketAddress> getAddresses() {
96-
Lock readLock = addressListLock.readLock();
97-
readLock.lock();
98-
try {
99-
return Collections.unmodifiableList(this.socketAddresses);
100-
} finally {
101-
readLock.unlock();
102-
}
97+
return readGuard(() -> Collections.unmodifiableList(this.socketAddresses));
10398
}
10499

105100
/**
@@ -109,14 +104,10 @@ public List<SocketAddress> getAddresses() {
109104
* if {@link #currentPosition} has {@link #UNSET_POSITION} value
110105
*/
111106
protected InetSocketAddress getLastObtainedAddress() {
112-
Lock readLock = addressListLock.readLock();
113-
readLock.lock();
114-
try {
107+
return readGuard(() -> {
115108
int index = currentPosition.get();
116109
return index != UNSET_POSITION ? socketAddresses.get(index) : null;
117-
} finally {
118-
readLock.unlock();
119-
}
110+
});
120111
}
121112

122113
/**
@@ -176,13 +167,7 @@ public void setRetriesLimit(int retriesLimit) {
176167
* @return Number of configured addresses.
177168
*/
178169
protected int getAddressCount() {
179-
Lock readLock = addressListLock.readLock();
180-
readLock.lock();
181-
try {
182-
return socketAddresses.size();
183-
} finally {
184-
readLock.unlock();
185-
}
170+
return readGuard(socketAddresses::size);
186171
}
187172

188173
/**
@@ -191,11 +176,25 @@ protected int getAddressCount() {
191176
* @return Socket address to use for the next reconnection attempt
192177
*/
193178
protected InetSocketAddress getNextSocketAddress() {
179+
return readGuard(() -> {
180+
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
181+
return socketAddresses.get(position);
182+
});
183+
}
184+
185+
@Override
186+
public SocketAddress getAddress() {
187+
return readGuard(() -> {
188+
int position = (currentPosition.get() + 1) % socketAddresses.size();
189+
return socketAddresses.get(position);
190+
});
191+
}
192+
193+
private <R> R readGuard(Supplier<R> supplier) {
194194
Lock readLock = addressListLock.readLock();
195195
readLock.lock();
196196
try {
197-
int position = currentPosition.updateAndGet(i -> (i + 1) % socketAddresses.size());
198-
return socketAddresses.get(position);
197+
return supplier.get();
199198
} finally {
200199
readLock.unlock();
201200
}

src/main/java/org/tarantool/SingleSocketChannelProviderImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public SingleSocketChannelProviderImpl(String address) {
2424
setAddress(address);
2525
}
2626

27+
@Override
2728
public SocketAddress getAddress() {
2829
return address;
2930
}

src/main/java/org/tarantool/SocketChannelProvider.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,15 @@ public interface SocketChannelProvider {
1919
*/
2020
SocketChannel get(int retryNumber, Throwable lastError);
2121

22+
/**
23+
* Gets an address that will be used when
24+
* {@link #get(int, Throwable)} is invoked.
25+
*
26+
* @return effective address or {@literal null}
27+
* if it cannot be calculated in advance.
28+
*/
29+
default SocketAddress getAddress() {
30+
return null;
31+
}
32+
2233
}

src/main/java/org/tarantool/TarantoolClientImpl.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package org.tarantool;
22

3+
import org.tarantool.logging.Logger;
4+
import org.tarantool.logging.LoggerFactory;
35
import org.tarantool.protocol.ProtoUtils;
46
import org.tarantool.protocol.ReadableViaSelectorChannel;
57
import org.tarantool.protocol.TarantoolGreeting;
68
import org.tarantool.protocol.TarantoolPacket;
9+
import org.tarantool.util.StringUtils;
710

811
import java.io.IOException;
12+
import java.net.SocketAddress;
913
import java.nio.ByteBuffer;
1014
import java.nio.channels.SocketChannel;
15+
import java.util.Arrays;
1116
import java.util.Iterator;
1217
import java.util.List;
1318
import java.util.Map;
@@ -28,6 +33,8 @@
2833

2934
public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements TarantoolClient {
3035

36+
private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClientImpl.class);
37+
3138
public static final CommunicationException NOT_INIT_EXCEPTION
3239
= new CommunicationException("Not connected, initializing connection");
3340

@@ -147,6 +154,15 @@ protected void reconnect(Throwable lastError) {
147154
int retryNumber = 0;
148155
while (!Thread.currentThread().isInterrupted()) {
149156
try {
157+
if (lastError != NOT_INIT_EXCEPTION) {
158+
LOGGER.warn(() -> {
159+
SocketAddress address = socketProvider.getAddress();
160+
return "Attempt to (re-)connect to Tarantool instance " +
161+
(StringUtils.isBlank(config.username) ? "" : config.username + ":*****@") +
162+
(address == null ? "unknown" : address);
163+
}, lastError
164+
);
165+
}
150166
channel = socketProvider.get(retryNumber++, lastError == NOT_INIT_EXCEPTION ? null : lastError);
151167
} catch (Exception e) {
152168
closeChannel(channel);
@@ -714,6 +730,15 @@ public Object[] getArgs() {
714730
return args;
715731
}
716732

733+
@Override
734+
public String toString() {
735+
return "TarantoolOp{" +
736+
"id=" + id +
737+
", code=" + code +
738+
", args=" + Arrays.toString(args) +
739+
'}';
740+
}
741+
717742
/**
718743
* Missed in jdk8 CompletableFuture operator to limit execution
719744
* by time.

src/main/java/org/tarantool/TarantoolClusterClient.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import org.tarantool.cluster.TarantoolClusterDiscoverer;
44
import org.tarantool.cluster.TarantoolClusterStoredFunctionDiscoverer;
5+
import org.tarantool.logging.Logger;
6+
import org.tarantool.logging.LoggerFactory;
57
import org.tarantool.protocol.TarantoolPacket;
68
import org.tarantool.util.StringUtils;
79

@@ -27,6 +29,8 @@
2729
*/
2830
public class TarantoolClusterClient extends TarantoolClientImpl {
2931

32+
private static final Logger LOGGER = LoggerFactory.getLogger(TarantoolClusterClient.class);
33+
3034
/**
3135
* Need some execution context to retry writes.
3236
*/
@@ -153,6 +157,7 @@ protected boolean checkFail(TarantoolOp<?> future, Exception e) {
153157
} else {
154158
assert retries != null;
155159
retries.put(future.getId(), future);
160+
LOGGER.trace("Request {0} was delayed because of {1}", future, e);
156161
return false;
157162
}
158163
}
@@ -194,13 +199,18 @@ protected void onReconnect() {
194199
// First call is before the constructor finished. Skip it.
195200
return;
196201
}
197-
Collection<TarantoolOp<?>> futuresToRetry = new ArrayList<>(retries.values());
202+
Collection<TarantoolOp<?>> delayed = new ArrayList<>(retries.values());
203+
Collection<TarantoolOp<?>> reissued = new ArrayList<>(retries.size());
198204
retries.clear();
199-
for (final TarantoolOp<?> future : futuresToRetry) {
205+
for (final TarantoolOp<?> future : delayed) {
200206
if (!future.isDone()) {
201207
executor.execute(() -> registerOperation(future));
208+
reissued.add(future);
202209
}
203210
}
211+
for (final TarantoolOp<?> future : reissued) {
212+
LOGGER.trace("{0} was re-issued after reconnection", future);
213+
}
204214
}
205215

206216
@Override

src/test/java/org/tarantool/ClientReconnectClusterIT.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414
import org.junit.jupiter.api.Test;
1515

1616
import java.time.Duration;
17+
import java.util.ArrayList;
1718
import java.util.Arrays;
1819
import java.util.Collections;
1920
import java.util.HashMap;
2021
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Set;
2324
import java.util.concurrent.CyclicBarrier;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.Future;
2427
import java.util.concurrent.TimeUnit;
2528
import java.util.function.Consumer;
2629
import java.util.stream.Collectors;
@@ -81,6 +84,48 @@ public void tearDownTest() {
8184
stopInstancesAndAwait(SRV1, SRV2, SRV3);
8285
}
8386

87+
@Test
88+
@DisplayName("requests were re-issued after reconnection")
89+
public void testRetriesOnReconnect() throws ExecutionException, InterruptedException {
90+
CyclicBarrier barrier = new CyclicBarrier(2);
91+
TarantoolClusterClientConfig config = makeDefaultClusterClientConfig();
92+
config.operationExpiryTimeMillis = 3_000;
93+
TarantoolClusterClient client = new TarantoolClusterClient(
94+
config,
95+
"localhost:" + PORTS[0],
96+
"127.0.0.1:" + PORTS[1],
97+
"localhost:" + PORTS[2]
98+
) {
99+
boolean notFirst;
100+
101+
@Override
102+
protected void reconnect(Throwable lastError) {
103+
if (notFirst) {
104+
tryAwait(barrier);
105+
}
106+
notFirst = true;
107+
super.reconnect(lastError);
108+
}
109+
};
110+
111+
stopInstancesAndAwait(SRV1);
112+
113+
List<Future<?>> futures = new ArrayList<>();
114+
futures.add(client.asyncOps().eval("return 1+1"));
115+
futures.add(client.asyncOps().eval("return 1+2"));
116+
futures.add(client.asyncOps().eval("return 1+3"));
117+
futures.add(client.asyncOps().eval("return 1+4"));
118+
119+
tryAwait(barrier);
120+
121+
for (Future<?> future : futures) {
122+
future.get();
123+
}
124+
125+
stopInstancesAndAwait(SRV2);
126+
stopInstancesAndAwait(SRV3);
127+
}
128+
84129
@Test
85130
@DisplayName("reconnected to another node after the current node had disappeared")
86131
public void testRoundRobinReconnect() {

src/test/java/org/tarantool/TestSocketChannelProvider.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static java.net.StandardSocketOptions.SO_LINGER;
44

55
import java.net.InetSocketAddress;
6+
import java.net.SocketAddress;
67
import java.nio.channels.SocketChannel;
78

89
/**
@@ -37,7 +38,7 @@ public SocketChannel get(int retryNumber, Throwable lastError) {
3738
* default behaviour).
3839
*/
3940
channel.setOption(SO_LINGER, soLinger);
40-
channel.connect(new InetSocketAddress(host, port));
41+
channel.connect(getAddress());
4142
return channel;
4243
} catch (Exception e) {
4344
if (budget < System.currentTimeMillis()) {
@@ -53,4 +54,9 @@ public SocketChannel get(int retryNumber, Throwable lastError) {
5354
}
5455
throw new RuntimeException(new InterruptedException());
5556
}
57+
58+
@Override
59+
public SocketAddress getAddress() {
60+
return new InetSocketAddress(host, port);
61+
}
5662
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# This configurations defines a standard console output
2+
# and disables all messages for it by default
3+
4+
handlers = java.util.logging.ConsoleHandler
5+
.level = INFO
6+
java.util.logging.ConsoleHandler.level = OFF
7+
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
8+
# SimpleFormatter uses String.format(format, date, source, logger, level, message, thrown);
9+
java.util.logging.SimpleFormatter.format = [%1$tF %1$tT] [%4$-7s] %3$s %5$s%6$s %n

0 commit comments

Comments
 (0)