diff --git a/core/pom.xml b/core/pom.xml index df4126689..7e7c87a39 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT core diff --git a/core/src/main/java/com/arangodb/ArangoDB.java b/core/src/main/java/com/arangodb/ArangoDB.java index a865e0306..7ebdb2e7e 100644 --- a/core/src/main/java/com/arangodb/ArangoDB.java +++ b/core/src/main/java/com/arangodb/ArangoDB.java @@ -517,6 +517,17 @@ public Builder chunkSize(final Integer chunkSize) { return this; } + /** + * Set whether to use requests pipelining in HTTP/1.1 ({@link Protocol#HTTP_JSON} or {@link Protocol#HTTP_VPACK}). + * + * @param pipelining {@code true} if enabled + * @return {@link ArangoDB.Builder} + */ + public Builder pipelining(final Boolean pipelining) { + config.setPipelining(pipelining); + return this; + } + /** * Sets the maximum number of connections the built in connection pool will open per host. * diff --git a/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java b/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java index 047d55280..832c8862d 100644 --- a/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java +++ b/core/src/main/java/com/arangodb/config/ArangoConfigProperties.java @@ -21,6 +21,7 @@ public interface ArangoConfigProperties { String KEY_USE_SSL = "useSsl"; String KEY_VERIFY_HOST = "verifyHost"; String KEY_CHUNK_SIZE = "chunkSize"; + String KEY_PIPELINING = "pipelining"; String KEY_MAX_CONNECTIONS = "maxConnections"; String KEY_CONNECTION_TTL = "connectionTtl"; String KEY_KEEP_ALIVE_INTERVAL = "keepAliveInterval"; @@ -110,6 +111,10 @@ default Optional getChunkSize() { return Optional.empty(); } + default Optional getPipelining() { + return Optional.empty(); + } + default Optional getMaxConnections() { return Optional.empty(); } diff --git a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java index b08c045da..25448187c 100644 --- a/core/src/main/java/com/arangodb/internal/ArangoDefaults.java +++ b/core/src/main/java/com/arangodb/internal/ArangoDefaults.java @@ -50,6 +50,7 @@ public final class ArangoDefaults { public static final Boolean DEFAULT_USE_SSL = false; public static final Boolean DEFAULT_VERIFY_HOST = true; public static final Integer DEFAULT_CHUNK_SIZE = 30_000; + public static final Boolean DEFAULT_PIPELINING = false; public static final Boolean DEFAULT_ACQUIRE_HOST_LIST = false; public static final Integer DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE; diff --git a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java index f38903532..f95e0a4e2 100644 --- a/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java +++ b/core/src/main/java/com/arangodb/internal/config/ArangoConfig.java @@ -33,6 +33,7 @@ public class ArangoConfig { private SSLContext sslContext; private Boolean verifyHost; private Integer chunkSize; + private Boolean pipelining; private Integer maxConnections; private Long connectionTtl; private Integer keepAliveInterval; @@ -70,6 +71,7 @@ public void loadProperties(final ArangoConfigProperties properties) { useSsl = properties.getUseSsl().orElse(ArangoDefaults.DEFAULT_USE_SSL); verifyHost = properties.getVerifyHost().orElse(ArangoDefaults.DEFAULT_VERIFY_HOST); chunkSize = properties.getChunkSize().orElse(ArangoDefaults.DEFAULT_CHUNK_SIZE); + pipelining = properties.getPipelining().orElse(ArangoDefaults.DEFAULT_PIPELINING); // FIXME: make maxConnections field Optional maxConnections = properties.getMaxConnections().orElse(null); // FIXME: make connectionTtl field Optional @@ -173,6 +175,14 @@ public void setChunkSize(Integer chunkSize) { this.chunkSize = chunkSize; } + public Boolean getPipelining() { + return pipelining; + } + + public void setPipelining(Boolean pipelining) { + this.pipelining = pipelining; + } + public Integer getMaxConnections() { if (maxConnections == null) { maxConnections = getDefaultMaxConnections(); diff --git a/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java b/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java index e4502d3d8..f7d865f81 100644 --- a/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java +++ b/core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java @@ -119,6 +119,11 @@ public Optional getChunkSize() { return Optional.ofNullable(getProperty(KEY_CHUNK_SIZE)).map(Integer::valueOf); } + @Override + public Optional getPipelining() { + return Optional.ofNullable(getProperty(KEY_PIPELINING)).map(Boolean::valueOf); + } + @Override public Optional getMaxConnections() { return Optional.ofNullable(getProperty(KEY_MAX_CONNECTIONS)).map(Integer::valueOf); diff --git a/core/src/main/java/com/arangodb/internal/net/Communication.java b/core/src/main/java/com/arangodb/internal/net/Communication.java index 0309cd04c..26251e33d 100644 --- a/core/src/main/java/com/arangodb/internal/net/Communication.java +++ b/core/src/main/java/com/arangodb/internal/net/Communication.java @@ -50,7 +50,9 @@ public CompletableFuture executeAsync(final InternalRequest re private CompletableFuture executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) { long reqId = reqCount.getAndIncrement(); - return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId); + return host.connection().thenCompose(c -> + doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId) + .whenComplete((r, t) -> c.release())); } private CompletableFuture doExecuteAsync( diff --git a/core/src/main/java/com/arangodb/internal/net/Connection.java b/core/src/main/java/com/arangodb/internal/net/Connection.java index b092448d3..461c5ccea 100644 --- a/core/src/main/java/com/arangodb/internal/net/Connection.java +++ b/core/src/main/java/com/arangodb/internal/net/Connection.java @@ -35,4 +35,6 @@ public interface Connection extends Closeable { void setJwt(String jwt); CompletableFuture executeAsync(InternalRequest request); + + void release(); } diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java index b0fbbdf7b..0e01ca824 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java @@ -29,5 +29,5 @@ */ @UsedInApi public interface ConnectionFactory { - Connection create(ArangoConfig config, HostDescription host); + Connection create(ArangoConfig config, HostDescription host, ConnectionPool pool); } diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java index 91c12bb02..0db87c0c3 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPool.java @@ -20,18 +20,22 @@ package com.arangodb.internal.net; -import com.arangodb.config.HostDescription; +import com.arangodb.arch.UsedInApi; import java.io.Closeable; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary */ +@UsedInApi public interface ConnectionPool extends Closeable { - Connection createConnection(final HostDescription host); + Connection createConnection(); - Connection connection(); + CompletableFuture connection(); + + void release(final Connection connection); void setJwt(String jwt); diff --git a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java index 8337a67bf..9f22ee50a 100644 --- a/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java @@ -23,24 +23,28 @@ import com.arangodb.ArangoDBException; import com.arangodb.config.HostDescription; import com.arangodb.internal.config.ArangoConfig; +import com.arangodb.internal.util.AsyncQueue; import java.io.IOException; -import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; -/** - * @author Mark Vollmary - */ public class ConnectionPoolImpl implements ConnectionPool { + public static final int HTTP1_SLOTS = 1; // HTTP/1: max 1 pending request + public static final int HTTP1_SLOTS_PIPELINING = 10; // HTTP/1: max pipelining + public static final int HTTP2_SLOTS = 32; // HTTP/2: max streams, hard-coded see BTS-2049 + + private final AsyncQueue slots = new AsyncQueue<>(); private final HostDescription host; private final ArangoConfig config; private final int maxConnections; private final List connections; private final ConnectionFactory factory; - private int current; + private final int maxSlots; private volatile String jwt = null; - private boolean closed = false; + private volatile boolean closed = false; public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, final ConnectionFactory factory) { super(); @@ -48,39 +52,48 @@ public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, this.config = config; this.maxConnections = config.getMaxConnections(); this.factory = factory; - connections = new ArrayList<>(); - current = 0; + connections = new CopyOnWriteArrayList<>(); + switch (config.getProtocol()) { + case HTTP_JSON: + case HTTP_VPACK: + maxSlots = config.getPipelining() ? HTTP1_SLOTS_PIPELINING : HTTP1_SLOTS; + break; + default: + maxSlots = HTTP2_SLOTS; + } } @Override - public Connection createConnection(final HostDescription host) { - Connection c = factory.create(config, host); + public Connection createConnection() { + Connection c = factory.create(config, host, this); c.setJwt(jwt); return c; } @Override - public synchronized Connection connection() { + public CompletableFuture connection() { if (closed) { throw new ArangoDBException("Connection pool already closed!"); } - final Connection connection; - if (connections.size() < maxConnections) { - connection = createConnection(host); + Connection connection = createConnection(); connections.add(connection); - current++; - } else { - final int index = Math.floorMod(current++, connections.size()); - connection = connections.get(index); + for (int i = 0; i < maxSlots; i++) { + slots.offer((connection)); + } } - return connection; + return slots.poll(); + } + + @Override + public void release(Connection connection) { + slots.offer(connection); } @Override - public synchronized void setJwt(String jwt) { + public void setJwt(String jwt) { if (jwt != null) { this.jwt = jwt; for (Connection connection : connections) { @@ -90,18 +103,17 @@ public synchronized void setJwt(String jwt) { } @Override - public synchronized void close() throws IOException { + public void close() throws IOException { closed = true; for (final Connection connection : connections) { connection.close(); } - connections.clear(); } @Override public String toString() { return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections=" - + connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]"; + + connections.size() + ", factory=" + factory.getClass().getSimpleName() + "]"; } } diff --git a/core/src/main/java/com/arangodb/internal/net/Host.java b/core/src/main/java/com/arangodb/internal/net/Host.java index 07fd3c6ee..b2afdd8e1 100644 --- a/core/src/main/java/com/arangodb/internal/net/Host.java +++ b/core/src/main/java/com/arangodb/internal/net/Host.java @@ -24,6 +24,7 @@ import com.arangodb.config.HostDescription; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary @@ -33,9 +34,7 @@ public interface Host { HostDescription getDescription(); - Connection connection(); - - void closeOnError(); + CompletableFuture connection(); void close() throws IOException; @@ -44,5 +43,4 @@ public interface Host { void setMarkforDeletion(boolean markforDeletion); void setJwt(String jwt); - } diff --git a/core/src/main/java/com/arangodb/internal/net/HostImpl.java b/core/src/main/java/com/arangodb/internal/net/HostImpl.java index 1ef822618..0277f8246 100644 --- a/core/src/main/java/com/arangodb/internal/net/HostImpl.java +++ b/core/src/main/java/com/arangodb/internal/net/HostImpl.java @@ -20,10 +20,10 @@ package com.arangodb.internal.net; -import com.arangodb.ArangoDBException; import com.arangodb.config.HostDescription; import java.io.IOException; +import java.util.concurrent.CompletableFuture; /** * @author Mark Vollmary @@ -51,19 +51,10 @@ public HostDescription getDescription() { } @Override - public Connection connection() { + public CompletableFuture connection() { return connectionPool.connection(); } - @Override - public void closeOnError() { - try { - connectionPool.close(); - } catch (final IOException e) { - throw ArangoDBException.of(e); - } - } - @Override public String toString() { return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + ", markforDeletion=" diff --git a/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java new file mode 100644 index 000000000..d3b1a223a --- /dev/null +++ b/core/src/main/java/com/arangodb/internal/util/AsyncQueue.java @@ -0,0 +1,45 @@ +package com.arangodb.internal.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.*; + +public class AsyncQueue { + private static final Logger LOGGER = LoggerFactory.getLogger(AsyncQueue.class); + private final Queue> requests = new ConcurrentLinkedQueue<>(); + private final Queue offers = new ArrayDeque<>(); + + public synchronized CompletableFuture poll() { + LOGGER.trace("poll()"); + T o = offers.poll(); + if (o != null) { + LOGGER.trace("poll(): short-circuit: {}", o); + return CompletableFuture.completedFuture(o); + } + CompletableFuture r = new CompletableFuture<>(); + LOGGER.trace("poll(): enqueue request: {}", r); + requests.add(r); + return r; + } + + public void offer(T o) { + LOGGER.trace("offer({})", o); + CompletableFuture r = requests.poll(); + if (r == null) { + synchronized (this) { + r = requests.poll(); + if (r == null) { + LOGGER.trace("offer({}): enqueue", o); + offers.add(o); + } + } + } + if (r != null) { + LOGGER.trace("offer({}): short-circuit: {}", o, r); + r.complete(o); + } + } +} diff --git a/docker/start_db.sh b/docker/start_db.sh index 4c0c2e9a2..e8c58ebcd 100755 --- a/docker/start_db.sh +++ b/docker/start_db.sh @@ -66,7 +66,7 @@ docker run -d \ --starter.address="${GW}" \ --docker.image="${DOCKER_IMAGE}" \ --starter.local --starter.mode=${STARTER_MODE} --all.log.level=debug --all.log.output=+ --log.verbose \ - --all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true + --all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true --all.server.maximal-threads=128 wait_server() { diff --git a/driver/pom.xml b/driver/pom.xml index 2d31dfb0c..542b239a5 100644 --- a/driver/pom.xml +++ b/driver/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT arangodb-java-driver diff --git a/http-protocol/pom.xml b/http-protocol/pom.xml index d9b538f46..2bd8fc87c 100644 --- a/http-protocol/pom.xml +++ b/http-protocol/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT http-protocol diff --git a/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java b/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java index 87a47c760..d777f0e3f 100644 --- a/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java +++ b/http-protocol/src/main/java/com/arangodb/http/HttpConnection.java @@ -29,6 +29,7 @@ import com.arangodb.internal.RequestType; import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.internal.serde.ContentTypeFactory; import com.arangodb.internal.util.EncodeUtils; import io.netty.handler.ssl.ApplicationProtocolConfig; @@ -63,6 +64,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_SLOTS_PIPELINING; +import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_SLOTS; + /** * @author Mark Vollmary @@ -81,14 +85,16 @@ public class HttpConnection implements Connection { private final WebClient client; private final Integer timeout; private final MultiMap commonHeaders = MultiMap.caseInsensitiveMultiMap(); + private final Vertx vertx; private final Vertx vertxToClose; + private final ConnectionPool pool; private static String getUserAgent() { return "JavaDriver/" + PackageVersion.VERSION + " (JVM/" + System.getProperty("java.specification.version") + ")"; } - HttpConnection(final ArangoConfig config, final HostDescription host, final HttpProtocolConfig protocolConfig) { - super(); + HttpConnection(final ArangoConfig config, final HttpProtocolConfig protocolConfig, final HostDescription host, final ConnectionPool pool) { + this.pool = pool; Protocol protocol = config.getProtocol(); ContentType contentType = ContentTypeFactory.of(protocol); if (contentType == ContentType.VPACK) { @@ -112,20 +118,19 @@ private static String getUserAgent() { config.getUser(), Optional.ofNullable(config.getPassword()).orElse("") ).toHttpAuthorization(); - Vertx vertxToUse; if (protocolConfig.getVertx() != null) { // reuse existing Vert.x - vertxToUse = protocolConfig.getVertx(); + vertx = protocolConfig.getVertx(); // Vert.x will not be closed when connection is closed vertxToClose = null; LOGGER.debug("Reusing existing Vert.x instance"); } else { // create a new Vert.x instance LOGGER.debug("Creating new Vert.x instance"); - vertxToUse = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)); - vertxToUse.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement())); + vertx = Vertx.vertx(new VertxOptions().setPreferNativeTransport(true).setEventLoopPoolSize(1)); + vertx.runOnContext(e -> Thread.currentThread().setName("adb-http-" + THREAD_COUNT.getAndIncrement())); // Vert.x be closed when connection is closed - vertxToClose = vertxToUse; + vertxToClose = vertx; } int intTtl = Optional.ofNullable(config.getConnectionTtl()) @@ -148,7 +153,9 @@ private static String getUserAgent() { .setLogActivity(true) .setKeepAlive(true) .setTcpKeepAlive(true) - .setPipelining(true) + .setPipelining(config.getPipelining()) + .setPipeliningLimit(HTTP1_SLOTS_PIPELINING) + .setHttp2MultiplexingLimit(HTTP2_SLOTS) .setReuseAddress(true) .setReusePort(true) .setHttp2ClearTextUpgrade(false) @@ -205,7 +212,7 @@ public SslContextFactory sslContextFactory() { }); } - client = WebClient.create(vertxToUse, webClientOptions); + client = WebClient.create(vertx, webClientOptions); } private static String buildUrl(final InternalRequest request) { @@ -265,6 +272,11 @@ private HttpMethod requestTypeToHttpMethod(RequestType requestType) { } } + @Override + public void release() { + vertx.runOnContext(__ -> pool.release(this)); + } + @Override @UnstableApi public CompletableFuture executeAsync(@UnstableApi final InternalRequest request) { @@ -273,7 +285,7 @@ public CompletableFuture executeAsync(@UnstableApi final Inter return rfuture; } - public void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture rfuture) { + private void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture rfuture) { String path = buildUrl(request); HttpRequest httpRequest = client .request(requestTypeToHttpMethod(request.getRequestType()), path) diff --git a/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java b/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java index 69980aec3..72c8c9086 100644 --- a/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java +++ b/http-protocol/src/main/java/com/arangodb/http/HttpConnectionFactory.java @@ -26,6 +26,7 @@ import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.ConnectionFactory; +import com.arangodb.internal.net.ConnectionPool; import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,9 @@ public HttpConnectionFactory(@UnstableApi final HttpProtocolConfig cfg) { @Override @UnstableApi - public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host) { - return new HttpConnection(config, host, protocolConfig); + public Connection create(@UnstableApi final ArangoConfig config, + final HostDescription host, + @UnstableApi final ConnectionPool pool) { + return new HttpConnection(config, protocolConfig, host, pool); } } diff --git a/jackson-serde-json/pom.xml b/jackson-serde-json/pom.xml index 61b9acd55..816d6eefc 100644 --- a/jackson-serde-json/pom.xml +++ b/jackson-serde-json/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT jackson-serde-json diff --git a/jackson-serde-vpack/pom.xml b/jackson-serde-vpack/pom.xml index 81a581480..14d2cafc5 100644 --- a/jackson-serde-vpack/pom.xml +++ b/jackson-serde-vpack/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT jackson-serde-vpack diff --git a/jsonb-serde/pom.xml b/jsonb-serde/pom.xml index 5cb72c2a1..e3d23beae 100644 --- a/jsonb-serde/pom.xml +++ b/jsonb-serde/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT jsonb-serde diff --git a/pom.xml b/pom.xml index 3493461af..d1c2a81f4 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.arangodb arangodb-java-driver-parent - 7.18.0 + 7.19.0-SNAPSHOT 2016 release-parent diff --git a/release-parent/pom.xml b/release-parent/pom.xml index e0846a551..502df1b43 100644 --- a/release-parent/pom.xml +++ b/release-parent/pom.xml @@ -6,7 +6,7 @@ com.arangodb arangodb-java-driver-parent - 7.18.0 + 7.19.0-SNAPSHOT pom diff --git a/shaded/pom.xml b/shaded/pom.xml index 76e43d4d3..6a4147cd1 100644 --- a/shaded/pom.xml +++ b/shaded/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT arangodb-java-driver-shaded diff --git a/test-functional/pom.xml b/test-functional/pom.xml index 71bdae4ec..a8fd16797 100644 --- a/test-functional/pom.xml +++ b/test-functional/pom.xml @@ -8,7 +8,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT test-functional diff --git a/test-functional/src/test/java/com/arangodb/UserAgentTest.java b/test-functional/src/test/java/com/arangodb/UserAgentTest.java index aa794f069..9af38f28a 100644 --- a/test-functional/src/test/java/com/arangodb/UserAgentTest.java +++ b/test-functional/src/test/java/com/arangodb/UserAgentTest.java @@ -10,7 +10,7 @@ class UserAgentTest extends BaseJunit5 { - private static final String EXPECTED_VERSION = "7.18.0"; + private static final String EXPECTED_VERSION = "7.19.0-SNAPSHOT"; private static final boolean SHADED = Boolean.parseBoolean(System.getProperty("shaded")); diff --git a/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java b/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java index 674c14851..109a9eb5e 100644 --- a/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java +++ b/test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.fail; @@ -39,15 +40,20 @@ class HostHandlerTest { private static final ConnectionPool mockCP = new ConnectionPool() { @Override - public Connection createConnection(HostDescription host) { + public Connection createConnection() { return null; } @Override - public Connection connection() { + public CompletableFuture connection() { return null; } + @Override + public void release(Connection connection) { + + } + @Override public void setJwt(String jwt) { diff --git a/test-functional/src/test/resources/simplelogger.properties b/test-functional/src/test/resources/simplelogger.properties index c375e20ef..a2a4ce6d5 100644 --- a/test-functional/src/test/resources/simplelogger.properties +++ b/test-functional/src/test/resources/simplelogger.properties @@ -10,3 +10,5 @@ org.slf4j.simpleLogger.defaultLogLevel=info #org.slf4j.simpleLogger.log.com.arangodb.internal.serde.JacksonUtils=debug #org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug #org.slf4j.simpleLogger.log.com.arangodb.internal.serde.InternalSerdeImpl=debug +#org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug +#org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug diff --git a/test-non-functional/pom.xml b/test-non-functional/pom.xml index bf7219a25..a10f0aaa9 100644 --- a/test-non-functional/pom.xml +++ b/test-non-functional/pom.xml @@ -8,7 +8,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT test-non-functional diff --git a/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java new file mode 100644 index 000000000..a3f5200a2 --- /dev/null +++ b/test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java @@ -0,0 +1,113 @@ +package concurrency; + +import com.arangodb.*; +import com.arangodb.config.ArangoConfigProperties; +import com.arangodb.internal.net.ConnectionPoolImpl; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import util.TestUtils; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.awaitility.Awaitility.await; + +public class ConnectionLoadBalanceTest { + private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionLoadBalanceTest.class); + + public static Stream configs() { + return Stream.of( + // FIXME: DE-1017 + // new Config(Protocol.VST, 1), + // new Config(Protocol.VST, 2), + new Config(Protocol.HTTP_JSON, 10), + new Config(Protocol.HTTP_JSON, 20), + new Config(Protocol.HTTP2_JSON, 1), + new Config(Protocol.HTTP2_JSON, 2) + ).map(Arguments::of); + } + + // Test the requests load balancing across different connections, when all the slots except 1 are busy + @MethodSource("configs") + @ParameterizedTest + void loadBalanceToAvailableSlots(Config cfg) throws InterruptedException { + doTestLoadBalance(cfg, 1); + } + + // Test the requests load balancing across different connections, when all the slots are busy + @MethodSource("configs") + @ParameterizedTest + void loadBalanceAllBusy(Config cfg) throws InterruptedException { + doTestLoadBalance(cfg, 2); + } + + void doTestLoadBalance(Config cfg, int sleepCycles) throws InterruptedException { + int longTasksCount = cfg.maxStreams() * cfg.maxConnections * sleepCycles - 1; + int shortTasksCount = 10; + long sleepDuration = 2; + + ArangoDatabaseAsync db = new ArangoDB.Builder() + .loadProperties(ArangoConfigProperties.fromFile()) + .protocol(cfg.protocol) + .serde(TestUtils.createSerde(cfg.protocol)) + .maxConnections(cfg.maxConnections) + .build().async().db(); + + LOGGER.debug("starting..."); + + CompletableFuture longRunningTasks = CompletableFuture.allOf( + IntStream.range(0, longTasksCount) + .mapToObj(__ -> + db.query("RETURN SLEEP(@duration)", Void.class, Map.of("duration", sleepDuration))) + .toArray(CompletableFuture[]::new) + ); + + Thread.sleep(100); + + CompletableFuture shortRunningTasks = CompletableFuture.allOf( + IntStream.range(0, shortTasksCount) + .mapToObj(__ -> db.getVersion()) + .toArray(CompletableFuture[]::new) + ); + + LOGGER.debug("awaiting..."); + + await() + .timeout(Duration.ofSeconds(sleepDuration * sleepCycles - 1L)) + .until(shortRunningTasks::isDone); + + LOGGER.debug("completed shortRunningTasks"); + + // join exceptional completions + shortRunningTasks.join(); + + await() + .timeout(Duration.ofSeconds(sleepDuration * sleepCycles + 2L)) + .until(longRunningTasks::isDone); + + LOGGER.debug("completed longRunningTasks"); + + // join exceptional completions + longRunningTasks.join(); + + db.arango().shutdown(); + } + + private record Config( + Protocol protocol, + int maxConnections + ) { + int maxStreams() { + return switch (protocol) { + case HTTP_JSON, HTTP_VPACK -> ConnectionPoolImpl.HTTP1_SLOTS; + default -> ConnectionPoolImpl.HTTP2_SLOTS; + }; + } + } +} diff --git a/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java b/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java index 618f229f3..bf9641e0c 100644 --- a/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java +++ b/test-non-functional/src/test/java/concurrency/ConnectionPoolConcurrencyTest.java @@ -4,10 +4,7 @@ import com.arangodb.internal.InternalRequest; import com.arangodb.internal.InternalResponse; import com.arangodb.internal.config.ArangoConfig; -import com.arangodb.internal.net.Connection; -import com.arangodb.internal.net.ConnectionFactory; -import com.arangodb.internal.net.ConnectionPool; -import com.arangodb.internal.net.ConnectionPoolImpl; +import com.arangodb.internal.net.*; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -23,7 +20,7 @@ public class ConnectionPoolConcurrencyTest { cfg.setMaxConnections(10_000); } - private final ConnectionFactory cf = (config, host) -> new Connection() { + private final ConnectionFactory cf = (config, host, pool) -> new Connection() { @Override public void setJwt(String jwt) { } @@ -33,6 +30,10 @@ public CompletableFuture executeAsync(InternalRequest request) throw new UnsupportedOperationException(); } + @Override + public void release() { + } + @Override public void close() { } @@ -45,7 +46,7 @@ void foo() throws InterruptedException, ExecutionException, IOException { List> futures = es.invokeAll(Collections.nCopies(8, (Callable) () -> { for (int i = 0; i < 10_000; i++) { - cp.createConnection(HostDescription.parse("127.0.0.1:8529")); + cp.createConnection(); cp.connection(); cp.setJwt("foo"); } diff --git a/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java b/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java index 1a0407a4c..4cac7a647 100644 --- a/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java +++ b/test-non-functional/src/test/java/mp/ArangoConfigPropertiesMPImpl.java @@ -23,6 +23,7 @@ public final class ArangoConfigPropertiesMPImpl implements ArangoConfigPropertie private Optional useSsl; private Optional verifyHost; private Optional chunkSize; + private Optional pipelining; private Optional maxConnections; private Optional connectionTtl; private Optional keepAliveInterval; @@ -80,6 +81,11 @@ public Optional getChunkSize() { return chunkSize; } + @Override + public Optional getPipelining() { + return pipelining; + } + @Override public Optional getMaxConnections() { return maxConnections; @@ -140,12 +146,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ArangoConfigPropertiesMPImpl that = (ArangoConfigPropertiesMPImpl) o; - return Objects.equals(hosts, that.hosts) && Objects.equals(protocol, that.protocol) && Objects.equals(user, that.user) && Objects.equals(password, that.password) && Objects.equals(jwt, that.jwt) && Objects.equals(timeout, that.timeout) && Objects.equals(useSsl, that.useSsl) && Objects.equals(verifyHost, that.verifyHost) && Objects.equals(chunkSize, that.chunkSize) && Objects.equals(maxConnections, that.maxConnections) && Objects.equals(connectionTtl, that.connectionTtl) && Objects.equals(keepAliveInterval, that.keepAliveInterval) && Objects.equals(acquireHostList, that.acquireHostList) && Objects.equals(acquireHostListInterval, that.acquireHostListInterval) && Objects.equals(loadBalancingStrategy, that.loadBalancingStrategy) && Objects.equals(responseQueueTimeSamples, that.responseQueueTimeSamples) && Objects.equals(compression, that.compression) && Objects.equals(compressionThreshold, that.compressionThreshold) && Objects.equals(compressionLevel, that.compressionLevel) && Objects.equals(serdeProviderClass, that.serdeProviderClass); + return Objects.equals(hosts, that.hosts) && Objects.equals(protocol, that.protocol) && Objects.equals(user, that.user) && Objects.equals(password, that.password) && Objects.equals(jwt, that.jwt) && Objects.equals(timeout, that.timeout) && Objects.equals(useSsl, that.useSsl) && Objects.equals(verifyHost, that.verifyHost) && Objects.equals(chunkSize, that.chunkSize) && Objects.equals(pipelining, that.pipelining) && Objects.equals(maxConnections, that.maxConnections) && Objects.equals(connectionTtl, that.connectionTtl) && Objects.equals(keepAliveInterval, that.keepAliveInterval) && Objects.equals(acquireHostList, that.acquireHostList) && Objects.equals(acquireHostListInterval, that.acquireHostListInterval) && Objects.equals(loadBalancingStrategy, that.loadBalancingStrategy) && Objects.equals(responseQueueTimeSamples, that.responseQueueTimeSamples) && Objects.equals(compression, that.compression) && Objects.equals(compressionThreshold, that.compressionThreshold) && Objects.equals(compressionLevel, that.compressionLevel) && Objects.equals(serdeProviderClass, that.serdeProviderClass); } @Override public int hashCode() { - return Objects.hash(hosts, protocol, user, password, jwt, timeout, useSsl, verifyHost, chunkSize, maxConnections, connectionTtl, keepAliveInterval, acquireHostList, acquireHostListInterval, loadBalancingStrategy, responseQueueTimeSamples, compression, compressionThreshold, compressionLevel, serdeProviderClass); + return Objects.hash(hosts, protocol, user, password, jwt, timeout, useSsl, verifyHost, chunkSize, pipelining, maxConnections, connectionTtl, keepAliveInterval, acquireHostList, acquireHostListInterval, loadBalancingStrategy, responseQueueTimeSamples, compression, compressionThreshold, compressionLevel, serdeProviderClass); } @Override @@ -160,6 +166,7 @@ public String toString() { ", useSsl=" + useSsl + ", verifyHost=" + verifyHost + ", chunkSize=" + chunkSize + + ", pipelining=" + pipelining + ", maxConnections=" + maxConnections + ", connectionTtl=" + connectionTtl + ", keepAliveInterval=" + keepAliveInterval + diff --git a/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java b/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java index 3ff81bd04..5a8f861ef 100644 --- a/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java +++ b/test-non-functional/src/test/java/mp/ConfigMPDefaultsTest.java @@ -25,6 +25,7 @@ private void checkResult(ArangoConfigProperties config) { assertThat(config.getUseSsl()).isEmpty(); assertThat(config.getVerifyHost()).isEmpty(); assertThat(config.getChunkSize()).isEmpty(); + assertThat(config.getPipelining()).isEmpty(); assertThat(config.getMaxConnections()).isNotPresent(); assertThat(config.getConnectionTtl()).isNotPresent(); assertThat(config.getKeepAliveInterval()).isNotPresent(); diff --git a/test-non-functional/src/test/java/mp/ConfigMPTest.java b/test-non-functional/src/test/java/mp/ConfigMPTest.java index 4a7aaa993..5d5f605e3 100644 --- a/test-non-functional/src/test/java/mp/ConfigMPTest.java +++ b/test-non-functional/src/test/java/mp/ConfigMPTest.java @@ -23,6 +23,7 @@ class ConfigMPTest { private final Boolean useSsl = true; private final Boolean verifyHost = false; private final Integer vstChunkSize = 1234; + private final Boolean pipelining = true; private final Integer maxConnections = 123; private final Long connectionTtl = 12345L; private final Integer keepAliveInterval = 123456; @@ -58,6 +59,7 @@ private void checkResult(ArangoConfigProperties config) { assertThat(config.getUseSsl()).hasValue(useSsl); assertThat(config.getVerifyHost()).hasValue(verifyHost); assertThat(config.getChunkSize()).hasValue(vstChunkSize); + assertThat(config.getPipelining()).hasValue(pipelining); assertThat(config.getMaxConnections()) .isPresent() .hasValue(maxConnections); diff --git a/test-non-functional/src/test/resources/arangodb-config-test.properties b/test-non-functional/src/test/resources/arangodb-config-test.properties index ef25aaf11..1d5e675af 100644 --- a/test-non-functional/src/test/resources/arangodb-config-test.properties +++ b/test-non-functional/src/test/resources/arangodb-config-test.properties @@ -7,6 +7,7 @@ adb.timeout=9876 adb.useSsl=true adb.verifyHost=false adb.chunkSize=1234 +adb.pipelining=true adb.maxConnections=123 adb.connectionTtl=12345 adb.keepAliveInterval=123456 diff --git a/test-non-functional/src/test/resources/simplelogger.properties b/test-non-functional/src/test/resources/simplelogger.properties index 7649bd6c7..495a73812 100644 --- a/test-non-functional/src/test/resources/simplelogger.properties +++ b/test-non-functional/src/test/resources/simplelogger.properties @@ -9,3 +9,6 @@ org.slf4j.simpleLogger.showShortLogName=false org.slf4j.simpleLogger.defaultLogLevel=info #org.slf4j.simpleLogger.log.com.arangodb.internal.serde.JacksonUtils=debug #org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug +#org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug +#org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug +#org.slf4j.simpleLogger.log.com.arangodb.internal.util.AsyncQueue=trace diff --git a/test-parent/pom.xml b/test-parent/pom.xml index b68b5c4d1..d582c6f5b 100644 --- a/test-parent/pom.xml +++ b/test-parent/pom.xml @@ -7,7 +7,7 @@ com.arangodb arangodb-java-driver-parent - 7.18.0 + 7.19.0-SNAPSHOT pom @@ -60,6 +60,11 @@ assertj-core test + + org.awaitility + awaitility + test + @@ -93,6 +98,12 @@ assertj-core 3.25.3 + + org.awaitility + awaitility + 4.2.1 + test + com.tngtech.archunit archunit-junit5 diff --git a/test-perf/pom.xml b/test-perf/pom.xml index 4e31b0ab1..3ab88b158 100644 --- a/test-perf/pom.xml +++ b/test-perf/pom.xml @@ -7,7 +7,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT test-perf diff --git a/test-resilience/pom.xml b/test-resilience/pom.xml index bd7963bbc..a6b8f4f56 100644 --- a/test-resilience/pom.xml +++ b/test-resilience/pom.xml @@ -6,7 +6,7 @@ ../test-parent com.arangodb test-parent - 7.18.0 + 7.19.0-SNAPSHOT 4.0.0 @@ -29,12 +29,6 @@ 2.1.7 test - - org.awaitility - awaitility - 4.2.0 - test - ch.qos.logback logback-classic diff --git a/tutorial/gradle/build.gradle b/tutorial/gradle/build.gradle index cb34d15d7..ff468c6db 100644 --- a/tutorial/gradle/build.gradle +++ b/tutorial/gradle/build.gradle @@ -12,7 +12,7 @@ repositories { } dependencies { - implementation 'com.arangodb:arangodb-java-driver:7.18.0' + implementation 'com.arangodb:arangodb-java-driver:7.19.0-SNAPSHOT' } ext { diff --git a/tutorial/maven/pom.xml b/tutorial/maven/pom.xml index e5e32c7c2..7e33fd350 100644 --- a/tutorial/maven/pom.xml +++ b/tutorial/maven/pom.xml @@ -19,7 +19,7 @@ com.arangodb arangodb-java-driver - 7.18.0 + 7.19.0-SNAPSHOT diff --git a/vst-protocol/pom.xml b/vst-protocol/pom.xml index 94b176118..3aedd968d 100644 --- a/vst-protocol/pom.xml +++ b/vst-protocol/pom.xml @@ -8,7 +8,7 @@ ../release-parent com.arangodb release-parent - 7.18.0 + 7.19.0-SNAPSHOT vst-protocol diff --git a/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java b/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java index f0faca44f..1db7852a0 100644 --- a/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java +++ b/vst-protocol/src/main/java/com/arangodb/vst/VstConnectionFactoryAsync.java @@ -25,6 +25,7 @@ import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; import com.arangodb.internal.net.ConnectionFactory; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.vst.internal.VstConnectionAsync; /** @@ -35,8 +36,8 @@ public class VstConnectionFactoryAsync implements ConnectionFactory { @Override @UnstableApi - public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host) { - return new VstConnectionAsync(config, host); + public Connection create(@UnstableApi final ArangoConfig config, final HostDescription host, @UnstableApi final ConnectionPool pool) { + return new VstConnectionAsync(config, host, pool); } } diff --git a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java index ddd886d10..8b4cdc211 100644 --- a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java +++ b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnection.java @@ -25,6 +25,7 @@ import com.arangodb.internal.ArangoDefaults; import com.arangodb.internal.config.ArangoConfig; import com.arangodb.internal.net.Connection; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.velocypack.VPackBuilder; import com.arangodb.velocypack.VPackSlice; import com.arangodb.velocypack.ValueType; @@ -68,6 +69,7 @@ public abstract class VstConnection implements Connection { private final HostDescription host; private final Map sendTimestamps = new ConcurrentHashMap<>(); private final String connectionName; + private final ConnectionPool pool; private final byte[] keepAliveRequest = new VPackBuilder() .add(ValueType.ARRAY) .add(1) @@ -89,7 +91,7 @@ public abstract class VstConnection implements Connection { private OutputStream outputStream; private InputStream inputStream; - protected VstConnection(final ArangoConfig config, final HostDescription host) { + protected VstConnection(final ArangoConfig config, final HostDescription host, final ConnectionPool pool) { super(); timeout = config.getTimeout(); ttl = config.getConnectionTtl(); @@ -97,6 +99,7 @@ protected VstConnection(final ArangoConfig config, final HostDescription host) { useSsl = config.getUseSsl(); sslContext = config.getSslContext(); this.host = host; + this.pool = pool; connectionName = "connection_" + System.currentTimeMillis() + "_" + Math.random(); LOGGER.debug("[" + connectionName + "]: Connection created"); @@ -244,6 +247,11 @@ public synchronized void close() { } } + @Override + public void release() { + pool.release(this); + } + private synchronized void sendProtocolHeader() throws IOException { if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("[%s]: Send velocystream protocol header to %s", connectionName, socket)); diff --git a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java index 8b74cbd57..5b128340e 100644 --- a/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java +++ b/vst-protocol/src/main/java/com/arangodb/vst/internal/VstConnectionAsync.java @@ -25,6 +25,7 @@ import com.arangodb.internal.InternalRequest; import com.arangodb.internal.InternalResponse; import com.arangodb.internal.config.ArangoConfig; +import com.arangodb.internal.net.ConnectionPool; import com.arangodb.internal.serde.InternalSerde; import com.arangodb.velocypack.VPackSlice; import com.arangodb.velocypack.exception.VPackParserException; @@ -51,8 +52,8 @@ public class VstConnectionAsync extends VstConnection private final InternalSerde serde; - public VstConnectionAsync(final ArangoConfig config, final HostDescription host) { - super(config, host); + public VstConnectionAsync(final ArangoConfig config, final HostDescription host, final ConnectionPool pool) { + super(config, host, pool); chunkSize = config.getChunkSize(); serde = config.getInternalSerde(); } @@ -98,7 +99,7 @@ public CompletableFuture executeAsync(final InternalRequest re return; } rfuture.complete(response); - } else { + } else { Throwable e = ex instanceof CompletionException ? ex.getCause() : ex; rfuture.completeExceptionally(e); }