From 19af90b1b80536dc951f4969d811d225566e0f56 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 19 May 2025 21:54:10 +0200 Subject: [PATCH 1/6] ConnectionLoadBalanceTest --- docker/start_db.sh | 2 +- .../ConnectionLoadBalanceTest.java | 97 +++++++++++++++++++ test-parent/pom.xml | 11 +++ test-resilience/pom.xml | 6 -- 4 files changed, 109 insertions(+), 7 deletions(-) create mode 100644 test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java diff --git a/docker/start_db.sh b/docker/start_db.sh index 4c0c2e9a2..e8c58ebcd 100755 --- a/docker/start_db.sh +++ b/docker/start_db.sh @@ -66,7 +66,7 @@ docker run -d \ --starter.address="${GW}" \ --docker.image="${DOCKER_IMAGE}" \ --starter.local --starter.mode=${STARTER_MODE} --all.log.level=debug --all.log.output=+ --log.verbose \ - --all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true + --all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true --all.server.maximal-threads=128 wait_server() { diff --git a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java new file mode 100644 index 000000000..926c8a112 --- /dev/null +++ b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java @@ -0,0 +1,97 @@ +package concurrency; + +import com.arangodb.*; +import com.arangodb.config.ArangoConfigProperties; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import util.TestUtils; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +public class ConnectionLoadBalanceTest { + + public static Stream configs() { + return Stream.of( + // FIXME: DE-1017 + // new Config(Protocol.VST, 1), + // new Config(Protocol.VST, 2), + new Config(Protocol.HTTP_JSON, 10), + new Config(Protocol.HTTP_JSON, 20), + new Config(Protocol.HTTP2_JSON, 1), + new Config(Protocol.HTTP2_JSON, 2) + ).map(Arguments::of); + } + + // Test the requests load balancing across different connections, when all the slots except 1 are busy + @MethodSource("configs") + @ParameterizedTest + void loadBalanceToFreeConnection(Config cfg) throws InterruptedException { + doTestLoadBalance(cfg, 1); + } + + // Test the requests load balancing across different connections, when all the slots are busy + @MethodSource("configs") + @ParameterizedTest + void loadBalanceAllBusy(Config cfg) throws InterruptedException { + doTestLoadBalance(cfg, 2); + } + + void doTestLoadBalance(Config cfg, int sleepCycles) throws InterruptedException { + int longTasksCount = cfg.maxStreams() * cfg.maxConnections * sleepCycles - 1; + int shortTasksCount = 10; + long sleepDuration = 2; + + ArangoDatabaseAsync db = new ArangoDB.Builder() + .loadProperties(ArangoConfigProperties.fromFile()) + .protocol(cfg.protocol) + .serde(TestUtils.createSerde(cfg.protocol)) + .maxConnections(cfg.maxConnections) + .build().async().db(); + + CompletableFuture longRunningTasks = CompletableFuture.allOf( + IntStream.range(0, longTasksCount) + .mapToObj(__ -> + db.query("RETURN SLEEP(@duration)", Void.class, Map.of("duration", sleepDuration))) + .toArray(CompletableFuture[]::new) + ); + + Thread.sleep(100); + + CompletableFuture shortRunningTasks = CompletableFuture.allOf( + IntStream.range(0, shortTasksCount) + .mapToObj(__ -> db.query("RETURN 1", Integer.class)) + .toArray(CompletableFuture[]::new) + ); + + await() + .timeout(Duration.ofSeconds(sleepDuration * sleepCycles - 1L)) + .until(shortRunningTasks::isDone); + + await() + .timeout(Duration.ofSeconds(sleepDuration * sleepCycles + 1L)) + .until(longRunningTasks::isDone); + + shortRunningTasks.join(); + longRunningTasks.join(); + db.arango().shutdown(); + } + + private record Config( + Protocol protocol, + int maxConnections + ) { + int maxStreams() { + return switch (protocol) { + case HTTP_JSON, HTTP_VPACK -> 1; + default -> 32; + }; + } + } +} diff --git a/test-parent/pom.xml b/test-parent/pom.xml index b68b5c4d1..59d33dc28 100644 --- a/test-parent/pom.xml +++ b/test-parent/pom.xml @@ -60,6 +60,11 @@ assertj-core test + + org.awaitility + awaitility + test + @@ -93,6 +98,12 @@ assertj-core 3.25.3 + + org.awaitility + awaitility + 4.2.1 + test + com.tngtech.archunit archunit-junit5 diff --git a/test-resilience/pom.xml b/test-resilience/pom.xml index bd7963bbc..3e18b7950 100644 --- a/test-resilience/pom.xml +++ b/test-resilience/pom.xml @@ -29,12 +29,6 @@ 2.1.7 test - - org.awaitility - awaitility - 4.2.0 - test - ch.qos.logback logback-classic From 147a632d57f639c591f142d42077f6eb4d89e1dc Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 21 May 2025 11:27:52 +0200 Subject: [PATCH 2/6] added pipelining option --- core/pom.xml | 2 +- core/src/main/java/com/arangodb/ArangoDB.java | 11 +++++++++++ .../com/arangodb/config/ArangoConfigProperties.java | 5 +++++ .../java/com/arangodb/internal/ArangoDefaults.java | 1 + .../com/arangodb/internal/config/ArangoConfig.java | 10 ++++++++++ .../internal/config/ArangoConfigPropertiesImpl.java | 5 +++++ driver/pom.xml | 2 +- http-protocol/pom.xml | 2 +- jackson-serde-json/pom.xml | 2 +- jackson-serde-vpack/pom.xml | 2 +- jsonb-serde/pom.xml | 2 +- pom.xml | 2 +- release-parent/pom.xml | 2 +- shaded/pom.xml | 2 +- test-functional/pom.xml | 2 +- .../src/test/java/com/arangodb/UserAgentTest.java | 2 +- test-non-functional/pom.xml | 2 +- .../test/java/mp/ArangoConfigPropertiesMPImpl.java | 11 +++++++++-- .../src/test/java/mp/ConfigMPDefaultsTest.java | 1 + .../src/test/java/mp/ConfigMPTest.java | 2 ++ test-parent/pom.xml | 2 +- test-perf/pom.xml | 2 +- test-resilience/pom.xml | 2 +- tutorial/gradle/build.gradle | 2 +- tutorial/maven/pom.xml | 2 +- vst-protocol/pom.xml | 2 +- 26 files changed, 62 insertions(+), 20 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index df4126689..7e7c87a39 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT core diff --git a/core/src/main/java/com/arangodb/ArangoDB.java b/core/src/main/java/com/arangodb/ArangoDB.java index a865e0306..7ebdb2e7e 100644 --- a/core/src/main/java/com/arangodb/ArangoDB.java +++ b/core/src/main/java/com/arangodb/ArangoDB.java @@ -517,6 +517,17 @@ public Builder chunkSize(final Integer chunkSize) { return this; } + /** + * Set whether to use requests pipelining in HTTP/1.1 ({@link Protocol#HTTP_JSON} or {@link Protocol#HTTP_VPACK}). + * + * @param pipelining {@code true} if enabled + * @return {@link ArangoDB.Builder} + */ + public Builder pipelining(final Boolean pipelining) { + config.setPipelining(pipelining); + return this; + } + /** * Sets the maximum number of connections the built in connection pool will open per host. * diff --git a/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java b/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java index 047d55280..832c8862d 100644 --- a/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java +++ b/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java @@ -21,6 +21,7 @@ public interface ArangoConfigProperties { String KEY_USE_SSL = "useSsl"; String KEY_VERIFY_HOST = "verifyHost"; String KEY_CHUNK_SIZE = "chunkSize"; + String KEY_PIPELINING = "pipelining"; String KEY_MAX_CONNECTIONS = "maxConnections"; String KEY_CONNECTION_TTL = "connectionTtl"; String KEY_KEEP_ALIVE_INTERVAL = "keepAliveInterval"; @@ -110,6 +111,10 @@ default Optional getChunkSize() { return Optional.empty(); } + default Optional getPipelining() { + return Optional.empty(); + } + default Optional getMaxConnections() { return Optional.empty(); } diff --git a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java index b08c045da..25448187c 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java @@ -50,6 +50,7 @@ public final class ArangoDefaults { public static final Boolean DEFAULT_USE_SSL = false; public static final Boolean DEFAULT_VERIFY_HOST = true; public static final Integer DEFAULT_CHUNK_SIZE = 30_000; + public static final Boolean DEFAULT_PIPELINING = false; public static final Boolean DEFAULT_ACQUIRE_HOST_LIST = false; public static final Integer DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE; diff --git a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java index f38903532..f95e0a4e2 100644 --- a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java +++ b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java @@ -33,6 +33,7 @@ public class ArangoConfig { private SSLContext sslContext; private Boolean verifyHost; private Integer chunkSize; + private Boolean pipelining; private Integer maxConnections; private Long connectionTtl; private Integer keepAliveInterval; @@ -70,6 +71,7 @@ public void loadProperties(final ArangoConfigProperties properties) { useSsl = properties.getUseSsl().orElse(ArangoDefaults.DEFAULT_USE_SSL); verifyHost = properties.getVerifyHost().orElse(ArangoDefaults.DEFAULT_VERIFY_HOST); chunkSize = properties.getChunkSize().orElse(ArangoDefaults.DEFAULT_CHUNK_SIZE); + pipelining = properties.getPipelining().orElse(ArangoDefaults.DEFAULT_PIPELINING); // FIXME: make maxConnections field Optional maxConnections = properties.getMaxConnections().orElse(null); // FIXME: make connectionTtl field Optional @@ -173,6 +175,14 @@ public void setChunkSize(Integer chunkSize) { this.chunkSize = chunkSize; } + public Boolean getPipelining() { + return pipelining; + } + + public void setPipelining(Boolean pipelining) { + this.pipelining = pipelining; + } + public Integer getMaxConnections() { if (maxConnections == null) { maxConnections = getDefaultMaxConnections(); diff --git a/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java b/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java index e4502d3d8..f7d865f81 100644 --- a/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java +++ b/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java @@ -119,6 +119,11 @@ public Optional getChunkSize() { return Optional.ofNullable(getProperty(KEY_CHUNK_SIZE)).map(Integer::valueOf); } + @Override + public Optional getPipelining() { + return Optional.ofNullable(getProperty(KEY_PIPELINING)).map(Boolean::valueOf); + } + @Override public Optional getMaxConnections() { return Optional.ofNullable(getProperty(KEY_MAX_CONNECTIONS)).map(Integer::valueOf); diff --git a/driver/pom.xml b/driver/pom.xml index 2d31dfb0c..542b239a5 100644 --- a/driver/pom.xml +++ b/driver/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT arangodb-java-driver diff --git a/http-protocol/pom.xml b/http-protocol/pom.xml index d9b538f46..2bd8fc87c 100644 --- a/http-protocol/pom.xml +++ b/http-protocol/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT http-protocol diff --git a/jackson-serde-json/pom.xml b/jackson-serde-json/pom.xml index 61b9acd55..816d6eefc 100644 --- a/jackson-serde-json/pom.xml +++ b/jackson-serde-json/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT jackson-serde-json diff --git a/jackson-serde-vpack/pom.xml b/jackson-serde-vpack/pom.xml index 81a581480..14d2cafc5 100644 --- a/jackson-serde-vpack/pom.xml +++ b/jackson-serde-vpack/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT jackson-serde-vpack diff --git a/jsonb-serde/pom.xml b/jsonb-serde/pom.xml index 5cb72c2a1..e3d23beae 100644 --- a/jsonb-serde/pom.xml +++ b/jsonb-serde/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT jsonb-serde diff --git a/pom.xml b/pom.xml index 3493461af..d1c2a81f4 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.arangodb arangodb-java-driver-parent - 7.18.0 + 7.19.0-SNAPSHOT 2016 release-parent diff --git a/release-parent/pom.xml b/release-parent/pom.xml index e0846a551..502df1b43 100644 --- a/release-parent/pom.xml +++ b/release-parent/pom.xml @@ -6,7 +6,7 @@ com.arangodb arangodb-java-driver-parent - 7.18.0 + 7.19.0-SNAPSHOT pom diff --git a/shaded/pom.xml b/shaded/pom.xml index 76e43d4d3..6a4147cd1 100644 --- a/shaded/pom.xml +++ b/shaded/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT arangodb-java-driver-shaded diff --git a/test-functional/pom.xml b/test-functional/pom.xml index 71bdae4ec..a8fd16797 100644 --- a/test-functional/pom.xml +++ b/test-functional/pom.xml @@ -8,7 +8,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT test-functional diff --git a/test-functional/src/test/java/com/arangodb/UserAgentTest.java b/test-functional/src/test/java/com/arangodb/UserAgentTest.java index aa794f069..9af38f28a 100644 --- a/test-functional/src/test/java/com/arangodb/UserAgentTest.java +++ b/test-functional/src/test/java/com/arangodb/UserAgentTest.java @@ -10,7 +10,7 @@ class UserAgentTest extends BaseJunit5 { - private static final String EXPECTED_VERSION = "7.18.0"; + private static final String EXPECTED_VERSION = "7.19.0-SNAPSHOT"; private static final boolean SHADED = Boolean.parseBoolean(System.getProperty("shaded")); diff --git a/test-non-functional/pom.xml b/test-non-functional/pom.xml index bf7219a25..a10f0aaa9 100644 --- a/test-non-functional/pom.xml +++ b/test-non-functional/pom.xml @@ -8,7 +8,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT test-non-functional diff --git a/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java b/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java index 1a0407a4c..4cac7a647 100644 --- a/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java +++ b/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java @@ -23,6 +23,7 @@ public final class ArangoConfigPropertiesMPImpl implements ArangoConfigPropertie private Optional useSsl; private Optional verifyHost; private Optional chunkSize; + private Optional pipelining; private Optional maxConnections; private Optional connectionTtl; private Optional keepAliveInterval; @@ -80,6 +81,11 @@ public Optional getChunkSize() { return chunkSize; } + @Override + public Optional getPipelining() { + return pipelining; + } + @Override public Optional getMaxConnections() { return maxConnections; @@ -140,12 +146,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ArangoConfigPropertiesMPImpl that = (ArangoConfigPropertiesMPImpl) o; - return Objects.equals(hosts, that.hosts) && Objects.equals(protocol, that.protocol) && Objects.equals(user, that.user) && Objects.equals(password, that.password) && Objects.equals(jwt, that.jwt) && Objects.equals(timeout, that.timeout) && Objects.equals(useSsl, that.useSsl) && Objects.equals(verifyHost, that.verifyHost) && Objects.equals(chunkSize, that.chunkSize) && Objects.equals(maxConnections, that.maxConnections) && Objects.equals(connectionTtl, that.connectionTtl) && Objects.equals(keepAliveInterval, that.keepAliveInterval) && Objects.equals(acquireHostList, that.acquireHostList) && Objects.equals(acquireHostListInterval, that.acquireHostListInterval) && Objects.equals(loadBalancingStrategy, that.loadBalancingStrategy) && Objects.equals(responseQueueTimeSamples, that.responseQueueTimeSamples) && Objects.equals(compression, that.compression) && Objects.equals(compressionThreshold, that.compressionThreshold) && Objects.equals(compressionLevel, that.compressionLevel) && Objects.equals(serdeProviderClass, that.serdeProviderClass); + return Objects.equals(hosts, that.hosts) && Objects.equals(protocol, that.protocol) && Objects.equals(user, that.user) && Objects.equals(password, that.password) && Objects.equals(jwt, that.jwt) && Objects.equals(timeout, that.timeout) && Objects.equals(useSsl, that.useSsl) && Objects.equals(verifyHost, that.verifyHost) && Objects.equals(chunkSize, that.chunkSize) && Objects.equals(pipelining, that.pipelining) && Objects.equals(maxConnections, that.maxConnections) && Objects.equals(connectionTtl, that.connectionTtl) && Objects.equals(keepAliveInterval, that.keepAliveInterval) && Objects.equals(acquireHostList, that.acquireHostList) && Objects.equals(acquireHostListInterval, that.acquireHostListInterval) && Objects.equals(loadBalancingStrategy, that.loadBalancingStrategy) && Objects.equals(responseQueueTimeSamples, that.responseQueueTimeSamples) && Objects.equals(compression, that.compression) && Objects.equals(compressionThreshold, that.compressionThreshold) && Objects.equals(compressionLevel, that.compressionLevel) && Objects.equals(serdeProviderClass, that.serdeProviderClass); } @Override public int hashCode() { - return Objects.hash(hosts, protocol, user, password, jwt, timeout, useSsl, verifyHost, chunkSize, maxConnections, connectionTtl, keepAliveInterval, acquireHostList, acquireHostListInterval, loadBalancingStrategy, responseQueueTimeSamples, compression, compressionThreshold, compressionLevel, serdeProviderClass); + return Objects.hash(hosts, protocol, user, password, jwt, timeout, useSsl, verifyHost, chunkSize, pipelining, maxConnections, connectionTtl, keepAliveInterval, acquireHostList, acquireHostListInterval, loadBalancingStrategy, responseQueueTimeSamples, compression, compressionThreshold, compressionLevel, serdeProviderClass); } @Override @@ -160,6 +166,7 @@ public String toString() { ", useSsl=" + useSsl + ", verifyHost=" + verifyHost + ", chunkSize=" + chunkSize + + ", pipelining=" + pipelining + ", maxConnections=" + maxConnections + ", connectionTtl=" + connectionTtl + ", keepAliveInterval=" + keepAliveInterval + diff --git a/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java b/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java index 3ff81bd04..5a8f861ef 100644 --- a/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java +++ b/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java @@ -25,6 +25,7 @@ private void checkResult(ArangoConfigProperties config) { assertThat(config.getUseSsl()).isEmpty(); assertThat(config.getVerifyHost()).isEmpty(); assertThat(config.getChunkSize()).isEmpty(); + assertThat(config.getPipelining()).isEmpty(); assertThat(config.getMaxConnections()).isNotPresent(); assertThat(config.getConnectionTtl()).isNotPresent(); assertThat(config.getKeepAliveInterval()).isNotPresent(); diff --git a/test-non-functional/src/test/java/mp/ConfigMPTest.java b/test-non-functional/src/test/java/mp/ConfigMPTest.java index 4a7aaa993..5d5f605e3 100644 --- a/test-non-functional/src/test/java/mp/ConfigMPTest.java +++ b/test-non-functional/src/test/java/mp/ConfigMPTest.java @@ -23,6 +23,7 @@ class ConfigMPTest { private final Boolean useSsl = true; private final Boolean verifyHost = false; private final Integer vstChunkSize = 1234; + private final Boolean pipelining = true; private final Integer maxConnections = 123; private final Long connectionTtl = 12345L; private final Integer keepAliveInterval = 123456; @@ -58,6 +59,7 @@ private void checkResult(ArangoConfigProperties config) { assertThat(config.getUseSsl()).hasValue(useSsl); assertThat(config.getVerifyHost()).hasValue(verifyHost); assertThat(config.getChunkSize()).hasValue(vstChunkSize); + assertThat(config.getPipelining()).hasValue(pipelining); assertThat(config.getMaxConnections()) .isPresent() .hasValue(maxConnections); diff --git a/test-parent/pom.xml b/test-parent/pom.xml index 59d33dc28..d582c6f5b 100644 --- a/test-parent/pom.xml +++ b/test-parent/pom.xml @@ -7,7 +7,7 @@ com.arangodb arangodb-java-driver-parent - 7.18.0 + 7.19.0-SNAPSHOT pom diff --git a/test-perf/pom.xml b/test-perf/pom.xml index 4e31b0ab1..3ab88b158 100644 --- a/test-perf/pom.xml +++ b/test-perf/pom.xml @@ -7,7 +7,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT test-perf diff --git a/test-resilience/pom.xml b/test-resilience/pom.xml index 3e18b7950..a6b8f4f56 100644 --- a/test-resilience/pom.xml +++ b/test-resilience/pom.xml @@ -6,7 +6,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT 4.0.0 diff --git a/tutorial/gradle/build.gradle b/tutorial/gradle/build.gradle index cb34d15d7..ff468c6db 100644 --- a/tutorial/gradle/build.gradle +++ b/tutorial/gradle/build.gradle @@ -12,7 +12,7 @@ repositories { } dependencies { - implementation 'com.arangodb:arangodb-java-driver:7.18.0' + implementation 'com.arangodb:arangodb-java-driver:7.19.0-SNAPSHOT' } ext { diff --git a/tutorial/maven/pom.xml b/tutorial/maven/pom.xml index e5e32c7c2..7e33fd350 100644 --- a/tutorial/maven/pom.xml +++ b/tutorial/maven/pom.xml @@ -19,7 +19,7 @@ com.arangodb arangodb-java-driver - 7.18.0 + 7.19.0-SNAPSHOT diff --git a/vst-protocol/pom.xml b/vst-protocol/pom.xml index 94b176118..3aedd968d 100644 --- a/vst-protocol/pom.xml +++ b/vst-protocol/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT vst-protocol From 0096781634acd666c34cd72706904fa086c5391b Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Fri, 23 May 2025 11:19:02 +0200 Subject: [PATCH 3/6] async connection pool --- .../arangodb/internal/net/Communication.java | 4 +- .../arangodb/internal/net/ConnectionPool.java | 5 ++- .../internal/net/ConnectionPoolImpl.java | 39 +++++++++++++------ .../java/com/arangodb/internal/net/Host.java | 6 ++- .../com/arangodb/internal/net/HostImpl.java | 8 +++- .../arangodb/internal/util/AsyncQueue.java | 30 ++++++++++++++ .../com/arangodb/http/HttpConnection.java | 10 +++-- .../arangodb/internal/HostHandlerTest.java | 8 +++- .../test/resources/simplelogger.properties | 2 + .../ConnectionLoadBalanceTest.java | 28 +++++++++---- .../resources/arangodb-config-test.properties | 1 + .../test/resources/simplelogger.properties | 2 + 12 files changed, 115 insertions(+), 28 deletions(-) create mode 100644 core/src/main/java/com/arangodb/internal/util/AsyncQueue.java diff --git a/core/src/main/java/com/arangodb/internal/net/Communication.java b/core/src/main/java/com/arangodb/internal/net/Communication.java index 0309cd04c..1bbe59f25 100644 --- a/core/src/main/java/com/arangodb/internal/net/Communication.java +++ b/core/src/main/java/com/arangodb/internal/net/Communication.java @@ -50,7 +50,9 @@ public CompletableFuture executeAsync(final InternalRequest re private CompletableFuture executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) { long reqId = reqCount.getAndIncrement(); - return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId); + return host.connection().thenCompose(c -> + doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId) + .whenComplete((r, t) -> host.release(c))); } private CompletableFuture doExecuteAsync( diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java index 91c12bb02..1fdb5130b 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java @@ -23,6 +23,7 @@ import com.arangodb.config.HostDescription; import java.io.Closeable; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary @@ -31,7 +32,9 @@ public interface ConnectionPool extends Closeable { Connection createConnection(final HostDescription host); - Connection connection(); + CompletableFuture connection(); + + void release(final Connection connection); void setJwt(String jwt); diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java index 8337a67bf..f85c456ce 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java @@ -23,22 +23,28 @@ import com.arangodb.ArangoDBException; import com.arangodb.config.HostDescription; import com.arangodb.internal.config.ArangoConfig; +import com.arangodb.internal.util.AsyncQueue; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary */ public class ConnectionPoolImpl implements ConnectionPool { + public static final int HTTP1_PIPELINING_LIMIT = 10; + public static final int HTTP2_STREAMS = 32; // hard-coded, see BTS-2049 + + private final AsyncQueue slots = new AsyncQueue<>(); private final HostDescription host; private final ArangoConfig config; private final int maxConnections; private final List connections; private final ConnectionFactory factory; - private int current; + private final int maxSlots; private volatile String jwt = null; private boolean closed = false; @@ -49,7 +55,14 @@ public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, this.maxConnections = config.getMaxConnections(); this.factory = factory; connections = new ArrayList<>(); - current = 0; + switch (config.getProtocol()) { + case HTTP_JSON: + case HTTP_VPACK: + maxSlots = config.getPipelining() ? HTTP1_PIPELINING_LIMIT : 1; + break; + default: + maxSlots = HTTP2_STREAMS; + } } @Override @@ -60,23 +73,25 @@ public Connection createConnection(final HostDescription host) { } @Override - public synchronized Connection connection() { + public synchronized CompletableFuture connection() { if (closed) { throw new ArangoDBException("Connection pool already closed!"); } - final Connection connection; - if (connections.size() < maxConnections) { - connection = createConnection(host); + Connection connection = createConnection(host); connections.add(connection); - current++; - } else { - final int index = Math.floorMod(current++, connections.size()); - connection = connections.get(index); + for (int i = 0; i < maxSlots; i++) { + slots.offer((connection)); + } } - return connection; + return slots.poll(); + } + + @Override + public void release(Connection connection) { + slots.offer(connection); } @Override @@ -101,7 +116,7 @@ public synchronized void close() throws IOException { @Override public String toString() { return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections=" - + connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]"; + + connections.size() + ", factory=" + factory.getClass().getSimpleName() + "]"; } } diff --git a/core/src/main/java/com/arangodb/internal/net/Host.java b/core/src/main/java/com/arangodb/internal/net/Host.java index 07fd3c6ee..08f49516d 100644 --- a/core/src/main/java/com/arangodb/internal/net/Host.java +++ b/core/src/main/java/com/arangodb/internal/net/Host.java @@ -24,6 +24,7 @@ import com.arangodb.config.HostDescription; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary @@ -33,7 +34,9 @@ public interface Host { HostDescription getDescription(); - Connection connection(); + CompletableFuture connection(); + + void release(Connection c); void closeOnError(); @@ -44,5 +47,4 @@ public interface Host { void setMarkforDeletion(boolean markforDeletion); void setJwt(String jwt); - } diff --git a/core/src/main/java/com/arangodb/internal/net/HostImpl.java b/core/src/main/java/com/arangodb/internal/net/HostImpl.java index 1ef822618..af2722418 100644 --- a/core/src/main/java/com/arangodb/internal/net/HostImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/HostImpl.java @@ -24,6 +24,7 @@ import com.arangodb.config.HostDescription; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary @@ -51,10 +52,15 @@ public HostDescription getDescription() { } @Override - public Connection connection() { + public CompletableFuture connection() { return connectionPool.connection(); } + @Override + public void release(Connection c) { + connectionPool.release(c); + } + @Override public void closeOnError() { try { diff --git a/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java new file mode 100644 index 000000000..999bf2816 --- /dev/null +++ b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java @@ -0,0 +1,30 @@ +package com.arangodb.internal.util; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +public class AsyncQueue { + private final Queue> requests = new ArrayDeque<>(); + private final Queue offers = new ArrayDeque<>(); + + public synchronized CompletableFuture poll() { + CompletableFuture r = new CompletableFuture<>(); + T o = offers.poll(); + if (o != null) { + r.complete(o); + } else { + requests.add(r); + } + return r; + } + + public synchronized void offer(T o) { + CompletableFuture r = requests.poll(); + if (r != null) { + r.complete(o); + } else { + offers.add(o); + } + } +} diff --git a/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java b/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java index 87a47c760..683bb954d 100644 --- a/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java +++ b/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java @@ -63,6 +63,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_PIPELINING_LIMIT; +import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_STREAMS; + /** * @author Mark Vollmary @@ -88,7 +91,6 @@ private static String getUserAgent() { } HttpConnection(final ArangoConfig config, final HostDescription host, final HttpProtocolConfig protocolConfig) { - super(); Protocol protocol = config.getProtocol(); ContentType contentType = ContentTypeFactory.of(protocol); if (contentType == ContentType.VPACK) { @@ -148,7 +150,9 @@ private static String getUserAgent() { .setLogActivity(true) .setKeepAlive(true) .setTcpKeepAlive(true) - .setPipelining(true) + .setPipelining(config.getPipelining()) + .setPipeliningLimit(HTTP1_PIPELINING_LIMIT) + .setHttp2MultiplexingLimit(HTTP2_STREAMS) .setReuseAddress(true) .setReusePort(true) .setHttp2ClearTextUpgrade(false) @@ -273,7 +277,7 @@ public CompletableFuture executeAsync(@UnstableApi final Inter return rfuture; } - public void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture rfuture) { + private void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture rfuture) { String path = buildUrl(request); HttpRequest httpRequest = client .request(requestTypeToHttpMethod(request.getRequestType()), path) diff --git a/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java b/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java index 674c14851..92061a62b 100644 --- a/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java +++ b/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; @@ -44,10 +45,15 @@ public Connection createConnection(HostDescription host) { } @Override - public Connection connection() { + public CompletableFuture connection() { return null; } + @Override + public void release(Connection connection) { + + } + @Override public void setJwt(String jwt) { diff --git a/test-functional/src/test/resources/simplelogger.properties b/test-functional/src/test/resources/simplelogger.properties index c375e20ef..a2a4ce6d5 100644 --- a/test-functional/src/test/resources/simplelogger.properties +++ b/test-functional/src/test/resources/simplelogger.properties @@ -10,3 +10,5 @@ org.slf4j.simpleLogger.defaultLogLevel=info #org.slf4j.simpleLogger.log.com.arangodb.internal.serde.JacksonUtils=debug #org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug #org.slf4j.simpleLogger.log.com.arangodb.internal.serde.InternalSerdeImpl=debug +#org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug +#org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug diff --git a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java index 926c8a112..402450ce0 100644 --- a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java +++ b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java @@ -2,9 +2,12 @@ import com.arangodb.*; import com.arangodb.config.ArangoConfigProperties; +import com.arangodb.internal.net.ConnectionPoolImpl; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import util.TestUtils; import java.time.Duration; @@ -16,12 +19,13 @@ import static org.awaitility.Awaitility.await; public class ConnectionLoadBalanceTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionLoadBalanceTest.class); public static Stream configs() { return Stream.of( - // FIXME: DE-1017 - // new Config(Protocol.VST, 1), - // new Config(Protocol.VST, 2), +// FIXME: DE-1017 +// new Config(Protocol.VST, 1), +// new Config(Protocol.VST, 2), new Config(Protocol.HTTP_JSON, 10), new Config(Protocol.HTTP_JSON, 20), new Config(Protocol.HTTP2_JSON, 1), @@ -32,7 +36,7 @@ public static Stream configs() { // Test the requests load balancing across different connections, when all the slots except 1 are busy @MethodSource("configs") @ParameterizedTest - void loadBalanceToFreeConnection(Config cfg) throws InterruptedException { + void loadBalanceToAvailableSlots(Config cfg) throws InterruptedException { doTestLoadBalance(cfg, 1); } @@ -66,20 +70,30 @@ void doTestLoadBalance(Config cfg, int sleepCycles) throws InterruptedException CompletableFuture shortRunningTasks = CompletableFuture.allOf( IntStream.range(0, shortTasksCount) - .mapToObj(__ -> db.query("RETURN 1", Integer.class)) + .mapToObj(__ -> db.getVersion()) .toArray(CompletableFuture[]::new) ); + LOGGER.debug("awaiting..."); + await() .timeout(Duration.ofSeconds(sleepDuration * sleepCycles - 1L)) .until(shortRunningTasks::isDone); + LOGGER.debug("completed shortRunningTasks"); + + // join exceptional completions + shortRunningTasks.join(); + await() .timeout(Duration.ofSeconds(sleepDuration * sleepCycles + 1L)) .until(longRunningTasks::isDone); - shortRunningTasks.join(); + LOGGER.debug("completed longRunningTasks"); + + // join exceptional completions longRunningTasks.join(); + db.arango().shutdown(); } @@ -90,7 +104,7 @@ private record Config( int maxStreams() { return switch (protocol) { case HTTP_JSON, HTTP_VPACK -> 1; - default -> 32; + default -> ConnectionPoolImpl.HTTP2_STREAMS; }; } } diff --git a/test-non-functional/src/test/resources/arangodb-config-test.properties b/test-non-functional/src/test/resources/arangodb-config-test.properties index ef25aaf11..1d5e675af 100644 --- a/test-non-functional/src/test/resources/arangodb-config-test.properties +++ b/test-non-functional/src/test/resources/arangodb-config-test.properties @@ -7,6 +7,7 @@ adb.timeout=9876 adb.useSsl=true adb.verifyHost=false adb.chunkSize=1234 +adb.pipelining=true adb.maxConnections=123 adb.connectionTtl=12345 adb.keepAliveInterval=123456 diff --git a/test-non-functional/src/test/resources/simplelogger.properties b/test-non-functional/src/test/resources/simplelogger.properties index 7649bd6c7..d992c3d63 100644 --- a/test-non-functional/src/test/resources/simplelogger.properties +++ b/test-non-functional/src/test/resources/simplelogger.properties @@ -9,3 +9,5 @@ org.slf4j.simpleLogger.showShortLogName=false org.slf4j.simpleLogger.defaultLogLevel=info #org.slf4j.simpleLogger.log.com.arangodb.internal.serde.JacksonUtils=debug #org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug +#org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug +#org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug From 7e2c985837a1dec968af868048a4c2a7c9d1876a Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 26 May 2025 11:30:30 +0200 Subject: [PATCH 4/6] increase test waiting time --- .../java/com/arangodb/internal/net/ConnectionPoolImpl.java | 3 --- .../src/test/java/concurrency/ConnectionLoadBalanceTest.java | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java index f85c456ce..b055758d6 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java @@ -30,9 +30,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; -/** - * @author Mark Vollmary - */ public class ConnectionPoolImpl implements ConnectionPool { public static final int HTTP1_PIPELINING_LIMIT = 10; diff --git a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java index 402450ce0..f6413bb73 100644 --- a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java +++ b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java @@ -86,7 +86,7 @@ void doTestLoadBalance(Config cfg, int sleepCycles) throws InterruptedException shortRunningTasks.join(); await() - .timeout(Duration.ofSeconds(sleepDuration * sleepCycles + 1L)) + .timeout(Duration.ofSeconds(sleepDuration * sleepCycles + 2L)) .until(longRunningTasks::isDone); LOGGER.debug("completed longRunningTasks"); From 973ff1f7ba55ed2026abd69904af677f194d693d Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 26 May 2025 21:55:01 +0200 Subject: [PATCH 5/6] non-blocking AsyncQueue::offer() --- .../internal/net/ConnectionPoolImpl.java | 13 ++++---- .../arangodb/internal/util/AsyncQueue.java | 32 +++++++++++++------ 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java index b055758d6..3a4c0d1c7 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java @@ -26,9 +26,9 @@ import com.arangodb.internal.util.AsyncQueue; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; public class ConnectionPoolImpl implements ConnectionPool { @@ -43,7 +43,7 @@ public class ConnectionPoolImpl implements ConnectionPool { private final ConnectionFactory factory; private final int maxSlots; private volatile String jwt = null; - private boolean closed = false; + private volatile boolean closed = false; public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, final ConnectionFactory factory) { super(); @@ -51,7 +51,7 @@ public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, this.config = config; this.maxConnections = config.getMaxConnections(); this.factory = factory; - connections = new ArrayList<>(); + connections = new CopyOnWriteArrayList<>(); switch (config.getProtocol()) { case HTTP_JSON: case HTTP_VPACK: @@ -70,7 +70,7 @@ public Connection createConnection(final HostDescription host) { } @Override - public synchronized CompletableFuture connection() { + public CompletableFuture connection() { if (closed) { throw new ArangoDBException("Connection pool already closed!"); } @@ -92,7 +92,7 @@ public void release(Connection connection) { } @Override - public synchronized void setJwt(String jwt) { + public void setJwt(String jwt) { if (jwt != null) { this.jwt = jwt; for (Connection connection : connections) { @@ -102,12 +102,11 @@ public synchronized void setJwt(String jwt) { } @Override - public synchronized void close() throws IOException { + public void close() throws IOException { closed = true; for (final Connection connection : connections) { connection.close(); } - connections.clear(); } @Override diff --git a/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java index 999bf2816..33a9128bf 100644 --- a/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java +++ b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java @@ -1,30 +1,44 @@ package com.arangodb.internal.util; -import java.util.ArrayDeque; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; public class AsyncQueue { - private final Queue> requests = new ArrayDeque<>(); - private final Queue offers = new ArrayDeque<>(); + private final Queue> requests = new ConcurrentLinkedQueue<>(); + private final Queue offers = new ConcurrentLinkedQueue<>(); public synchronized CompletableFuture poll() { - CompletableFuture r = new CompletableFuture<>(); T o = offers.poll(); if (o != null) { - r.complete(o); + return CompletableFuture.completedFuture(o); } else { - requests.add(r); + CompletableFuture res = new CompletableFuture<>(); + requests.offer(res); + update(); + return res; } - return r; } - public synchronized void offer(T o) { + public void offer(T o) { CompletableFuture r = requests.poll(); if (r != null) { r.complete(o); } else { - offers.add(o); + offers.offer(o); + update(); + } + } + + private void update() { + CompletableFuture r; + T o; + synchronized (this) { + if (offers.isEmpty()) return; + r = requests.poll(); + if (r == null) return; + o = offers.poll(); } + r.complete(o); } } From 7cd12bdcc75616de478c9ef1e6332897e0998e85 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Tue, 27 May 2025 21:10:57 +0200 Subject: [PATCH 6/6] refactoring AsyncQueue --- .../arangodb/internal/net/Communication.java | 2 +- .../com/arangodb/internal/net/Connection.java | 2 + .../internal/net/ConnectionFactory.java | 2 +- .../arangodb/internal/net/ConnectionPool.java | 5 +- .../internal/net/ConnectionPoolImpl.java | 15 +++--- .../java/com/arangodb/internal/net/Host.java | 4 -- .../com/arangodb/internal/net/HostImpl.java | 15 ------ .../arangodb/internal/util/AsyncQueue.java | 47 ++++++++++--------- .../com/arangodb/http/HttpConnection.java | 30 +++++++----- .../arangodb/http/HttpConnectionFactory.java | 7 ++- .../arangodb/internal/HostHandlerTest.java | 2 +- .../ConnectionLoadBalanceTest.java | 12 +++-- .../ConnectionPoolConcurrencyTest.java | 13 ++--- .../test/resources/simplelogger.properties | 1 + .../vst/VstConnectionFactoryAsync.java | 5 +- .../arangodb/vst/internal/VstConnection.java | 10 +++- .../vst/internal/VstConnectionAsync.java | 7 +-- 17 files changed, 95 insertions(+), 84 deletions(-) diff --git a/core/src/main/java/com/arangodb/internal/net/Communication.java b/core/src/main/java/com/arangodb/internal/net/Communication.java index 1bbe59f25..26251e33d 100644 --- a/core/src/main/java/com/arangodb/internal/net/Communication.java +++ b/core/src/main/java/com/arangodb/internal/net/Communication.java @@ -52,7 +52,7 @@ private CompletableFuture executeAsync(final InternalRequest r long reqId = reqCount.getAndIncrement(); return host.connection().thenCompose(c -> doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId) - .whenComplete((r, t) -> host.release(c))); + .whenComplete((r, t) -> c.release())); } private CompletableFuture doExecuteAsync( diff --git a/core/src/main/java/com/arangodb/internal/net/Connection.java b/core/src/main/java/com/arangodb/internal/net/Connection.java index b092448d3..461c5ccea 100644 --- a/core/src/main/java/com/arangodb/internal/net/Connection.java +++ b/core/src/main/java/com/arangodb/internal/net/Connection.java @@ -35,4 +35,6 @@ public interface Connection extends Closeable { void setJwt(String jwt); CompletableFuture executeAsync(InternalRequest request); + + void release(); } diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java index b0fbbdf7b..0e01ca824 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java @@ -29,5 +29,5 @@ */ @UsedInApi public interface ConnectionFactory { - Connection create(ArangoConfig config, HostDescription host); + Connection create(ArangoConfig config, HostDescription host, ConnectionPool pool); } diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java index 1fdb5130b..0db87c0c3 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java @@ -20,7 +20,7 @@ package com.arangodb.internal.net; -import com.arangodb.config.HostDescription; +import com.arangodb.arch.UsedInApi; import java.io.Closeable; import java.util.concurrent.CompletableFuture; @@ -28,9 +28,10 @@ /** * @author Mark Vollmary */ +@UsedInApi public interface ConnectionPool extends Closeable { - Connection createConnection(final HostDescription host); + Connection createConnection(); CompletableFuture connection(); diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java index 3a4c0d1c7..9f22ee50a 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java @@ -32,8 +32,9 @@ public class ConnectionPoolImpl implements ConnectionPool { - public static final int HTTP1_PIPELINING_LIMIT = 10; - public static final int HTTP2_STREAMS = 32; // hard-coded, see BTS-2049 + public static final int HTTP1_SLOTS = 1; // HTTP/1: max 1 pending request + public static final int HTTP1_SLOTS_PIPELINING = 10; // HTTP/1: max pipelining + public static final int HTTP2_SLOTS = 32; // HTTP/2: max streams, hard-coded see BTS-2049 private final AsyncQueue slots = new AsyncQueue<>(); private final HostDescription host; @@ -55,16 +56,16 @@ public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, switch (config.getProtocol()) { case HTTP_JSON: case HTTP_VPACK: - maxSlots = config.getPipelining() ? HTTP1_PIPELINING_LIMIT : 1; + maxSlots = config.getPipelining() ? HTTP1_SLOTS_PIPELINING : HTTP1_SLOTS; break; default: - maxSlots = HTTP2_STREAMS; + maxSlots = HTTP2_SLOTS; } } @Override - public Connection createConnection(final HostDescription host) { - Connection c = factory.create(config, host); + public Connection createConnection() { + Connection c = factory.create(config, host, this); c.setJwt(jwt); return c; } @@ -76,7 +77,7 @@ public CompletableFuture connection() { } if (connections.size() < maxConnections) { - Connection connection = createConnection(host); + Connection connection = createConnection(); connections.add(connection); for (int i = 0; i < maxSlots; i++) { slots.offer((connection)); diff --git a/core/src/main/java/com/arangodb/internal/net/Host.java b/core/src/main/java/com/arangodb/internal/net/Host.java index 08f49516d..b2afdd8e1 100644 --- a/core/src/main/java/com/arangodb/internal/net/Host.java +++ b/core/src/main/java/com/arangodb/internal/net/Host.java @@ -36,10 +36,6 @@ public interface Host { CompletableFuture connection(); - void release(Connection c); - - void closeOnError(); - void close() throws IOException; boolean isMarkforDeletion(); diff --git a/core/src/main/java/com/arangodb/internal/net/HostImpl.java b/core/src/main/java/com/arangodb/internal/net/HostImpl.java index af2722418..0277f8246 100644 --- a/core/src/main/java/com/arangodb/internal/net/HostImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/HostImpl.java @@ -20,7 +20,6 @@ package com.arangodb.internal.net; -import com.arangodb.ArangoDBException; import com.arangodb.config.HostDescription; import java.io.IOException; @@ -56,20 +55,6 @@ public CompletableFuture connection() { return connectionPool.connection(); } - @Override - public void release(Connection c) { - connectionPool.release(c); - } - - @Override - public void closeOnError() { - try { - connectionPool.close(); - } catch (final IOException e) { - throw ArangoDBException.of(e); - } - } - @Override public String toString() { return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + ", markforDeletion=" diff --git a/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java index 33a9128bf..d3b1a223a 100644 --- a/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java +++ b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java @@ -1,44 +1,45 @@ package com.arangodb.internal.util; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; import java.util.Queue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.*; public class AsyncQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncQueue.class); private final Queue> requests = new ConcurrentLinkedQueue<>(); - private final Queue offers = new ConcurrentLinkedQueue<>(); + private final Queue offers = new ArrayDeque<>(); public synchronized CompletableFuture poll() { + LOGGER.trace("poll()"); T o = offers.poll(); if (o != null) { + LOGGER.trace("poll(): short-circuit: {}", o); return CompletableFuture.completedFuture(o); - } else { - CompletableFuture res = new CompletableFuture<>(); - requests.offer(res); - update(); - return res; } + CompletableFuture r = new CompletableFuture<>(); + LOGGER.trace("poll(): enqueue request: {}", r); + requests.add(r); + return r; } public void offer(T o) { + LOGGER.trace("offer({})", o); CompletableFuture r = requests.poll(); + if (r == null) { + synchronized (this) { + r = requests.poll(); + if (r == null) { + LOGGER.trace("offer({}): enqueue", o); + offers.add(o); + } + } + } if (r != null) { + LOGGER.trace("offer({}): short-circuit: {}", o, r); r.complete(o); - } else { - offers.offer(o); - update(); - } - } - - private void update() { - CompletableFuture r; - T o; - synchronized (this) { - if (offers.isEmpty()) return; - r = requests.poll(); - if (r == null) return; - o = offers.poll(); } - r.complete(o); } } diff --git a/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java b/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java index 683bb954d..d777f0e3f 100644 --- a/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java +++ b/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java @@ -29,6 +29,7 @@ import com.arangodb.internal.RequestType; import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.internal.serde.ContentTypeFactory; import com.arangodb.internal.util.EncodeUtils; import io.netty.handler.ssl.ApplicationProtocolConfig; @@ -63,8 +64,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_PIPELINING_LIMIT; -import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_STREAMS; +import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_SLOTS_PIPELINING; +import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_SLOTS; /** @@ -84,13 +85,16 @@ public class HttpConnection implements Connection { private final WebClient client; private final Integer timeout; private final MultiMap commonHeaders = MultiMap.caseInsensitiveMultiMap(); + private final Vertx vertx; private final Vertx vertxToClose; + private final ConnectionPool pool; private static String getUserAgent() { return "JavaDriver/" + PackageVersion.VERSION + " (JVM/" + System.getProperty("java.specification.version") + ")"; } - HttpConnection(final ArangoConfig config, final HostDescription host, final HttpProtocolConfig protocolConfig) { + HttpConnection(final ArangoConfig config, final HttpProtocolConfig protocolConfig, final HostDescription host, final ConnectionPool pool) { + this.pool = pool; Protocol protocol = config.getProtocol(); ContentType contentType = ContentTypeFactory.of(protocol); if (contentType == ContentType.VPACK) { @@ -114,20 +118,19 @@ private static String getUserAgent() { config.getUser(), Optional.ofNullable(config.getPassword()).orElse("") ).toHttpAuthorization(); - Vertx vertxToUse; if (protocolConfig.getVertx() != null) { // reuse existing Vert.x - vertxToUse = protocolConfig.getVertx(); + vertx = protocolConfig.getVertx(); // Vert.x will not be closed when connection is closed vertxToClose = null; LOGGER.debug("Reusing existing Vert.x instance"); } else { // create a new Vert.x instance LOGGER.debug("Creating new Vert.x instance"); - vertxToUse = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)); - vertxToUse.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement())); + vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)); + vertx.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement())); // Vert.x be closed when connection is closed - vertxToClose = vertxToUse; + vertxToClose = vertx; } int intTtl = Optional.ofNullable(config.getConnectionTtl()) @@ -151,8 +154,8 @@ private static String getUserAgent() { .setKeepAlive(true) .setTcpKeepAlive(true) .setPipelining(config.getPipelining()) - .setPipeliningLimit(HTTP1_PIPELINING_LIMIT) - .setHttp2MultiplexingLimit(HTTP2_STREAMS) + .setPipeliningLimit(HTTP1_SLOTS_PIPELINING) + .setHttp2MultiplexingLimit(HTTP2_SLOTS) .setReuseAddress(true) .setReusePort(true) .setHttp2ClearTextUpgrade(false) @@ -209,7 +212,7 @@ public SslContextFactory sslContextFactory() { }); } - client = WebClient.create(vertxToUse, webClientOptions); + client = WebClient.create(vertx, webClientOptions); } private static String buildUrl(final InternalRequest request) { @@ -269,6 +272,11 @@ private HttpMethod requestTypeToHttpMethod(RequestType requestType) { } } + @Override + public void release() { + vertx.runOnContext(__ -> pool.release(this)); + } + @Override @UnstableApi public CompletableFuture executeAsync(@UnstableApi final InternalRequest request) { diff --git a/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java b/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java index 69980aec3..72c8c9086 100644 --- a/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java +++ b/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java @@ -26,6 +26,7 @@ import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.ConnectionFactory; +import com.arangodb.internal.net.ConnectionPool; import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,9 @@ public HttpConnectionFactory(@UnstableApi final HttpProtocolConfig cfg) { @Override @UnstableApi - public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host) { - return new HttpConnection(config, host, protocolConfig); + public Connection create(@UnstableApi final ArangoConfig config, + final HostDescription host, + @UnstableApi final ConnectionPool pool) { + return new HttpConnection(config, protocolConfig, host, pool); } } diff --git a/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java b/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java index 92061a62b..109a9eb5e 100644 --- a/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java +++ b/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java @@ -40,7 +40,7 @@ class HostHandlerTest { private static final ConnectionPool mockCP = new ConnectionPool() { @Override - public Connection createConnection(HostDescription host) { + public Connection createConnection() { return null; } diff --git a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java index f6413bb73..a3f5200a2 100644 --- a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java +++ b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java @@ -23,9 +23,9 @@ public class ConnectionLoadBalanceTest { public static Stream configs() { return Stream.of( -// FIXME: DE-1017 -// new Config(Protocol.VST, 1), -// new Config(Protocol.VST, 2), + // FIXME: DE-1017 + // new Config(Protocol.VST, 1), + // new Config(Protocol.VST, 2), new Config(Protocol.HTTP_JSON, 10), new Config(Protocol.HTTP_JSON, 20), new Config(Protocol.HTTP2_JSON, 1), @@ -59,6 +59,8 @@ void doTestLoadBalance(Config cfg, int sleepCycles) throws InterruptedException .maxConnections(cfg.maxConnections) .build().async().db(); + LOGGER.debug("starting..."); + CompletableFuture longRunningTasks = CompletableFuture.allOf( IntStream.range(0, longTasksCount) .mapToObj(__ -> @@ -103,8 +105,8 @@ private record Config( ) { int maxStreams() { return switch (protocol) { - case HTTP_JSON, HTTP_VPACK -> 1; - default -> ConnectionPoolImpl.HTTP2_STREAMS; + case HTTP_JSON, HTTP_VPACK -> ConnectionPoolImpl.HTTP1_SLOTS; + default -> ConnectionPoolImpl.HTTP2_SLOTS; }; } } diff --git a/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java b/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java index 618f229f3..bf9641e0c 100644 --- a/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java +++ b/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java @@ -4,10 +4,7 @@ import com.arangodb.internal.InternalRequest; import com.arangodb.internal.InternalResponse; import com.arangodb.internal.config.ArangoConfig; -import com.arangodb.internal.net.Connection; -import com.arangodb.internal.net.ConnectionFactory; -import com.arangodb.internal.net.ConnectionPool; -import com.arangodb.internal.net.ConnectionPoolImpl; +import com.arangodb.internal.net.*; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -23,7 +20,7 @@ public class ConnectionPoolConcurrencyTest { cfg.setMaxConnections(10_000); } - private final ConnectionFactory cf = (config, host) -> new Connection() { + private final ConnectionFactory cf = (config, host, pool) -> new Connection() { @Override public void setJwt(String jwt) { } @@ -33,6 +30,10 @@ public CompletableFuture executeAsync(InternalRequest request) throw new UnsupportedOperationException(); } + @Override + public void release() { + } + @Override public void close() { } @@ -45,7 +46,7 @@ void foo() throws InterruptedException, ExecutionException, IOException { List> futures = es.invokeAll(Collections.nCopies(8, (Callable) () -> { for (int i = 0; i < 10_000; i++) { - cp.createConnection(HostDescription.parse("127.0.0.1:8529")); + cp.createConnection(); cp.connection(); cp.setJwt("foo"); } diff --git a/test-non-functional/src/test/resources/simplelogger.properties b/test-non-functional/src/test/resources/simplelogger.properties index d992c3d63..495a73812 100644 --- a/test-non-functional/src/test/resources/simplelogger.properties +++ b/test-non-functional/src/test/resources/simplelogger.properties @@ -11,3 +11,4 @@ org.slf4j.simpleLogger.defaultLogLevel=info #org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug #org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug #org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug +#org.slf4j.simpleLogger.log.com.arangodb.internal.util.AsyncQueue=trace diff --git a/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java b/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java index f0faca44f..1db7852a0 100644 --- a/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java +++ b/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java @@ -25,6 +25,7 @@ import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.ConnectionFactory; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.vst.internal.VstConnectionAsync; /** @@ -35,8 +36,8 @@ public class VstConnectionFactoryAsync implements ConnectionFactory { @Override @UnstableApi - public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host) { - return new VstConnectionAsync(config, host); + public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host, @UnstableApi final ConnectionPool pool) { + return new VstConnectionAsync(config, host, pool); } } diff --git a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java index ddd886d10..8b4cdc211 100644 --- a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java +++ b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java @@ -25,6 +25,7 @@ import com.arangodb.internal.ArangoDefaults; import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.velocypack.VPackBuilder; import com.arangodb.velocypack.VPackSlice; import com.arangodb.velocypack.ValueType; @@ -68,6 +69,7 @@ public abstract class VstConnection implements Connection { private final HostDescription host; private final Map sendTimestamps = new ConcurrentHashMap<>(); private final String connectionName; + private final ConnectionPool pool; private final byte[] keepAliveRequest = new VPackBuilder() .add(ValueType.ARRAY) .add(1) @@ -89,7 +91,7 @@ public abstract class VstConnection implements Connection { private OutputStream outputStream; private InputStream inputStream; - protected VstConnection(final ArangoConfig config, final HostDescription host) { + protected VstConnection(final ArangoConfig config, final HostDescription host, final ConnectionPool pool) { super(); timeout = config.getTimeout(); ttl = config.getConnectionTtl(); @@ -97,6 +99,7 @@ protected VstConnection(final ArangoConfig config, final HostDescription host) { useSsl = config.getUseSsl(); sslContext = config.getSslContext(); this.host = host; + this.pool = pool; connectionName = "connection_" + System.currentTimeMillis() + "_" + Math.random(); LOGGER.debug("[" + connectionName + "]: Connection created"); @@ -244,6 +247,11 @@ public synchronized void close() { } } + @Override + public void release() { + pool.release(this); + } + private synchronized void sendProtocolHeader() throws IOException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("[%s]: Send velocystream protocol header to %s", connectionName, socket)); diff --git a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java index 8b74cbd57..5b128340e 100644 --- a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java +++ b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java @@ -25,6 +25,7 @@ import com.arangodb.internal.InternalRequest; import com.arangodb.internal.InternalResponse; import com.arangodb.internal.config.ArangoConfig; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.internal.serde.InternalSerde; import com.arangodb.velocypack.VPackSlice; import com.arangodb.velocypack.exception.VPackParserException; @@ -51,8 +52,8 @@ public class VstConnectionAsync extends VstConnection private final InternalSerde serde; - public VstConnectionAsync(final ArangoConfig config, final HostDescription host) { - super(config, host); + public VstConnectionAsync(final ArangoConfig config, final HostDescription host, final ConnectionPool pool) { + super(config, host, pool); chunkSize = config.getChunkSize(); serde = config.getInternalSerde(); } @@ -98,7 +99,7 @@ public CompletableFuture executeAsync(final InternalRequest re return; } rfuture.complete(response); - } else { + } else { Throwable e = ex instanceof CompletionException ? ex.getCause() : ex; rfuture.completeExceptionally(e); }