diff --git a/driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java b/driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java index bc191a9434..68d7241b35 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java +++ b/driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java @@ -25,6 +25,7 @@ import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41; import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42; +import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43; import static io.netty.buffer.Unpooled.copyInt; import static io.netty.buffer.Unpooled.unreleasableBuffer; @@ -41,9 +42,9 @@ public final class BoltProtocolUtil private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer( copyInt( BOLT_MAGIC_PREAMBLE, + BoltProtocolV43.VERSION.toInt(), BoltProtocolV42.VERSION.toInt(), BoltProtocolV41.VERSION.toInt(), - BoltProtocolV4.VERSION.toInt(), BoltProtocolV3.VERSION.toInt() ) ).asReadOnly(); private static final String HANDSHAKE_STRING = createHandshakeString(); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java index 83937076e9..d28590f7de 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java @@ -34,7 +34,12 @@ import static org.neo4j.driver.Values.value; import static org.neo4j.driver.internal.DatabaseNameUtil.systemDatabase; -public class MultiDatabasesRoutingProcedureRunner extends RoutingProcedureRunner + +/** + * This implementation of the {@link RoutingProcedureRunner} works with multi database versions of Neo4j calling + * the procedure `dbms.routing.getRoutingTable` + */ +public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingProcedureRunner { static final String DATABASE_NAME = "database"; static final String MULTI_DB_GET_ROUTING_TABLE = String.format( "CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME ); diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java new file mode 100644 index 0000000000..1fe25dac2c --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunner.java @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.DatabaseName; +import org.neo4j.driver.internal.InternalRecord; +import org.neo4j.driver.internal.async.connection.DirectConnection; +import org.neo4j.driver.internal.handlers.RouteMessageResponseHandler; +import org.neo4j.driver.internal.messaging.request.RouteMessage; +import org.neo4j.driver.internal.spi.Connection; + +import static java.util.Collections.singletonList; + +/** + * This implementation of the {@link RoutingProcedureRunner} access the routing procedure + * through the bolt's ROUTE message. + */ +public class RouteMessageRoutingProcedureRunner implements RoutingProcedureRunner +{ + private final Map routingContext; + private final Supplier>> createCompletableFuture; + + public RouteMessageRoutingProcedureRunner( RoutingContext routingContext ) + { + this( routingContext, CompletableFuture::new ); + } + + protected RouteMessageRoutingProcedureRunner( RoutingContext routingContext, Supplier>> createCompletableFuture ) + { + this.routingContext = routingContext + .toMap() + .entrySet() + .stream() + .collect( Collectors.toMap( Map.Entry::getKey, entry -> Values.value( entry.getValue() ) ) ); + this.createCompletableFuture = createCompletableFuture; + } + + @Override + public CompletionStage run( Connection connection, DatabaseName databaseName, Bookmark bookmark ) + { + CompletableFuture> completableFuture = createCompletableFuture.get(); + + DirectConnection directConnection = toDirectConnection( connection, databaseName ); + directConnection.writeAndFlush( new RouteMessage( routingContext, databaseName.databaseName().orElse( null ) ), + new RouteMessageResponseHandler( completableFuture ) ); + return completableFuture + .thenApply( routingTable -> new RoutingProcedureResponse( getQuery( databaseName ), singletonList( toRecord( routingTable ) ) ) ) + .exceptionally( throwable -> new RoutingProcedureResponse( getQuery( databaseName ), throwable.getCause() ) ) + .thenCompose( routingProcedureResponse -> directConnection.release().thenApply( ignore -> routingProcedureResponse ) ); + } + + private Record toRecord( Map routingTable ) + { + return new InternalRecord( new ArrayList<>( routingTable.keySet() ), routingTable.values().toArray( new Value[0] ) ); + } + + private DirectConnection toDirectConnection( Connection connection, DatabaseName databaseName ) + { + return new DirectConnection( connection, databaseName, AccessMode.READ ); + } + + private Query getQuery( DatabaseName databaseName ) + { + Map params = new HashMap<>(); + params.put( "routingContext", routingContext ); + params.put( "databaseName", databaseName.databaseName().orElse( null ) ); + return new Query( "ROUTE $routingContext $databaseName", params ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java index 16e5c5220f..1c969b98e9 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java @@ -33,43 +33,53 @@ import static java.lang.String.format; import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase; +import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsRouteMessage; public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider { private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to "; private final Clock clock; - private final RoutingProcedureRunner routingProcedureRunner; + private final RoutingProcedureRunner singleDatabaseRoutingProcedureRunner; private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner; + private final RoutingProcedureRunner routeMessageRoutingProcedureRunner; public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingContext routingContext ) { - this( clock, new RoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ) ); + this( clock, new SingleDatabaseRoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ), + new RouteMessageRoutingProcedureRunner( routingContext ) ); } - RoutingProcedureClusterCompositionProvider( Clock clock, RoutingProcedureRunner routingProcedureRunner, - MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner ) + RoutingProcedureClusterCompositionProvider( Clock clock, SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner, + MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner, + RouteMessageRoutingProcedureRunner routeMessageRoutingProcedureRunner ) { this.clock = clock; - this.routingProcedureRunner = routingProcedureRunner; + this.singleDatabaseRoutingProcedureRunner = singleDatabaseRoutingProcedureRunner; this.multiDatabaseRoutingProcedureRunner = multiDatabaseRoutingProcedureRunner; + this.routeMessageRoutingProcedureRunner = routeMessageRoutingProcedureRunner; } @Override public CompletionStage getClusterComposition( Connection connection, DatabaseName databaseName, Bookmark bookmark ) { RoutingProcedureRunner runner; - if ( supportsMultiDatabase( connection ) ) + + if ( supportsRouteMessage( connection ) ) + { + runner = routeMessageRoutingProcedureRunner; + } + else if ( supportsMultiDatabase( connection ) ) { runner = multiDatabaseRoutingProcedureRunner; } else { - runner = routingProcedureRunner; + runner = singleDatabaseRoutingProcedureRunner; } return runner.run( connection, databaseName, bookmark ) - .thenApply( this::processRoutingResponse ); + .thenApply( this::processRoutingResponse ); } private ClusterComposition processRoutingResponse( RoutingProcedureResponse response ) diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java index 1747006c40..e3b2c225a2 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java @@ -18,112 +18,24 @@ */ package org.neo4j.driver.internal.cluster; -import java.util.List; -import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; -import org.neo4j.driver.AccessMode; import org.neo4j.driver.Bookmark; -import org.neo4j.driver.Query; -import org.neo4j.driver.Record; -import org.neo4j.driver.TransactionConfig; -import org.neo4j.driver.async.ResultCursor; -import org.neo4j.driver.exceptions.ClientException; -import org.neo4j.driver.exceptions.FatalDiscoveryException; -import org.neo4j.driver.internal.BookmarkHolder; import org.neo4j.driver.internal.DatabaseName; -import org.neo4j.driver.internal.async.connection.DirectConnection; import org.neo4j.driver.internal.spi.Connection; -import org.neo4j.driver.internal.util.Futures; -import org.neo4j.driver.internal.util.ServerVersion; -import static org.neo4j.driver.Values.parameters; -import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase; -import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE; - -public class RoutingProcedureRunner +/** + * Interface which defines the standard way to get the routing table + */ +public interface RoutingProcedureRunner { - static final String ROUTING_CONTEXT = "context"; - static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")"; - - final RoutingContext context; - - public RoutingProcedureRunner( RoutingContext context ) - { - this.context = context; - } - - public CompletionStage run( Connection connection, DatabaseName databaseName, Bookmark bookmark ) - { - DirectConnection delegate = connection( connection ); - Query procedure = procedureQuery( connection.serverVersion(), databaseName ); - BookmarkHolder bookmarkHolder = bookmarkHolder( bookmark ); - return runProcedure( delegate, procedure, bookmarkHolder ) - .thenCompose( records -> releaseConnection( delegate, records ) ) - .handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) ); - } - - DirectConnection connection( Connection connection ) - { - return new DirectConnection( connection, defaultDatabase(), AccessMode.WRITE ); - } - - Query procedureQuery(ServerVersion serverVersion, DatabaseName databaseName ) - { - if ( databaseName.databaseName().isPresent() ) - { - throw new FatalDiscoveryException( String.format( - "Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " + - "Current server version: %s. Database name: '%s'", serverVersion, databaseName.description() ) ); - } - return new Query( GET_ROUTING_TABLE, parameters( ROUTING_CONTEXT, context.toMap() ) ); - } - - BookmarkHolder bookmarkHolder( Bookmark ignored ) - { - return BookmarkHolder.NO_OP; - } - - CompletionStage> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder ) - { - return connection.protocol() - .runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE ) - .asyncResult().thenCompose( ResultCursor::listAsync ); - } - - private CompletionStage> releaseConnection( Connection connection, List records ) - { - // It is not strictly required to release connection after routing procedure invocation because it'll - // be released by the PULL_ALL response handler after result is fully fetched. Such release will happen - // in background. However, releasing it early as part of whole chain makes it easier to reason about - // rediscovery in stub server tests. Some of them assume connections to instances not present in new - // routing table will be closed immediately. - return connection.release().thenApply( ignore -> records ); - } - - private static RoutingProcedureResponse processProcedureResponse(Query procedure, List records, - Throwable error ) - { - Throwable cause = Futures.completionExceptionCause( error ); - if ( cause != null ) - { - return handleError( procedure, cause ); - } - else - { - return new RoutingProcedureResponse( procedure, records ); - } - } - - private static RoutingProcedureResponse handleError(Query procedure, Throwable error ) - { - if ( error instanceof ClientException ) - { - return new RoutingProcedureResponse( procedure, error ); - } - else - { - throw new CompletionException( error ); - } - } + /** + * Run the calls to the server + * + * @param connection The connection which will be used to call the server + * @param databaseName The database name + * @param bookmark The bookmark used to query the routing information + * @return The routing table + */ + CompletionStage run( Connection connection, DatabaseName databaseName, Bookmark bookmark ); } diff --git a/driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java b/driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java new file mode 100644 index 0000000000..9ed7cebcbb --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunner.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Query; +import org.neo4j.driver.Record; +import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.async.ResultCursor; +import org.neo4j.driver.exceptions.ClientException; +import org.neo4j.driver.exceptions.FatalDiscoveryException; +import org.neo4j.driver.internal.BookmarkHolder; +import org.neo4j.driver.internal.DatabaseName; +import org.neo4j.driver.internal.async.connection.DirectConnection; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.util.Futures; +import org.neo4j.driver.internal.util.ServerVersion; + +import static org.neo4j.driver.Values.parameters; +import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase; +import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE; + +/** + * This implementation of the {@link RoutingProcedureRunner} works with single database versions of Neo4j calling + * the procedure `dbms.cluster.routing.getRoutingTable` + */ +public class SingleDatabaseRoutingProcedureRunner implements RoutingProcedureRunner +{ + static final String ROUTING_CONTEXT = "context"; + static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")"; + + final RoutingContext context; + + public SingleDatabaseRoutingProcedureRunner( RoutingContext context ) + { + this.context = context; + } + + @Override + public CompletionStage run( Connection connection, DatabaseName databaseName, Bookmark bookmark ) + { + DirectConnection delegate = connection( connection ); + Query procedure = procedureQuery( connection.serverVersion(), databaseName ); + BookmarkHolder bookmarkHolder = bookmarkHolder( bookmark ); + return runProcedure( delegate, procedure, bookmarkHolder ) + .thenCompose( records -> releaseConnection( delegate, records ) ) + .handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) ); + } + + DirectConnection connection( Connection connection ) + { + return new DirectConnection( connection, defaultDatabase(), AccessMode.WRITE ); + } + + Query procedureQuery(ServerVersion serverVersion, DatabaseName databaseName ) + { + if ( databaseName.databaseName().isPresent() ) + { + throw new FatalDiscoveryException( String.format( + "Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " + + "Current server version: %s. Database name: '%s'", serverVersion, databaseName.description() ) ); + } + return new Query( GET_ROUTING_TABLE, parameters( ROUTING_CONTEXT, context.toMap() ) ); + } + + BookmarkHolder bookmarkHolder( Bookmark ignored ) + { + return BookmarkHolder.NO_OP; + } + + CompletionStage> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder ) + { + return connection.protocol() + .runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE ) + .asyncResult().thenCompose( ResultCursor::listAsync ); + } + + private CompletionStage> releaseConnection( Connection connection, List records ) + { + // It is not strictly required to release connection after routing procedure invocation because it'll + // be released by the PULL_ALL response handler after result is fully fetched. Such release will happen + // in background. However, releasing it early as part of whole chain makes it easier to reason about + // rediscovery in stub server tests. Some of them assume connections to instances not present in new + // routing table will be closed immediately. + return connection.release().thenApply( ignore -> records ); + } + + private static RoutingProcedureResponse processProcedureResponse(Query procedure, List records, + Throwable error ) + { + Throwable cause = Futures.completionExceptionCause( error ); + if ( cause != null ) + { + return handleError( procedure, cause ); + } + else + { + return new RoutingProcedureResponse( procedure, records ); + } + } + + private static RoutingProcedureResponse handleError(Query procedure, Throwable error ) + { + if ( error instanceof ClientException ) + { + return new RoutingProcedureResponse( procedure, error ); + } + else + { + throw new CompletionException( error ); + } + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/handlers/RouteMessageResponseHandler.java b/driver/src/main/java/org/neo4j/driver/internal/handlers/RouteMessageResponseHandler.java new file mode 100644 index 0000000000..fabe3c2823 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/handlers/RouteMessageResponseHandler.java @@ -0,0 +1,91 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import java.util.Arrays; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.spi.ResponseHandler; + +import static java.util.Objects.requireNonNull; + +/** + * Handles the RouteMessage response getting the success response + * and return its routing table property as the response. + */ +public class RouteMessageResponseHandler implements ResponseHandler +{ + private final CompletableFuture> completableFuture; + + public RouteMessageResponseHandler( final CompletableFuture> completableFuture ) + { + this.completableFuture = requireNonNull( completableFuture ); + } + + @Override + public void onSuccess( Map metadata ) + { + try + { + completableFuture.complete( metadata.get( "rt" ).asMap( Values::value ) ); + } + catch ( Exception ex ) + { + completableFuture.completeExceptionally( ex ); + } + } + + @Override + public void onFailure( Throwable error ) + { + completableFuture.completeExceptionally( error ); + } + + @Override + public void onRecord( Value[] fields ) + { + completableFuture.completeExceptionally( new UnsupportedOperationException( + "Route is not expected to receive records: " + Arrays.toString( fields ) ) ); + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + RouteMessageResponseHandler that = (RouteMessageResponseHandler) o; + return completableFuture.equals( that.completableFuture ); + } + + @Override + public int hashCode() + { + return Objects.hash( completableFuture ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java index ee6c72eee5..8517ad3cf6 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/BoltProtocol.java @@ -39,6 +39,7 @@ import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41; import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42; +import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43; import org.neo4j.driver.internal.spi.Connection; import static org.neo4j.driver.internal.async.connection.ChannelAttributes.protocolVersion; @@ -168,6 +169,10 @@ else if ( BoltProtocolV42.VERSION.equals( version ) ) { return BoltProtocolV42.INSTANCE; } + else if ( BoltProtocolV43.VERSION.equals( version ) ) + { + return BoltProtocolV43.INSTANCE; + } throw new ClientException( "Unknown protocol version: " + version ); } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/encode/RouteMessageEncoder.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/encode/RouteMessageEncoder.java new file mode 100644 index 0000000000..2f165c9ad7 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/encode/RouteMessageEncoder.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.encode; + +import java.io.IOException; + +import org.neo4j.driver.internal.messaging.Message; +import org.neo4j.driver.internal.messaging.MessageEncoder; +import org.neo4j.driver.internal.messaging.ValuePacker; +import org.neo4j.driver.internal.messaging.request.RouteMessage; + +import static org.neo4j.driver.internal.util.Preconditions.checkArgument; + +/** + * Encodes the ROUTE message to the stream + */ +public class RouteMessageEncoder implements MessageEncoder +{ + @Override + public void encode( Message message, ValuePacker packer ) throws IOException + { + checkArgument( message, RouteMessage.class ); + RouteMessage routeMessage = (RouteMessage) message; + packer.packStructHeader( 2, message.signature() ); + packer.pack( routeMessage.getRoutingContext() ); + packer.pack( routeMessage.getDatabaseName() ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/MultiDatabaseUtil.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/MultiDatabaseUtil.java index b3c54f28e9..73dbe42314 100644 --- a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/MultiDatabaseUtil.java +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/MultiDatabaseUtil.java @@ -22,6 +22,7 @@ import org.neo4j.driver.internal.DatabaseName; import org.neo4j.driver.internal.messaging.BoltProtocolVersion; import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; +import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.ServerVersion; @@ -32,7 +33,7 @@ public static void assertEmptyDatabaseName( DatabaseName databaseName, BoltProto if ( databaseName.databaseName().isPresent() ) { throw new ClientException( String.format( "Database name parameter for selecting database is not supported in Bolt Protocol Version %s. " + - "Database name: '%s'", boltVersion, databaseName.description() ) ); + "Database name: '%s'", boltVersion, databaseName.description() ) ); } } @@ -41,4 +42,9 @@ public static boolean supportsMultiDatabase( Connection connection ) return connection.serverVersion().greaterThanOrEqual( ServerVersion.v4_0_0 ) && connection.protocol().version().compareTo( BoltProtocolV4.VERSION ) >= 0; } + + public static boolean supportsRouteMessage( Connection connection ) + { + return connection.protocol().version().compareTo( BoltProtocolV43.VERSION ) >= 0; + } } diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/request/RouteMessage.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/RouteMessage.java new file mode 100644 index 0000000000..b8cc74ce0e --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/request/RouteMessage.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.request; + +import java.util.Map; +import java.util.Objects; + +import org.neo4j.driver.Value; +import org.neo4j.driver.internal.messaging.Message; + +import static java.util.Collections.unmodifiableMap; + +/** + * From the application point of view it is not interesting to know about the role a member plays in the cluster. Instead, the application needs to know which + * instance can provide the wanted service. + *

+ * This message is used to fetch this routing information. + */ +public class RouteMessage implements Message +{ + public final static byte SIGNATURE = 0x66; + private final Map routingContext; + private final String databaseName; + + /** + * Constructor + * + * @param routingContext The routing context used to define the routing table. Multi-datacenter deployments is one of its use cases. + * @param databaseName The name of the database to get the routing table for. + */ + public RouteMessage( Map routingContext, String databaseName ) + { + this.routingContext = unmodifiableMap( routingContext ); + this.databaseName = databaseName; + } + + public Map getRoutingContext() + { + return routingContext; + } + + public String getDatabaseName() + { + return databaseName; + } + + @Override + public byte signature() + { + return SIGNATURE; + } + + @Override + public String toString() + { + return String.format( "ROUTE %s %s", routingContext, databaseName ); + } + + @Override + public boolean equals( Object o ) + { + if ( this == o ) + { + return true; + } + if ( o == null || getClass() != o.getClass() ) + { + return false; + } + RouteMessage that = (RouteMessage) o; + return routingContext.equals( that.routingContext ) && + Objects.equals( databaseName, that.databaseName ); + } + + @Override + public int hashCode() + { + return Objects.hash( routingContext, databaseName ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43.java new file mode 100644 index 0000000000..c02938b934 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43.java @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.v43; + +import org.neo4j.driver.internal.messaging.BoltProtocol; +import org.neo4j.driver.internal.messaging.BoltProtocolVersion; +import org.neo4j.driver.internal.messaging.MessageFormat; +import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42; + +/** + * Definition of the Bolt Protocol 4.3 + * + * The version 4.3 use most of the 4.2 behaviours, but it extends it with new messages such as ROUTE + */ +public class BoltProtocolV43 extends BoltProtocolV42 +{ + public static final BoltProtocolVersion VERSION = new BoltProtocolVersion( 4, 3 ); + public static final BoltProtocol INSTANCE = new BoltProtocolV43(); + + @Override + public MessageFormat createMessageFormat() + { + return new MessageFormatV43(); + } + + @Override + public BoltProtocolVersion version() + { + return VERSION; + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/MessageFormatV43.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/MessageFormatV43.java new file mode 100644 index 0000000000..f9bb2e63f9 --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/MessageFormatV43.java @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.v43; + +import org.neo4j.driver.internal.messaging.MessageFormat; +import org.neo4j.driver.internal.messaging.common.CommonMessageReader; +import org.neo4j.driver.internal.packstream.PackInput; +import org.neo4j.driver.internal.packstream.PackOutput; + +/** + * Bolt message format v4.3 + */ +public class MessageFormatV43 implements MessageFormat +{ + @Override + public Writer newWriter( PackOutput output ) + { + return new MessageWriterV43( output ); + } + + @Override + public Reader newReader( PackInput input ) + { + return new CommonMessageReader( input ); + } +} diff --git a/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/MessageWriterV43.java b/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/MessageWriterV43.java new file mode 100644 index 0000000000..2f87e30b1c --- /dev/null +++ b/driver/src/main/java/org/neo4j/driver/internal/messaging/v43/MessageWriterV43.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.v43; + +import java.util.Map; + +import org.neo4j.driver.internal.messaging.AbstractMessageWriter; +import org.neo4j.driver.internal.messaging.MessageEncoder; +import org.neo4j.driver.internal.messaging.common.CommonValuePacker; +import org.neo4j.driver.internal.messaging.encode.BeginMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.CommitMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.DiscardMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.GoodbyeMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.HelloMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.PullMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.ResetMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.RollbackMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.RouteMessageEncoder; +import org.neo4j.driver.internal.messaging.encode.RunWithMetadataMessageEncoder; +import org.neo4j.driver.internal.messaging.request.BeginMessage; +import org.neo4j.driver.internal.messaging.request.CommitMessage; +import org.neo4j.driver.internal.messaging.request.DiscardMessage; +import org.neo4j.driver.internal.messaging.request.GoodbyeMessage; +import org.neo4j.driver.internal.messaging.request.HelloMessage; +import org.neo4j.driver.internal.messaging.request.PullMessage; +import org.neo4j.driver.internal.messaging.request.ResetMessage; +import org.neo4j.driver.internal.messaging.request.RollbackMessage; +import org.neo4j.driver.internal.messaging.request.RouteMessage; +import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage; +import org.neo4j.driver.internal.packstream.PackOutput; +import org.neo4j.driver.internal.util.Iterables; + +/** + * Bolt message writer v4.3 + * + * This version is able to encode all the versions existing on v4.2, but it encodes + * new messages such as ROUTE + */ +public class MessageWriterV43 extends AbstractMessageWriter +{ + public MessageWriterV43( PackOutput output ) + { + super( new CommonValuePacker( output ), buildEncoders() ); + } + + private static Map buildEncoders() + { + Map result = Iterables.newHashMapWithSize( 9 ); + result.put( HelloMessage.SIGNATURE, new HelloMessageEncoder() ); + result.put( GoodbyeMessage.SIGNATURE, new GoodbyeMessageEncoder() ); + result.put( RunWithMetadataMessage.SIGNATURE, new RunWithMetadataMessageEncoder() ); + result.put( RouteMessage.SIGNATURE, new RouteMessageEncoder() ); // new + + result.put( DiscardMessage.SIGNATURE, new DiscardMessageEncoder() ); + result.put( PullMessage.SIGNATURE, new PullMessageEncoder() ); + + result.put( BeginMessage.SIGNATURE, new BeginMessageEncoder() ); + result.put( CommitMessage.SIGNATURE, new CommitMessageEncoder() ); + result.put( RollbackMessage.SIGNATURE, new RollbackMessageEncoder() ); + + result.put( ResetMessage.SIGNATURE, new ResetMessageEncoder() ); + return result; + } +} diff --git a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitIT.java b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitIT.java index e586750d66..b2d76cb46b 100644 --- a/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitIT.java +++ b/driver/src/test/java/org/neo4j/driver/integration/RoutingDriverBoltKitIT.java @@ -20,6 +20,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; diff --git a/driver/src/test/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtilTest.java b/driver/src/test/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtilTest.java index 4657e31fc8..7f3d1c0d8a 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtilTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtilTest.java @@ -23,9 +23,9 @@ import org.junit.jupiter.api.Test; import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3; -import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41; import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42; +import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.neo4j.driver.internal.async.connection.BoltProtocolUtil.BOLT_MAGIC_PREAMBLE; @@ -43,15 +43,15 @@ void shouldReturnHandshakeBuf() { assertByteBufContains( handshakeBuf(), - BOLT_MAGIC_PREAMBLE, BoltProtocolV42.VERSION.toInt(), BoltProtocolV41.VERSION.toInt(), - BoltProtocolV4.VERSION.toInt(), BoltProtocolV3.VERSION.toInt() + BOLT_MAGIC_PREAMBLE, BoltProtocolV43.VERSION.toInt(), BoltProtocolV42.VERSION.toInt(), + BoltProtocolV41.VERSION.toInt(), BoltProtocolV3.VERSION.toInt() ); } @Test void shouldReturnHandshakeString() { - assertEquals( "[0x6060b017, 516, 260, 4, 3]", handshakeString() ); + assertEquals( "[0x6060b017, 772, 516, 260, 3]", handshakeString() ); } @Test diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/AbstractRoutingProcedureRunnerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/AbstractRoutingProcedureRunnerTest.java index 23db0c6e06..3e4ab34f90 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/AbstractRoutingProcedureRunnerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/AbstractRoutingProcedureRunnerTest.java @@ -46,7 +46,7 @@ abstract class AbstractRoutingProcedureRunnerTest void shouldReturnFailedResponseOnClientException() { ClientException error = new ClientException( "Hi" ); - RoutingProcedureRunner runner = routingProcedureRunner( RoutingContext.EMPTY, failedFuture( error ) ); + SingleDatabaseRoutingProcedureRunner runner = singleDatabaseRoutingProcedureRunner( RoutingContext.EMPTY, failedFuture( error ) ); RoutingProcedureResponse response = await( runner.run( connection(), defaultDatabase(), empty() ) ); @@ -58,7 +58,7 @@ void shouldReturnFailedResponseOnClientException() void shouldReturnFailedStageOnError() { Exception error = new Exception( "Hi" ); - RoutingProcedureRunner runner = routingProcedureRunner( RoutingContext.EMPTY, failedFuture( error ) ); + SingleDatabaseRoutingProcedureRunner runner = singleDatabaseRoutingProcedureRunner( RoutingContext.EMPTY, failedFuture( error ) ); Exception e = assertThrows( Exception.class, () -> await( runner.run( connection(), defaultDatabase(), empty() ) ) ); assertEquals( error, e ); @@ -67,7 +67,7 @@ void shouldReturnFailedStageOnError() @Test void shouldReleaseConnectionOnSuccess() { - RoutingProcedureRunner runner = routingProcedureRunner( RoutingContext.EMPTY ); + SingleDatabaseRoutingProcedureRunner runner = singleDatabaseRoutingProcedureRunner( RoutingContext.EMPTY ); Connection connection = connection(); RoutingProcedureResponse response = await( runner.run( connection, defaultDatabase(), empty() ) ); @@ -79,7 +79,7 @@ void shouldReleaseConnectionOnSuccess() @Test void shouldPropagateReleaseError() { - RoutingProcedureRunner runner = routingProcedureRunner( RoutingContext.EMPTY ); + SingleDatabaseRoutingProcedureRunner runner = singleDatabaseRoutingProcedureRunner( RoutingContext.EMPTY ); RuntimeException releaseError = new RuntimeException( "Release failed" ); Connection connection = connection( failedFuture( releaseError ) ); @@ -89,9 +89,9 @@ void shouldPropagateReleaseError() verify( connection ).release(); } - abstract RoutingProcedureRunner routingProcedureRunner( RoutingContext context ); + abstract SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner( RoutingContext context ); - abstract RoutingProcedureRunner routingProcedureRunner( RoutingContext context, CompletionStage> runProcedureResult ); + abstract SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner( RoutingContext context, CompletionStage> runProcedureResult ); static Connection connection() { diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java index 9a00a395f8..19a1e333cf 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunnerTest.java @@ -50,7 +50,7 @@ import static org.neo4j.driver.internal.InternalBookmark.empty; import static org.neo4j.driver.internal.cluster.MultiDatabasesRoutingProcedureRunner.DATABASE_NAME; import static org.neo4j.driver.internal.cluster.MultiDatabasesRoutingProcedureRunner.MULTI_DB_GET_ROUTING_TABLE; -import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.ROUTING_CONTEXT; +import static org.neo4j.driver.internal.cluster.SingleDatabaseRoutingProcedureRunner.ROUTING_CONTEXT; import static org.neo4j.driver.util.TestUtil.await; class MultiDatabasesRoutingProcedureRunnerTest extends AbstractRoutingProcedureRunnerTest @@ -96,13 +96,13 @@ void shouldCallGetRoutingTableWithParamOnSystemDatabaseForDatabase( String db ) } @Override - RoutingProcedureRunner routingProcedureRunner( RoutingContext context ) + SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner( RoutingContext context ) { return new TestRoutingProcedureRunner( context ); } @Override - RoutingProcedureRunner routingProcedureRunner( RoutingContext context, CompletionStage> runProcedureResult ) + SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner( RoutingContext context, CompletionStage> runProcedureResult ) { return new TestRoutingProcedureRunner( context, runProcedureResult ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunnerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunnerTest.java new file mode 100644 index 0000000000..f06be41075 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RouteMessageRoutingProcedureRunnerTest.java @@ -0,0 +1,156 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.cluster; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.neo4j.driver.Record; +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.DatabaseName; +import org.neo4j.driver.internal.DatabaseNameUtil; +import org.neo4j.driver.internal.handlers.RouteMessageResponseHandler; +import org.neo4j.driver.internal.messaging.request.RouteMessage; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.util.TestUtil; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +class RouteMessageRoutingProcedureRunnerTest +{ + + private static Stream shouldRequestRoutingTableForAllValidInputScenarios() + { + return Stream.of( + Arguments.arguments( RoutingContext.EMPTY, DatabaseNameUtil.defaultDatabase() ), + Arguments.arguments( RoutingContext.EMPTY, DatabaseNameUtil.systemDatabase() ), + Arguments.arguments( RoutingContext.EMPTY, DatabaseNameUtil.database( "neo4j" ) ), + Arguments.arguments( new RoutingContext( URI.create( "localhost:17601" ) ), DatabaseNameUtil.defaultDatabase() ), + Arguments.arguments( new RoutingContext( URI.create( "localhost:17602" ) ), DatabaseNameUtil.systemDatabase() ), + Arguments.arguments( new RoutingContext( URI.create( "localhost:17603" ) ), DatabaseNameUtil.database( "neo4j" ) ) + ); + } + + @ParameterizedTest + @MethodSource + void shouldRequestRoutingTableForAllValidInputScenarios( RoutingContext routingContext, DatabaseName databaseName ) + { + Map routingTable = getRoutingTable(); + CompletableFuture> completableFuture = CompletableFuture.completedFuture( routingTable ); + RouteMessageRoutingProcedureRunner runner = new RouteMessageRoutingProcedureRunner( routingContext, () -> completableFuture ); + Connection connection = mock( Connection.class ); + CompletableFuture releaseConnectionFuture = CompletableFuture.completedFuture( null ); + doReturn( releaseConnectionFuture ).when( connection ).release(); + + RoutingProcedureResponse response = TestUtil.await( runner.run( connection, databaseName, null ) ); + + assertNotNull( response ); + assertTrue( response.isSuccess() ); + assertNotNull( response.procedure() ); + assertEquals( 1, response.records().size() ); + assertNotNull( response.records().get( 0 ) ); + + Record record = response.records().get( 0 ); + assertEquals( routingTable.get( "ttl" ), record.get( "ttl" ) ); + assertEquals( routingTable.get( "servers" ), record.get( "servers" ) ); + + verifyMessageWasWrittenAndFlushed( connection, completableFuture, routingContext, databaseName ); + verify( connection ).release(); + } + + @Test + void shouldReturnFailureWhenSomethingHappensGettingTheRoutingTable() + { + Throwable reason = new RuntimeException( "Some error" ); + CompletableFuture> completableFuture = new CompletableFuture<>(); + completableFuture.completeExceptionally( reason ); + RouteMessageRoutingProcedureRunner runner = new RouteMessageRoutingProcedureRunner( RoutingContext.EMPTY, () -> completableFuture ); + Connection connection = mock( Connection.class ); + CompletableFuture releaseConnectionFuture = CompletableFuture.completedFuture( null ); + doReturn( releaseConnectionFuture ).when( connection ).release(); + + RoutingProcedureResponse response = TestUtil.await( runner.run( connection, DatabaseNameUtil.defaultDatabase(), null ) ); + + assertNotNull( response ); + assertFalse( response.isSuccess() ); + assertNotNull( response.procedure() ); + assertEquals( reason, response.error() ); + assertThrows( IllegalStateException.class, () -> response.records().size() ); + + verifyMessageWasWrittenAndFlushed( connection, completableFuture, RoutingContext.EMPTY, DatabaseNameUtil.defaultDatabase() ); + verify( connection ).release(); + } + + private void verifyMessageWasWrittenAndFlushed( Connection connection, CompletableFuture> completableFuture, + RoutingContext routingContext, DatabaseName databaseName ) + { + Map context = routingContext.toMap() + .entrySet() + .stream() + .collect( Collectors.toMap( Map.Entry::getKey, entry -> Values.value( entry.getValue() ) ) ); + + verify( connection ).writeAndFlush( eq( new RouteMessage( context, databaseName.databaseName().orElse( null ) ) ), + eq( new RouteMessageResponseHandler( completableFuture ) ) ); + } + + private Map getRoutingTable() + { + Map routingTable = new HashMap<>(); + routingTable.put( "ttl", Values.value( 300 ) ); + routingTable.put( "servers", Values.value( getServers() ) ); + return routingTable; + } + + private List> getServers() + { + List> servers = new ArrayList<>(); + servers.add( getServer( "WRITE", "localhost:17601" ) ); + servers.add( getServer( "READ", "localhost:17601", "localhost:17602", "localhost:17603" ) ); + servers.add( getServer( "ROUTE", "localhost:17601", "localhost:17602", "localhost:17603" ) ); + return servers; + } + + private Map getServer( String role, String... addresses ) + { + Map server = new HashMap<>(); + server.put( "role", Values.value( role ) ); + server.put( "addresses", Values.value( addresses ) ); + return server; + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java index 22ac2d6bb7..5ebb9d9497 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProviderTest.java @@ -36,6 +36,7 @@ import org.neo4j.driver.internal.InternalRecord; import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3; import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; +import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.util.Clock; import org.neo4j.driver.internal.util.ServerVersion; @@ -65,7 +66,7 @@ class RoutingProcedureClusterCompositionProviderTest void shouldProtocolErrorWhenNoRecord() { // Given - RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); + SingleDatabaseRoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); Connection connection = mock( Connection.class ); ClusterCompositionProvider provider = newClusterCompositionProvider( mockedRunner, connection ); @@ -76,7 +77,7 @@ void shouldProtocolErrorWhenNoRecord() // When & Then ProtocolException error = assertThrows( ProtocolException.class, - () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); assertThat( error.getMessage(), containsString( "records received '0' is too few or too many." ) ); } @@ -84,18 +85,18 @@ void shouldProtocolErrorWhenNoRecord() void shouldProtocolErrorWhenMoreThanOneRecord() { // Given - RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); + SingleDatabaseRoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); Connection connection = mock( Connection.class ); ClusterCompositionProvider provider = newClusterCompositionProvider( mockedRunner, connection ); - Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } ); + Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{new StringValue( "a value" )} ); RoutingProcedureResponse routingResponse = newRoutingResponse( aRecord, aRecord ); - when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); // When ProtocolException error = assertThrows( ProtocolException.class, - () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); assertThat( error.getMessage(), containsString( "records received '2' is too few or too many." ) ); } @@ -103,18 +104,18 @@ void shouldProtocolErrorWhenMoreThanOneRecord() void shouldProtocolErrorWhenUnparsableRecord() { // Given - RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); + SingleDatabaseRoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); Connection connection = mock( Connection.class ); ClusterCompositionProvider provider = newClusterCompositionProvider( mockedRunner, connection ); - Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{ new StringValue( "a value" ) } ); + Record aRecord = new InternalRecord( asList( "key1", "key2" ), new Value[]{new StringValue( "a value" )} ); RoutingProcedureResponse routingResponse = newRoutingResponse( aRecord ); - when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); // When ProtocolException error = assertThrows( ProtocolException.class, - () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); assertThat( error.getMessage(), containsString( "unparsable record received." ) ); } @@ -134,12 +135,37 @@ void shouldProtocolErrorWhenNoRouters() serverInfo( "WRITE", "one:1337" ) ) ) } ); RoutingProcedureResponse routingResponse = newRoutingResponse( record ); - when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); when( mockedClock.millis() ).thenReturn( 12345L ); // When ProtocolException error = assertThrows( ProtocolException.class, - () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + assertThat( error.getMessage(), containsString( "no router or reader found in response." ) ); + } + + @Test + void routeMessageRoutingProcedureShouldProtocolErrorWhenNoRouters() + { + // Given + RouteMessageRoutingProcedureRunner mockedRunner = newRouteMessageRoutingProcedureRunnerMock(); + Connection connection = mock( Connection.class ); + Clock mockedClock = mock( Clock.class ); + ClusterCompositionProvider provider = + newClusterCompositionProvider( mockedRunner, connection, mockedClock ); + + Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ + value( 100 ), value( asList( + serverInfo( "READ", "one:1337", "two:1337" ), + serverInfo( "WRITE", "one:1337" ) ) ) + } ); + RoutingProcedureResponse routingResponse = newRoutingResponse( record ); + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); + when( mockedClock.millis() ).thenReturn( 12345L ); + + // When + ProtocolException error = assertThrows( ProtocolException.class, + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); assertThat( error.getMessage(), containsString( "no router or reader found in response." ) ); } @@ -159,12 +185,37 @@ void shouldProtocolErrorWhenNoReaders() serverInfo( "ROUTE", "one:1337", "two:1337" ) ) ) } ); RoutingProcedureResponse routingResponse = newRoutingResponse( record ); - when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); + when( mockedClock.millis() ).thenReturn( 12345L ); + + // When + ProtocolException error = assertThrows( ProtocolException.class, + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + assertThat( error.getMessage(), containsString( "no router or reader found in response." ) ); + } + + @Test + void routeMessageRoutingProcedureShouldProtocolErrorWhenNoReaders() + { + // Given + RouteMessageRoutingProcedureRunner mockedRunner = newRouteMessageRoutingProcedureRunnerMock(); + Connection connection = mock( Connection.class ); + Clock mockedClock = mock( Clock.class ); + ClusterCompositionProvider provider = + newClusterCompositionProvider( mockedRunner, connection, mockedClock ); + + Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ + value( 100 ), value( asList( + serverInfo( "WRITE", "one:1337" ), + serverInfo( "ROUTE", "one:1337", "two:1337" ) ) ) + } ); + RoutingProcedureResponse routingResponse = newRoutingResponse( record ); + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedFuture( routingResponse ) ); when( mockedClock.millis() ).thenReturn( 12345L ); // When ProtocolException error = assertThrows( ProtocolException.class, - () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); assertThat( error.getMessage(), containsString( "no router or reader found in response." ) ); } @@ -172,17 +223,17 @@ void shouldProtocolErrorWhenNoReaders() void shouldPropagateConnectionFailureExceptions() { // Given - RoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); + SingleDatabaseRoutingProcedureRunner mockedRunner = newProcedureRunnerMock(); Connection connection = mock( Connection.class ); ClusterCompositionProvider provider = newClusterCompositionProvider( mockedRunner, connection ); - when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ).thenReturn( failedFuture( + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( failedFuture( new ServiceUnavailableException( "Connection breaks during cypher execution" ) ) ); // When & Then ServiceUnavailableException e = assertThrows( ServiceUnavailableException.class, - () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); assertThat( e.getMessage(), containsString( "Connection breaks during cypher execution" ) ); } @@ -203,7 +254,38 @@ void shouldReturnSuccessResultWhenNoError() serverInfo( "ROUTE", "one:1337", "two:1337" ) ) ) } ); RoutingProcedureResponse routingResponse = newRoutingResponse( record ); - when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ) + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ) + .thenReturn( completedFuture( routingResponse ) ); + when( mockedClock.millis() ).thenReturn( 12345L ); + + // When + ClusterComposition cluster = await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ); + + // Then + assertEquals( 12345 + 100_000, cluster.expirationTimestamp() ); + assertEquals( serverSet( "one:1337", "two:1337" ), cluster.readers() ); + assertEquals( serverSet( "one:1337" ), cluster.writers() ); + assertEquals( serverSet( "one:1337", "two:1337" ), cluster.routers() ); + } + + @Test + void routeMessageRoutingProcedureShouldReturnSuccessResultWhenNoError() + { + // Given + Clock mockedClock = mock( Clock.class ); + Connection connection = mock( Connection.class ); + RouteMessageRoutingProcedureRunner mockedRunner = newRouteMessageRoutingProcedureRunnerMock(); + ClusterCompositionProvider provider = + newClusterCompositionProvider( mockedRunner, connection, mockedClock ); + + Record record = new InternalRecord( asList( "ttl", "servers" ), new Value[]{ + value( 100 ), value( asList( + serverInfo( "READ", "one:1337", "two:1337" ), + serverInfo( "WRITE", "one:1337" ), + serverInfo( "ROUTE", "one:1337", "two:1337" ) ) ) + } ); + RoutingProcedureResponse routingResponse = newRoutingResponse( record ); + when( mockedRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ) .thenReturn( completedFuture( routingResponse ) ); when( mockedClock.millis() ).thenReturn( 12345L ); @@ -220,18 +302,18 @@ void shouldReturnSuccessResultWhenNoError() @Test void shouldReturnFailureWhenProcedureRunnerFails() { - RoutingProcedureRunner procedureRunner = newProcedureRunnerMock(); + SingleDatabaseRoutingProcedureRunner procedureRunner = newProcedureRunnerMock(); Connection connection = mock( Connection.class ); RuntimeException error = new RuntimeException( "hi" ); - when( procedureRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ) + when( procedureRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ) .thenReturn( completedFuture( newRoutingResponse( error ) ) ); RoutingProcedureClusterCompositionProvider provider = newClusterCompositionProvider( procedureRunner, connection ); RuntimeException e = assertThrows( RuntimeException.class, - () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); + () -> await( provider.getClusterComposition( connection, defaultDatabase(), empty() ) ) ); assertEquals( error, e ); } @@ -244,7 +326,7 @@ void shouldUseMultiDBProcedureRunnerWhenConnectingWith40Server() throws Throwabl RoutingProcedureClusterCompositionProvider provider = newClusterCompositionProvider( procedureRunner, connection ); - when( procedureRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ).thenReturn( completedWithNull() ); + when( procedureRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedWithNull() ); provider.getClusterComposition( connection, defaultDatabase(), empty() ); verify( procedureRunner ).run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ); @@ -253,13 +335,28 @@ void shouldUseMultiDBProcedureRunnerWhenConnectingWith40Server() throws Throwabl @Test void shouldUseProcedureRunnerWhenConnectingWith35AndPreviousServers() throws Throwable { - RoutingProcedureRunner procedureRunner = newProcedureRunnerMock(); + SingleDatabaseRoutingProcedureRunner procedureRunner = newProcedureRunnerMock(); Connection connection = mock( Connection.class ); RoutingProcedureClusterCompositionProvider provider = newClusterCompositionProvider( procedureRunner, connection ); - when( procedureRunner.run( eq( connection ), any( DatabaseName.class ), any(InternalBookmark.class ) ) ).thenReturn( completedWithNull() ); + when( procedureRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedWithNull() ); + provider.getClusterComposition( connection, defaultDatabase(), empty() ); + + verify( procedureRunner ).run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ); + } + + @Test + void shouldUseRouteMessageProcedureRunnerWhenConnectingWithProtocol43() throws Throwable + { + RouteMessageRoutingProcedureRunner procedureRunner = newRouteMessageRoutingProcedureRunnerMock(); + Connection connection = mock( Connection.class ); + + RoutingProcedureClusterCompositionProvider provider = + newClusterCompositionProvider( procedureRunner, connection ); + + when( procedureRunner.run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ) ).thenReturn( completedWithNull() ); provider.getClusterComposition( connection, defaultDatabase(), empty() ); verify( procedureRunner ).run( eq( connection ), any( DatabaseName.class ), any( InternalBookmark.class ) ); @@ -283,9 +380,9 @@ private static Set serverSet( String... addresses ) return result; } - private static RoutingProcedureRunner newProcedureRunnerMock() + private static SingleDatabaseRoutingProcedureRunner newProcedureRunnerMock() { - return mock( RoutingProcedureRunner.class ); + return mock( SingleDatabaseRoutingProcedureRunner.class ); } private static MultiDatabasesRoutingProcedureRunner newMultiDBProcedureRunnerMock() @@ -293,6 +390,11 @@ private static MultiDatabasesRoutingProcedureRunner newMultiDBProcedureRunnerMoc return mock( MultiDatabasesRoutingProcedureRunner.class ); } + private static RouteMessageRoutingProcedureRunner newRouteMessageRoutingProcedureRunnerMock() + { + return mock( RouteMessageRoutingProcedureRunner.class ); + } + private static RoutingProcedureResponse newRoutingResponse( Record... records ) { return new RoutingProcedureResponse( new Query( "procedure" ), asList( records ) ); @@ -302,25 +404,43 @@ private static RoutingProcedureResponse newRoutingResponse( Throwable error ) { return new RoutingProcedureResponse( new Query( "procedure" ), error ); } - - private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( RoutingProcedureRunner runner, Connection connection ) + + private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( SingleDatabaseRoutingProcedureRunner runner, + Connection connection ) { when( connection.serverVersion() ).thenReturn( ServerVersion.v3_5_0 ); when( connection.protocol() ).thenReturn( BoltProtocolV3.INSTANCE ); - return new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), runner, newMultiDBProcedureRunnerMock() ); + return new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), runner, newMultiDBProcedureRunnerMock(), + newRouteMessageRoutingProcedureRunnerMock() ); } - private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( MultiDatabasesRoutingProcedureRunner runner, Connection connection ) + private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( MultiDatabasesRoutingProcedureRunner runner, + Connection connection ) { when( connection.serverVersion() ).thenReturn( ServerVersion.v4_0_0 ); when( connection.protocol() ).thenReturn( BoltProtocolV4.INSTANCE ); - return new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), newProcedureRunnerMock(), runner ); + return new RoutingProcedureClusterCompositionProvider( mock( Clock.class ), newProcedureRunnerMock(), runner, + newRouteMessageRoutingProcedureRunnerMock() ); } - private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( MultiDatabasesRoutingProcedureRunner runner, Connection connection, Clock clock ) + private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( MultiDatabasesRoutingProcedureRunner runner, Connection connection, + Clock clock ) { when( connection.serverVersion() ).thenReturn( ServerVersion.v4_0_0 ); when( connection.protocol() ).thenReturn( BoltProtocolV4.INSTANCE ); - return new RoutingProcedureClusterCompositionProvider( clock, newProcedureRunnerMock(), runner ); + return new RoutingProcedureClusterCompositionProvider( clock, newProcedureRunnerMock(), runner, newRouteMessageRoutingProcedureRunnerMock() ); + } + + private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( RouteMessageRoutingProcedureRunner runner, Connection connection ) + { + + return newClusterCompositionProvider( runner, connection, mock( Clock.class ) ); + } + + private static RoutingProcedureClusterCompositionProvider newClusterCompositionProvider( RouteMessageRoutingProcedureRunner runner, + Connection connection, Clock clock ) + { + when( connection.protocol() ).thenReturn( BoltProtocolV43.INSTANCE ); + return new RoutingProcedureClusterCompositionProvider( clock, newProcedureRunnerMock(), newMultiDBProcedureRunnerMock(), runner ); } } diff --git a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java b/driver/src/test/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunnerTest.java similarity index 89% rename from driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java rename to driver/src/test/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunnerTest.java index b0100ecb54..23e615bc12 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunnerTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/cluster/SingleDatabaseRoutingProcedureRunnerTest.java @@ -50,11 +50,11 @@ import static org.neo4j.driver.internal.DatabaseNameUtil.database; import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase; import static org.neo4j.driver.internal.InternalBookmark.empty; -import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.GET_ROUTING_TABLE; -import static org.neo4j.driver.internal.cluster.RoutingProcedureRunner.ROUTING_CONTEXT; +import static org.neo4j.driver.internal.cluster.SingleDatabaseRoutingProcedureRunner.GET_ROUTING_TABLE; +import static org.neo4j.driver.internal.cluster.SingleDatabaseRoutingProcedureRunner.ROUTING_CONTEXT; import static org.neo4j.driver.util.TestUtil.await; -class RoutingProcedureRunnerTest extends AbstractRoutingProcedureRunnerTest +class SingleDatabaseRoutingProcedureRunnerTest extends AbstractRoutingProcedureRunnerTest { @Test void shouldCallGetRoutingTableWithEmptyMap() @@ -102,12 +102,12 @@ void shouldErrorWhenDatabaseIsNotAbsent( String db ) throws Throwable assertThrows( FatalDiscoveryException.class, () -> await( runner.run( connection(), database( db ), empty() ) ) ); } - RoutingProcedureRunner routingProcedureRunner( RoutingContext context ) + SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner( RoutingContext context ) { return new TestRoutingProcedureRunner( context ); } - RoutingProcedureRunner routingProcedureRunner( RoutingContext context, CompletionStage> runProcedureResult ) + SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner( RoutingContext context, CompletionStage> runProcedureResult ) { return new TestRoutingProcedureRunner( context, runProcedureResult ); } @@ -123,7 +123,7 @@ private static Query generateRoutingQuery(Map context ) return new Query( GET_ROUTING_TABLE, parameters ); } - private static class TestRoutingProcedureRunner extends RoutingProcedureRunner + private static class TestRoutingProcedureRunner extends SingleDatabaseRoutingProcedureRunner { final CompletionStage> runProcedureResult; private Connection connection; diff --git a/driver/src/test/java/org/neo4j/driver/internal/handlers/RouteMessageResponseHandlerTest.java b/driver/src/test/java/org/neo4j/driver/internal/handlers/RouteMessageResponseHandlerTest.java new file mode 100644 index 0000000000..2dfa487a5a --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/handlers/RouteMessageResponseHandlerTest.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.handlers; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class RouteMessageResponseHandlerTest +{ + + @Test + void onSuccessShouldSuccessFullyCompleteFutureWithRoutingTable() throws Exception + { + CompletableFuture> completableFuture = new CompletableFuture<>(); + RouteMessageResponseHandler responseHandler = new RouteMessageResponseHandler( completableFuture ); + Map routingTable = getRoutingTable(); + Map metadata = getMetadataWithRoutingTable( routingTable ); + + responseHandler.onSuccess( metadata ); + + assertEquals( routingTable, completableFuture.getNow( null ) ); + } + + @Test + void onSuccessShouldExceptionallyCompleteFutureWhenMetadataDoesNotHaveRoutingTable() throws Exception + { + CompletableFuture> completableFuture = new CompletableFuture<>(); + RouteMessageResponseHandler responseHandler = new RouteMessageResponseHandler( completableFuture ); + Map metadata = new HashMap<>(); + + responseHandler.onSuccess( metadata ); + + assertThrows( CompletionException.class, () -> completableFuture.getNow( null ) ); + } + + @Test + void onFailureShouldCompleteExceptionallyWithTheOriginalException() + { + CompletableFuture> completableFuture = new CompletableFuture<>(); + RouteMessageResponseHandler responseHandler = new RouteMessageResponseHandler( completableFuture ); + RuntimeException expectedException = new RuntimeException( "Test exception" ); + + responseHandler.onFailure( expectedException ); + + assertTrue( completableFuture.isCompletedExceptionally() ); + completableFuture.handle( ( value, ex ) -> + { + assertNull( value ); + assertEquals( expectedException, ex ); + return null; + } ); + } + + @Test + void onRecordShouldThrowUnsupportedOperation() + { + CompletableFuture> completableFuture = new CompletableFuture<>(); + RouteMessageResponseHandler responseHandler = new RouteMessageResponseHandler( completableFuture ); + + responseHandler.onRecord( new Value[0] ); + + assertTrue( completableFuture.isCompletedExceptionally() ); + completableFuture.handle( ( value, ex ) -> + { + assertNull( value ); + assertEquals( UnsupportedOperationException.class, ex.getClass() ); + return null; + } ); + } + + private Map getMetadataWithRoutingTable( Map routingTable ) + { + Map metadata = new HashMap<>(); + metadata.put( "rt", Values.value( routingTable ) ); + return metadata; + } + + private Map getRoutingTable() + { + Map routingTable = new HashMap<>(); + routingTable.put( "ttl", Values.value( 300 ) ); + routingTable.put( "addresses", Values.value( new ArrayList<>() ) ); + return routingTable; + } +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/BoltProtocolTest.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/BoltProtocolTest.java index 1bf8666736..18158df68d 100644 --- a/driver/src/test/java/org/neo4j/driver/internal/messaging/BoltProtocolTest.java +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/BoltProtocolTest.java @@ -23,6 +23,10 @@ import org.neo4j.driver.exceptions.ClientException; import org.neo4j.driver.internal.messaging.v3.BoltProtocolV3; +import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; +import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41; +import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42; +import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; @@ -36,7 +40,11 @@ class BoltProtocolTest void shouldCreateProtocolForKnownVersions() { assertAll( - () -> assertThat( BoltProtocol.forVersion( BoltProtocolV3.VERSION ), instanceOf( BoltProtocolV3.class ) ) + () -> assertThat( BoltProtocol.forVersion( BoltProtocolV3.VERSION ), instanceOf( BoltProtocolV3.class ) ), + () -> assertThat( BoltProtocol.forVersion( BoltProtocolV4.VERSION ), instanceOf( BoltProtocolV4.class ) ), + () -> assertThat( BoltProtocol.forVersion( BoltProtocolV41.VERSION ), instanceOf( BoltProtocolV41.class ) ), + () -> assertThat( BoltProtocol.forVersion( BoltProtocolV42.VERSION ), instanceOf( BoltProtocolV42.class ) ), + () -> assertThat( BoltProtocol.forVersion( BoltProtocolV43.VERSION ), instanceOf( BoltProtocolV43.class ) ) ); } diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/RouteMessageEncoderTest.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/RouteMessageEncoderTest.java new file mode 100644 index 0000000000..8a2db2f241 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/encode/RouteMessageEncoderTest.java @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.encode; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.InOrder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.messaging.Message; +import org.neo4j.driver.internal.messaging.ValuePacker; +import org.neo4j.driver.internal.messaging.request.RouteMessage; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; + +class RouteMessageEncoderTest +{ + private final ValuePacker packer = mock( ValuePacker.class ); + private final RouteMessageEncoder encoder = new RouteMessageEncoder(); + + + @ParameterizedTest + @ValueSource(strings = { "neo4j"}) + @NullSource + void shouldEncodeRouteMessage(String databaseName) throws IOException + { + Map routingContext = getRoutingContext(); + + encoder.encode( new RouteMessage( getRoutingContext(), databaseName ), packer ); + + InOrder inOrder = inOrder( packer ); + + inOrder.verify( packer ).packStructHeader( 2, (byte) 0x66 ); + inOrder.verify( packer ).pack( routingContext ); + inOrder.verify( packer ).pack( databaseName ); + } + + @Test + void shouldThrowIllegalArgumentIfMessageIsNotRouteMessage() + { + Message message = mock( Message.class ); + + assertThrows(IllegalArgumentException.class, () -> encoder.encode( message, packer )); + } + + private Map getRoutingContext() { + Map routingContext = new HashMap<>(); + routingContext.put( "ip", Values.value( "127.0.0.1" ) ); + return routingContext; + } + +} \ No newline at end of file diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43Test.java new file mode 100644 index 0000000000..a9bf36d201 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/BoltProtocolV43Test.java @@ -0,0 +1,535 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.v43; + +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.mockito.ArgumentCaptor; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import org.neo4j.driver.AccessMode; +import org.neo4j.driver.AuthTokens; +import org.neo4j.driver.Bookmark; +import org.neo4j.driver.Logging; +import org.neo4j.driver.Query; +import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.Value; +import org.neo4j.driver.internal.BookmarkHolder; +import org.neo4j.driver.internal.DatabaseName; +import org.neo4j.driver.internal.DefaultBookmarkHolder; +import org.neo4j.driver.internal.InternalBookmark; +import org.neo4j.driver.internal.async.UnmanagedTransaction; +import org.neo4j.driver.internal.async.connection.ChannelAttributes; +import org.neo4j.driver.internal.async.inbound.InboundMessageDispatcher; +import org.neo4j.driver.internal.cluster.RoutingContext; +import org.neo4j.driver.internal.cursor.AsyncResultCursor; +import org.neo4j.driver.internal.cursor.ResultCursorFactory; +import org.neo4j.driver.internal.handlers.BeginTxResponseHandler; +import org.neo4j.driver.internal.handlers.CommitTxResponseHandler; +import org.neo4j.driver.internal.handlers.NoOpResponseHandler; +import org.neo4j.driver.internal.handlers.PullAllResponseHandler; +import org.neo4j.driver.internal.handlers.RollbackTxResponseHandler; +import org.neo4j.driver.internal.handlers.RunResponseHandler; +import org.neo4j.driver.internal.messaging.BoltProtocol; +import org.neo4j.driver.internal.messaging.MessageFormat; +import org.neo4j.driver.internal.messaging.request.BeginMessage; +import org.neo4j.driver.internal.messaging.request.CommitMessage; +import org.neo4j.driver.internal.messaging.request.GoodbyeMessage; +import org.neo4j.driver.internal.messaging.request.HelloMessage; +import org.neo4j.driver.internal.messaging.request.PullMessage; +import org.neo4j.driver.internal.messaging.request.RollbackMessage; +import org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage; +import org.neo4j.driver.internal.security.InternalAuthToken; +import org.neo4j.driver.internal.spi.Connection; +import org.neo4j.driver.internal.spi.ResponseHandler; + +import static java.time.Duration.ofSeconds; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.junit.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.neo4j.driver.AccessMode.WRITE; +import static org.neo4j.driver.Values.value; +import static org.neo4j.driver.internal.DatabaseNameUtil.database; +import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase; +import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE; +import static org.neo4j.driver.util.TestUtil.anyServerVersion; +import static org.neo4j.driver.util.TestUtil.await; +import static org.neo4j.driver.util.TestUtil.connectionMock; + +public final class BoltProtocolV43Test +{ + protected static final String QUERY_TEXT = "RETURN $x"; + protected static final Map PARAMS = singletonMap( "x", value( 42 ) ); + protected static final Query QUERY = new Query( QUERY_TEXT, value( PARAMS ) ); + + protected final BoltProtocol protocol = createProtocol(); + private final EmbeddedChannel channel = new EmbeddedChannel(); + private final InboundMessageDispatcher messageDispatcher = new InboundMessageDispatcher( channel, Logging.none() ); + + private final TransactionConfig txConfig = TransactionConfig.builder() + .withTimeout( ofSeconds( 12 ) ) + .withMetadata( singletonMap( "key", value( 42 ) ) ) + .build(); + + protected BoltProtocol createProtocol() + { + return BoltProtocolV43.INSTANCE; + } + + @BeforeEach + void beforeEach() + { + ChannelAttributes.setMessageDispatcher( channel, messageDispatcher ); + } + + @AfterEach + void afterEach() + { + channel.finishAndReleaseAll(); + } + + @Test + void shouldCreateMessageFormat() + { + assertThat( protocol.createMessageFormat(), instanceOf( expectedMessageFormatType() ) ); + } + + @Test + void shouldInitializeChannel() + { + ChannelPromise promise = channel.newPromise(); + + protocol.initializeChannel( "MyDriver/0.0.1", dummyAuthToken(), RoutingContext.EMPTY, promise ); + + assertThat( channel.outboundMessages(), hasSize( 1 ) ); + assertThat( channel.outboundMessages().poll(), instanceOf( HelloMessage.class ) ); + assertEquals( 1, messageDispatcher.queuedHandlersCount() ); + assertFalse( promise.isDone() ); + + Map metadata = new HashMap<>(); + metadata.put( "server", value( anyServerVersion().toString() ) ); + metadata.put( "connection_id", value( "bolt-42" ) ); + + messageDispatcher.handleSuccessMessage( metadata ); + + assertTrue( promise.isDone() ); + assertTrue( promise.isSuccess() ); + } + + @Test + void shouldPrepareToCloseChannel() + { + protocol.prepareToCloseChannel( channel ); + + assertThat( channel.outboundMessages(), hasSize( 1 ) ); + assertThat( channel.outboundMessages().poll(), instanceOf( GoodbyeMessage.class ) ); + assertEquals( 1, messageDispatcher.queuedHandlersCount() ); + } + + @Test + void shouldFailToInitializeChannelWhenErrorIsReceived() + { + ChannelPromise promise = channel.newPromise(); + + protocol.initializeChannel( "MyDriver/2.2.1", dummyAuthToken(), RoutingContext.EMPTY, promise ); + + assertThat( channel.outboundMessages(), hasSize( 1 ) ); + assertThat( channel.outboundMessages().poll(), instanceOf( HelloMessage.class ) ); + assertEquals( 1, messageDispatcher.queuedHandlersCount() ); + assertFalse( promise.isDone() ); + + messageDispatcher.handleFailureMessage( "Neo.TransientError.General.DatabaseUnavailable", "Error!" ); + + assertTrue( promise.isDone() ); + assertFalse( promise.isSuccess() ); + } + + @Test + void shouldBeginTransactionWithoutBookmark() + { + Connection connection = connectionMock( protocol ); + + CompletionStage stage = protocol.beginTransaction( connection, InternalBookmark.empty(), TransactionConfig.empty() ); + + verify( connection ) + .write( new BeginMessage( InternalBookmark.empty(), TransactionConfig.empty(), defaultDatabase(), WRITE ), NoOpResponseHandler.INSTANCE ); + assertNull( await( stage ) ); + } + + @Test + void shouldBeginTransactionWithBookmarks() + { + Connection connection = connectionMock( protocol ); + Bookmark bookmark = InternalBookmark.parse( "neo4j:bookmark:v1:tx100" ); + + CompletionStage stage = protocol.beginTransaction( connection, bookmark, TransactionConfig.empty() ); + + verify( connection ) + .writeAndFlush( eq( new BeginMessage( bookmark, TransactionConfig.empty(), defaultDatabase(), WRITE ) ), any( BeginTxResponseHandler.class ) ); + assertNull( await( stage ) ); + } + + @Test + void shouldBeginTransactionWithConfig() + { + Connection connection = connectionMock( protocol ); + + CompletionStage stage = protocol.beginTransaction( connection, InternalBookmark.empty(), txConfig ); + + verify( connection ).write( new BeginMessage( InternalBookmark.empty(), txConfig, defaultDatabase(), WRITE ), NoOpResponseHandler.INSTANCE ); + assertNull( await( stage ) ); + } + + @Test + void shouldBeginTransactionWithBookmarksAndConfig() + { + Connection connection = connectionMock( protocol ); + Bookmark bookmark = InternalBookmark.parse( "neo4j:bookmark:v1:tx4242" ); + + CompletionStage stage = protocol.beginTransaction( connection, bookmark, txConfig ); + + verify( connection ).writeAndFlush( eq( new BeginMessage( bookmark, txConfig, defaultDatabase(), WRITE ) ), any( BeginTxResponseHandler.class ) ); + assertNull( await( stage ) ); + } + + @Test + void shouldCommitTransaction() + { + String bookmarkString = "neo4j:bookmark:v1:tx4242"; + + Connection connection = connectionMock( protocol ); + when( connection.protocol() ).thenReturn( protocol ); + doAnswer( invocation -> + { + ResponseHandler commitHandler = invocation.getArgument( 1 ); + commitHandler.onSuccess( singletonMap( "bookmark", value( bookmarkString ) ) ); + return null; + } ).when( connection ).writeAndFlush( eq( CommitMessage.COMMIT ), any() ); + + CompletionStage stage = protocol.commitTransaction( connection ); + + verify( connection ).writeAndFlush( eq( CommitMessage.COMMIT ), any( CommitTxResponseHandler.class ) ); + assertEquals( InternalBookmark.parse( bookmarkString ), await( stage ) ); + } + + @Test + void shouldRollbackTransaction() + { + Connection connection = connectionMock( protocol ); + + CompletionStage stage = protocol.rollbackTransaction( connection ); + + verify( connection ).writeAndFlush( eq( RollbackMessage.ROLLBACK ), any( RollbackTxResponseHandler.class ) ); + assertNull( await( stage ) ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionWithoutWaitingForRunResponse( AccessMode mode ) throws Exception + { + testRunWithoutWaitingForRunResponse( true, TransactionConfig.empty(), mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitWithConfigTransactionWithoutWaitingForRunResponse( AccessMode mode ) throws Exception + { + testRunWithoutWaitingForRunResponse( true, txConfig, mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionAndWaitForSuccessRunResponse( AccessMode mode ) throws Exception + { + testSuccessfulRunInAutoCommitTxWithWaitingForResponse( InternalBookmark.empty(), TransactionConfig.empty(), mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionWithBookmarkAndConfigAndWaitForSuccessRunResponse( AccessMode mode ) throws Exception + { + testSuccessfulRunInAutoCommitTxWithWaitingForResponse( InternalBookmark.parse( "neo4j:bookmark:v1:tx65" ), txConfig, mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionAndWaitForFailureRunResponse( AccessMode mode ) throws Exception + { + testFailedRunInAutoCommitTxWithWaitingForResponse( InternalBookmark.empty(), TransactionConfig.empty(), mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInAutoCommitTransactionWithBookmarkAndConfigAndWaitForFailureRunResponse( AccessMode mode ) throws Exception + { + testFailedRunInAutoCommitTxWithWaitingForResponse( InternalBookmark.parse( "neo4j:bookmark:v1:tx163" ), txConfig, mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInUnmanagedTransactionWithoutWaitingForRunResponse( AccessMode mode ) throws Exception + { + testRunWithoutWaitingForRunResponse( false, TransactionConfig.empty(), mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInUnmanagedTransactionAndWaitForSuccessRunResponse( AccessMode mode ) throws Exception + { + testRunInUnmanagedTransactionAndWaitForRunResponse( true, mode ); + } + + @ParameterizedTest + @EnumSource( AccessMode.class ) + void shouldRunInUnmanagedTransactionAndWaitForFailureRunResponse( AccessMode mode ) throws Exception + { + testRunInUnmanagedTransactionAndWaitForRunResponse( false, mode ); + } + + @Test + void databaseNameInBeginTransaction() + { + testDatabaseNameSupport( false ); + } + + @Test + void databaseNameForAutoCommitTransactions() + { + testDatabaseNameSupport( true ); + } + + @Test + void shouldSupportDatabaseNameInBeginTransaction() + { + CompletionStage txStage = protocol.beginTransaction( connectionMock( "foo", protocol ), InternalBookmark.empty(), TransactionConfig.empty() ); + + assertDoesNotThrow( () -> await( txStage ) ); + } + + @Test + void shouldNotSupportDatabaseNameForAutoCommitTransactions() + { + assertDoesNotThrow( + () -> protocol.runInAutoCommitTransaction( connectionMock( "foo", protocol ), + new Query( "RETURN 1" ), BookmarkHolder.NO_OP, TransactionConfig.empty(), true, + UNLIMITED_FETCH_SIZE ) ); + } + + private Class expectedMessageFormatType() + { + return MessageFormatV43.class; + } + + private void testFailedRunInAutoCommitTxWithWaitingForResponse( Bookmark bookmark, TransactionConfig config, AccessMode mode ) throws Exception + { + // Given + Connection connection = connectionMock( mode, protocol ); + BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( bookmark ); + + CompletableFuture cursorFuture = + protocol.runInAutoCommitTransaction( connection, QUERY, bookmarkHolder, config, true, UNLIMITED_FETCH_SIZE ) + .asyncResult() + .toCompletableFuture(); + + ResponseHandler runHandler = verifySessionRunInvoked( connection, bookmark, config, mode, defaultDatabase() ); + assertFalse( cursorFuture.isDone() ); + + // When I response to Run message with a failure + runHandler.onFailure( new RuntimeException() ); + + // Then + assertEquals( bookmark, bookmarkHolder.getBookmark() ); + assertTrue( cursorFuture.isDone() ); + assertNotNull( cursorFuture.get() ); + } + + private void testSuccessfulRunInAutoCommitTxWithWaitingForResponse( Bookmark bookmark, TransactionConfig config, AccessMode mode ) throws Exception + { + // Given + Connection connection = connectionMock( mode, protocol ); + BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( bookmark ); + + CompletableFuture cursorFuture = + protocol.runInAutoCommitTransaction( connection, QUERY, bookmarkHolder, config, true, UNLIMITED_FETCH_SIZE ) + .asyncResult() + .toCompletableFuture(); + + ResponseHandler runHandler = verifySessionRunInvoked( connection, bookmark, config, mode, defaultDatabase() ); + assertFalse( cursorFuture.isDone() ); + + // When I response to the run message + runHandler.onSuccess( emptyMap() ); + + // Then + assertEquals( bookmark, bookmarkHolder.getBookmark() ); + assertTrue( cursorFuture.isDone() ); + assertNotNull( cursorFuture.get() ); + } + + private void testRunInUnmanagedTransactionAndWaitForRunResponse( boolean success, AccessMode mode ) throws Exception + { + // Given + Connection connection = connectionMock( mode, protocol ); + + CompletableFuture cursorFuture = + protocol.runInUnmanagedTransaction( connection, QUERY, mock( UnmanagedTransaction.class ), true, UNLIMITED_FETCH_SIZE ) + .asyncResult() + .toCompletableFuture(); + + ResponseHandler runHandler = verifyTxRunInvoked( connection ); + assertFalse( cursorFuture.isDone() ); + + if ( success ) + { + runHandler.onSuccess( emptyMap() ); + } + else + { + // When responded with a failure + runHandler.onFailure( new RuntimeException() ); + } + + // Then + assertTrue( cursorFuture.isDone() ); + assertNotNull( cursorFuture.get() ); + } + + private void testRunWithoutWaitingForRunResponse( boolean autoCommitTx, TransactionConfig config, AccessMode mode ) throws Exception + { + // Given + Connection connection = connectionMock( mode, protocol ); + Bookmark initialBookmark = InternalBookmark.parse( "neo4j:bookmark:v1:tx987" ); + + CompletionStage cursorStage; + if ( autoCommitTx ) + { + BookmarkHolder bookmarkHolder = new DefaultBookmarkHolder( initialBookmark ); + cursorStage = protocol.runInAutoCommitTransaction( connection, QUERY, bookmarkHolder, config, false, UNLIMITED_FETCH_SIZE ) + .asyncResult(); + } + else + { + cursorStage = protocol.runInUnmanagedTransaction( connection, QUERY, mock( UnmanagedTransaction.class ), false, UNLIMITED_FETCH_SIZE ) + .asyncResult(); + } + + // When I complete it immediately without waiting for any responses to run message + CompletableFuture cursorFuture = cursorStage.toCompletableFuture(); + assertTrue( cursorFuture.isDone() ); + assertNotNull( cursorFuture.get() ); + + // Then + if ( autoCommitTx ) + { + verifySessionRunInvoked( connection, initialBookmark, config, mode, defaultDatabase() ); + } + else + { + verifyTxRunInvoked( connection ); + } + } + + private void testDatabaseNameSupport( boolean autoCommitTx ) + { + Connection connection = connectionMock( "foo", protocol ); + if ( autoCommitTx ) + { + ResultCursorFactory factory = + protocol.runInAutoCommitTransaction( connection, QUERY, BookmarkHolder.NO_OP, TransactionConfig.empty(), false, UNLIMITED_FETCH_SIZE ); + await( factory.asyncResult() ); + verifySessionRunInvoked( connection, InternalBookmark.empty(), TransactionConfig.empty(), AccessMode.WRITE, database( "foo" ) ); + } + else + { + CompletionStage txStage = protocol.beginTransaction( connection, InternalBookmark.empty(), TransactionConfig.empty() ); + await( txStage ); + verifyBeginInvoked( connection, InternalBookmark.empty(), TransactionConfig.empty(), AccessMode.WRITE, database( "foo" ) ); + } + } + + private ResponseHandler verifyTxRunInvoked( Connection connection ) + { + return verifyRunInvoked( connection, RunWithMetadataMessage.unmanagedTxRunMessage( QUERY ) ); + } + + private ResponseHandler verifySessionRunInvoked( Connection connection, Bookmark bookmark, TransactionConfig config, AccessMode mode, + DatabaseName databaseName ) + { + RunWithMetadataMessage runMessage = RunWithMetadataMessage.autoCommitTxRunMessage( QUERY, config, databaseName, mode, bookmark ); + return verifyRunInvoked( connection, runMessage ); + } + + private ResponseHandler verifyRunInvoked( Connection connection, RunWithMetadataMessage runMessage ) + { + ArgumentCaptor runHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class ); + ArgumentCaptor pullHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class ); + + verify( connection ).write( eq( runMessage ), runHandlerCaptor.capture() ); + verify( connection ).writeAndFlush( any( PullMessage.class ), pullHandlerCaptor.capture() ); + + assertThat( runHandlerCaptor.getValue(), instanceOf( RunResponseHandler.class ) ); + assertThat( pullHandlerCaptor.getValue(), instanceOf( PullAllResponseHandler.class ) ); + + return runHandlerCaptor.getValue(); + } + + private void verifyBeginInvoked( Connection connection, Bookmark bookmark, TransactionConfig config, AccessMode mode, DatabaseName databaseName ) + { + ArgumentCaptor beginHandlerCaptor = ArgumentCaptor.forClass( ResponseHandler.class ); + BeginMessage beginMessage = new BeginMessage( bookmark, config, databaseName, mode ); + + if ( bookmark.isEmpty() ) + { + verify( connection ).write( eq( beginMessage ), eq( NoOpResponseHandler.INSTANCE ) ); + } + else + { + verify( connection ).write( eq( beginMessage ), beginHandlerCaptor.capture() ); + assertThat( beginHandlerCaptor.getValue(), instanceOf( BeginTxResponseHandler.class ) ); + } + } + + private static InternalAuthToken dummyAuthToken() + { + return (InternalAuthToken) AuthTokens.basic( "hello", "world" ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageFormatV43Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageFormatV43Test.java new file mode 100644 index 0000000000..b8186378f8 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageFormatV43Test.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.v43; + +import org.junit.jupiter.api.Test; + +import org.neo4j.driver.internal.messaging.MessageFormat; +import org.neo4j.driver.internal.messaging.common.CommonMessageReader; +import org.neo4j.driver.internal.packstream.PackInput; +import org.neo4j.driver.internal.packstream.PackOutput; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; + +/** + * The MessageFormat under tests is the one provided by the {@link BoltProtocolV43} and not an specific class implementation. + *

+ * It's done on this way to make easy to replace the implementation and still getting the same behaviour. + */ +class MessageFormatV43Test +{ + private static final MessageFormat format = BoltProtocolV43.INSTANCE.createMessageFormat(); + + @Test + void shouldCreateCorrectWriter() + { + MessageFormat.Writer writer = format.newWriter( mock( PackOutput.class ) ); + + assertThat( writer, instanceOf( MessageWriterV43.class ) ); + } + + @Test + void shouldCreateCorrectReader() + { + MessageFormat.Reader reader = format.newReader( mock( PackInput.class ) ); + + assertThat( reader, instanceOf( CommonMessageReader.class ) ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageReaderV43Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageReaderV43Test.java new file mode 100644 index 0000000000..7499c6cbfb --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageReaderV43Test.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.v43; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.HashMap; +import java.util.stream.Stream; + +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.InternalPoint2D; +import org.neo4j.driver.internal.InternalPoint3D; +import org.neo4j.driver.internal.messaging.Message; +import org.neo4j.driver.internal.messaging.MessageFormat; +import org.neo4j.driver.internal.messaging.request.DiscardAllMessage; +import org.neo4j.driver.internal.messaging.request.RunMessage; +import org.neo4j.driver.internal.messaging.response.FailureMessage; +import org.neo4j.driver.internal.messaging.response.IgnoredMessage; +import org.neo4j.driver.internal.messaging.response.RecordMessage; +import org.neo4j.driver.internal.messaging.response.SuccessMessage; +import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42; +import org.neo4j.driver.internal.packstream.PackInput; +import org.neo4j.driver.internal.util.messaging.AbstractMessageReaderTestBase; + +import static java.util.Arrays.asList; +import static java.util.Calendar.APRIL; +import static java.util.Calendar.AUGUST; +import static org.neo4j.driver.Values.parameters; +import static org.neo4j.driver.Values.value; +import static org.neo4j.driver.internal.util.ValueFactory.emptyNodeValue; +import static org.neo4j.driver.internal.util.ValueFactory.emptyPathValue; +import static org.neo4j.driver.internal.util.ValueFactory.emptyRelationshipValue; +import static org.neo4j.driver.internal.util.ValueFactory.filledNodeValue; +import static org.neo4j.driver.internal.util.ValueFactory.filledPathValue; +import static org.neo4j.driver.internal.util.ValueFactory.filledRelationshipValue; + +/** + * The MessageReader under tests is the one provided by the {@link BoltProtocolV43} and not an specific class implementation. + *

+ * It's done on this way to make easy to replace the implementation and still getting the same behaviour. + */ +public class MessageReaderV43Test extends AbstractMessageReaderTestBase +{ + + @Override + protected Stream supportedMessages() + { + return Stream.of( + // V2 Record types + record( value( new InternalPoint2D( 42, 120.65, -99.2 ) ) ), + record( value( new InternalPoint3D( 42, 85.391, 98.8, 11.1 ) ) ), + record( value( LocalDate.of( 2012, AUGUST, 3 ) ) ), + record( value( OffsetTime.of( 23, 59, 59, 999, ZoneOffset.MAX ) ) ), + record( value( LocalTime.of( 12, 25 ) ) ), + record( value( LocalDateTime.of( 1999, APRIL, 3, 19, 5, 5, 100_200_300 ) ) ), + record( value( ZonedDateTime.of( 1823, 1, 12, 23, 59, 59, 999_999_999, ZoneOffset.ofHoursMinutes( -7, -15 ) ) ) ), + record( value( ZonedDateTime.of( 1823, 1, 12, 23, 59, 59, 999_999_999, ZoneId.of( "Europe/Stockholm" ) ) ) ), + record( value( Values.isoDuration( Long.MAX_VALUE - 1, Integer.MAX_VALUE - 1, Short.MAX_VALUE - 1, Byte.MAX_VALUE - 1 ).asIsoDuration() ) ), + record( value( Values.isoDuration( 17, 22, 99, 15 ).asIsoDuration() ) ), + + // Bolt previous versions valid messages + new FailureMessage( "Hello", "World!" ), + IgnoredMessage.IGNORED, + new SuccessMessage( new HashMap<>() ), + record( value( 1337L ) ), + record( value( parameters( "cat", null, "dog", null ) ) ), + record( value( parameters( "k", 12, "a", "banana" ) ) ), + record( value( asList( "k", 12, "a", "banana" ) ) ), + + // V3 Record Types + record( emptyNodeValue() ), + record( filledNodeValue() ), + record( emptyRelationshipValue() ), + record( filledRelationshipValue() ), + record( filledPathValue() ), + record( emptyPathValue() ) + ); + } + + @Override + protected Stream unsupportedMessages() + { + return Stream.of( + DiscardAllMessage.DISCARD_ALL, + new RunMessage( "RETURN 42" ) + ); + } + + @Override + protected MessageFormat.Reader newReader( PackInput input ) + { + return BoltProtocolV42.INSTANCE.createMessageFormat().newReader( input ); + } + + private Message record( Value value ) + { + return new RecordMessage( new Value[]{value} ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageWriterV43Test.java b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageWriterV43Test.java new file mode 100644 index 0000000000..ac836a9218 --- /dev/null +++ b/driver/src/test/java/org/neo4j/driver/internal/messaging/v43/MessageWriterV43Test.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2002-2020 "Neo4j," + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.neo4j.driver.internal.messaging.v43; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +import org.neo4j.driver.Query; +import org.neo4j.driver.Value; +import org.neo4j.driver.Values; +import org.neo4j.driver.internal.InternalBookmark; +import org.neo4j.driver.internal.messaging.Message; +import org.neo4j.driver.internal.messaging.MessageFormat; +import org.neo4j.driver.internal.messaging.request.BeginMessage; +import org.neo4j.driver.internal.messaging.request.DiscardMessage; +import org.neo4j.driver.internal.messaging.request.HelloMessage; +import org.neo4j.driver.internal.messaging.request.InitMessage; +import org.neo4j.driver.internal.messaging.request.PullMessage; +import org.neo4j.driver.internal.messaging.request.RouteMessage; +import org.neo4j.driver.internal.messaging.request.RunMessage; +import org.neo4j.driver.internal.packstream.PackOutput; +import org.neo4j.driver.internal.security.InternalAuthToken; +import org.neo4j.driver.internal.util.messaging.AbstractMessageWriterTestBase; + +import static java.time.Duration.ofSeconds; +import static java.util.Calendar.DECEMBER; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; +import static org.neo4j.driver.AccessMode.READ; +import static org.neo4j.driver.AccessMode.WRITE; +import static org.neo4j.driver.AuthTokens.basic; +import static org.neo4j.driver.Values.point; +import static org.neo4j.driver.Values.value; +import static org.neo4j.driver.internal.DatabaseNameUtil.database; +import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase; +import static org.neo4j.driver.internal.messaging.request.CommitMessage.COMMIT; +import static org.neo4j.driver.internal.messaging.request.DiscardAllMessage.DISCARD_ALL; +import static org.neo4j.driver.internal.messaging.request.GoodbyeMessage.GOODBYE; +import static org.neo4j.driver.internal.messaging.request.PullAllMessage.PULL_ALL; +import static org.neo4j.driver.internal.messaging.request.ResetMessage.RESET; +import static org.neo4j.driver.internal.messaging.request.RollbackMessage.ROLLBACK; +import static org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage.autoCommitTxRunMessage; +import static org.neo4j.driver.internal.messaging.request.RunWithMetadataMessage.unmanagedTxRunMessage; + +/** + * The MessageWriter under tests is the one provided by the {@link BoltProtocolV43} and not an specific class implementation. + *

+ * It's done on this way to make easy to replace the implementation and still getting the same behaviour. + */ +class MessageWriterV43Test extends AbstractMessageWriterTestBase +{ + @Override + protected MessageFormat.Writer newWriter( PackOutput output ) + { + return BoltProtocolV43.INSTANCE.createMessageFormat().newWriter( output ); + } + + @Override + protected Stream supportedMessages() + { + return Stream.of( + // Bolt V2 Data Types + unmanagedTxRunMessage( new Query( "RETURN $point", singletonMap( "point", point( 42, 12.99, -180.0 ) ) ) ), + unmanagedTxRunMessage( new Query( "RETURN $point", singletonMap( "point", point( 42, 0.51, 2.99, 100.123 ) ) ) ), + unmanagedTxRunMessage( new Query( "RETURN $date", singletonMap( "date", value( LocalDate.ofEpochDay( 2147483650L ) ) ) ) ), + unmanagedTxRunMessage( new Query( "RETURN $time", singletonMap( "time", value( OffsetTime.of( 4, 16, 20, 999, ZoneOffset.MIN ) ) ) ) ), + unmanagedTxRunMessage( new Query( "RETURN $time", singletonMap( "time", value( LocalTime.of( 12, 9, 18, 999_888 ) ) ) ) ), + unmanagedTxRunMessage( + new Query( "RETURN $dateTime", singletonMap( "dateTime", value( LocalDateTime.of( 2049, DECEMBER, 12, 17, 25, 49, 199 ) ) ) ) ), + unmanagedTxRunMessage( new Query( "RETURN $dateTime", singletonMap( "dateTime", value( ZonedDateTime.of( 2000, 1, 10, 12, 2, 49, 300, ZoneOffset + .ofHoursMinutes( 9, 30 ) ) ) ) ) ), + unmanagedTxRunMessage( new Query( "RETURN $dateTime", singletonMap( "dateTime", value( ZonedDateTime.of( 2000, 1, 10, 12, 2, 49, 300, ZoneId.of( + "Europe/Stockholm" ) ) ) ) ) ), + + // New Bolt V4 messages + new PullMessage( 100, 200 ), + new DiscardMessage( 300, 400 ), + + // Bolt V3 messages + new HelloMessage( "MyDriver/1.2.3", ((InternalAuthToken) basic( "neo4j", "neo4j" )).toMap(), Collections.emptyMap() ), + GOODBYE, + new BeginMessage( InternalBookmark.parse( "neo4j:bookmark:v1:tx123" ), ofSeconds( 5 ), singletonMap( "key", value( 42 ) ), READ, + defaultDatabase() ), + new BeginMessage( InternalBookmark.parse( "neo4j:bookmark:v1:tx123" ), ofSeconds( 5 ), singletonMap( "key", value( 42 ) ), WRITE, + database( "foo" ) ), + COMMIT, + ROLLBACK, + + RESET, + autoCommitTxRunMessage( new Query( "RETURN 1" ), ofSeconds( 5 ), singletonMap( "key", value( 42 ) ), defaultDatabase(), READ, + InternalBookmark.parse( "neo4j:bookmark:v1:tx1" ) ), + autoCommitTxRunMessage( new Query( "RETURN 1" ), ofSeconds( 5 ), singletonMap( "key", value( 42 ) ), database( "foo" ), WRITE, + InternalBookmark.parse( "neo4j:bookmark:v1:tx1" ) ), + unmanagedTxRunMessage( new Query( "RETURN 1" ) ), + + // Bolt V3 messages with struct values + autoCommitTxRunMessage( new Query( "RETURN $x", singletonMap( "x", value( ZonedDateTime.now() ) ) ), ofSeconds( 1 ), emptyMap(), + defaultDatabase(), READ, InternalBookmark.empty() ), + autoCommitTxRunMessage( new Query( "RETURN $x", singletonMap( "x", value( ZonedDateTime.now() ) ) ), ofSeconds( 1 ), emptyMap(), + database( "foo" ), + WRITE, InternalBookmark.empty() ), + unmanagedTxRunMessage( new Query( "RETURN $x", singletonMap( "x", point( 42, 1, 2, 3 ) ) ) ), + + // New 4.3 Messages + routeMessage() + ); + } + + @Override + protected Stream unsupportedMessages() + { + return Stream.of( + // Bolt V1, V2 and V3 messages + new InitMessage( "Apa", emptyMap() ), + new RunMessage( "RETURN 1" ), + PULL_ALL, + DISCARD_ALL + ); + } + + private RouteMessage routeMessage() + { + Map routeContext = new HashMap<>(); + routeContext.put( "someContext", Values.value( 124 ) ); + return new RouteMessage( routeContext, "dbName" ); + } +} diff --git a/driver/src/test/java/org/neo4j/driver/util/TestUtil.java b/driver/src/test/java/org/neo4j/driver/util/TestUtil.java index 3620b4f4bd..6b59c9c12c 100644 --- a/driver/src/test/java/org/neo4j/driver/util/TestUtil.java +++ b/driver/src/test/java/org/neo4j/driver/util/TestUtil.java @@ -71,6 +71,7 @@ import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4; import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41; import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42; +import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43; import org.neo4j.driver.internal.retry.RetryLogic; import org.neo4j.driver.internal.spi.Connection; import org.neo4j.driver.internal.spi.ConnectionProvider; @@ -525,7 +526,8 @@ public static Connection connectionMock( String databaseName, AccessMode mode, B when( connection.databaseName() ).thenReturn( database( databaseName ) ); BoltProtocolVersion version = protocol.version(); if ( version.equals( BoltProtocolV3.VERSION ) || version.equals( BoltProtocolV4.VERSION ) || - version.equals( BoltProtocolV41.VERSION ) || version.equals( BoltProtocolV42.VERSION ) ) + version.equals( BoltProtocolV41.VERSION ) || version.equals( BoltProtocolV42.VERSION ) || + version.equals( BoltProtocolV43.VERSION ) ) { setupSuccessResponse( connection, CommitMessage.class ); setupSuccessResponse( connection, RollbackMessage.class );