Skip to content

Commit c945316

Browse files
committed
Creating protocol 4.3 and uses the Route Message to get the routing table
The objective of the commit/pr is create the infracture needed to support the version 4.3 of bolt protocol, including the handshake and the serialization and use of the ROUTE message to get the routing table. In manner to support the version 4.3 of the protocol, the handshake was adjusted and the version 4.0 was removed from it. Some adjustments was done into the RoutingProcedureRunner which now instead of be a base class with the implementation of the single database routing procedure runner, it's the interface used by the runners.
1 parent e275b22 commit c945316

30 files changed

+2124
-166
lines changed

driver/src/main/java/org/neo4j/driver/internal/async/connection/BoltProtocolUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4;
2626
import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41;
2727
import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42;
28+
import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43;
2829

2930
import static io.netty.buffer.Unpooled.copyInt;
3031
import static io.netty.buffer.Unpooled.unreleasableBuffer;
@@ -41,9 +42,9 @@ public final class BoltProtocolUtil
4142

4243
private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer( copyInt(
4344
BOLT_MAGIC_PREAMBLE,
45+
BoltProtocolV43.VERSION.toInt(),
4446
BoltProtocolV42.VERSION.toInt(),
4547
BoltProtocolV41.VERSION.toInt(),
46-
BoltProtocolV4.VERSION.toInt(),
4748
BoltProtocolV3.VERSION.toInt() ) ).asReadOnly();
4849

4950
private static final String HANDSHAKE_STRING = createHandshakeString();

driver/src/main/java/org/neo4j/driver/internal/cluster/MultiDatabasesRoutingProcedureRunner.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@
3434
import static org.neo4j.driver.Values.value;
3535
import static org.neo4j.driver.internal.DatabaseNameUtil.systemDatabase;
3636

37-
public class MultiDatabasesRoutingProcedureRunner extends RoutingProcedureRunner
37+
38+
/**
39+
* This implementation of the {@link RoutingProcedureRunner} works with multi database versions of Neo4j calling
40+
* the procedure `dbms.routing.getRoutingTable`
41+
*/
42+
public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingProcedureRunner
3843
{
3944
static final String DATABASE_NAME = "database";
4045
static final String MULTI_DB_GET_ROUTING_TABLE = String.format( "CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME );
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* Copyright (c) 2002-2020 "Neo4j,"
3+
* Neo4j Sweden AB [http://neo4j.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.cluster;
20+
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.CompletionStage;
26+
import java.util.function.Supplier;
27+
import java.util.stream.Collectors;
28+
29+
import org.neo4j.driver.AccessMode;
30+
import org.neo4j.driver.Bookmark;
31+
import org.neo4j.driver.Query;
32+
import org.neo4j.driver.Record;
33+
import org.neo4j.driver.Value;
34+
import org.neo4j.driver.Values;
35+
import org.neo4j.driver.internal.DatabaseName;
36+
import org.neo4j.driver.internal.InternalRecord;
37+
import org.neo4j.driver.internal.async.connection.DirectConnection;
38+
import org.neo4j.driver.internal.handlers.RouteMessageResponseHandler;
39+
import org.neo4j.driver.internal.messaging.request.RouteMessage;
40+
import org.neo4j.driver.internal.spi.Connection;
41+
42+
import static java.util.Collections.singletonList;
43+
44+
/**
45+
* This implementation of the {@link RoutingProcedureRunner} access the routing procedure
46+
* through the bolt's ROUTE message.
47+
*/
48+
public class RouteMessageRoutingProcedureRunner implements RoutingProcedureRunner
49+
{
50+
private final Map<String,Value> routingContext;
51+
private final Supplier<CompletableFuture<Map<String,Value>>> createCompletableFuture;
52+
53+
public RouteMessageRoutingProcedureRunner( RoutingContext routingContext )
54+
{
55+
this( routingContext, CompletableFuture::new );
56+
}
57+
58+
protected RouteMessageRoutingProcedureRunner( RoutingContext routingContext, Supplier<CompletableFuture<Map<String,Value>>> createCompletableFuture )
59+
{
60+
this.routingContext = routingContext
61+
.toMap()
62+
.entrySet()
63+
.stream()
64+
.collect( Collectors.toMap( Map.Entry::getKey, entry -> Values.value( entry.getValue() ) ) );
65+
this.createCompletableFuture = createCompletableFuture;
66+
}
67+
68+
@Override
69+
public CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark )
70+
{
71+
CompletableFuture<Map<String,Value>> completableFuture = createCompletableFuture.get();
72+
73+
DirectConnection directConnection = toDirectConnection( connection, databaseName );
74+
directConnection.writeAndFlush( new RouteMessage( routingContext, databaseName.databaseName().orElse( null ) ),
75+
new RouteMessageResponseHandler( completableFuture ) );
76+
return completableFuture
77+
.thenApply( routingTable -> new RoutingProcedureResponse( getQuery( databaseName ), singletonList( toRecord( routingTable ) ) ) )
78+
.exceptionally( throwable -> new RoutingProcedureResponse( getQuery( databaseName ), throwable.getCause() ) )
79+
.thenCompose( routingProcedureResponse -> directConnection.release().thenApply( ignore -> routingProcedureResponse ) );
80+
}
81+
82+
private Record toRecord( Map<String,Value> routingTable )
83+
{
84+
return new InternalRecord( new ArrayList<>( routingTable.keySet() ), routingTable.values().toArray( new Value[0] ) );
85+
}
86+
87+
private DirectConnection toDirectConnection( Connection connection, DatabaseName databaseName )
88+
{
89+
return new DirectConnection( connection, databaseName, AccessMode.READ );
90+
}
91+
92+
private Query getQuery( DatabaseName databaseName )
93+
{
94+
Map<String,Object> params = new HashMap<>();
95+
params.put( "routingContext", routingContext );
96+
params.put( "databaseName", databaseName.databaseName().orElse( null ) );
97+
return new Query( "ROUTE $routingContext $databaseName", params );
98+
}
99+
}

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureClusterCompositionProvider.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,43 +33,53 @@
3333

3434
import static java.lang.String.format;
3535
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;
36+
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsRouteMessage;
3637

3738
public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider
3839
{
3940
private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to ";
4041

4142
private final Clock clock;
42-
private final RoutingProcedureRunner routingProcedureRunner;
43+
private final RoutingProcedureRunner singleDatabaseRoutingProcedureRunner;
4344
private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner;
45+
private final RoutingProcedureRunner routeMessageRoutingProcedureRunner;
4446

4547
public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingContext routingContext )
4648
{
47-
this( clock, new RoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ) );
49+
this( clock, new SingleDatabaseRoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ),
50+
new RouteMessageRoutingProcedureRunner( routingContext ) );
4851
}
4952

50-
RoutingProcedureClusterCompositionProvider( Clock clock, RoutingProcedureRunner routingProcedureRunner,
51-
MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner )
53+
RoutingProcedureClusterCompositionProvider( Clock clock, SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner,
54+
MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner,
55+
RouteMessageRoutingProcedureRunner routeMessageRoutingProcedureRunner )
5256
{
5357
this.clock = clock;
54-
this.routingProcedureRunner = routingProcedureRunner;
58+
this.singleDatabaseRoutingProcedureRunner = singleDatabaseRoutingProcedureRunner;
5559
this.multiDatabaseRoutingProcedureRunner = multiDatabaseRoutingProcedureRunner;
60+
this.routeMessageRoutingProcedureRunner = routeMessageRoutingProcedureRunner;
5661
}
5762

5863
@Override
5964
public CompletionStage<ClusterComposition> getClusterComposition( Connection connection, DatabaseName databaseName, Bookmark bookmark )
6065
{
6166
RoutingProcedureRunner runner;
62-
if ( supportsMultiDatabase( connection ) )
67+
68+
if ( supportsRouteMessage( connection ) )
69+
{
70+
runner = routeMessageRoutingProcedureRunner;
71+
}
72+
else if ( supportsMultiDatabase( connection ) )
6373
{
6474
runner = multiDatabaseRoutingProcedureRunner;
6575
}
6676
else
6777
{
68-
runner = routingProcedureRunner;
78+
runner = singleDatabaseRoutingProcedureRunner;
6979
}
7080

7181
return runner.run( connection, databaseName, bookmark )
72-
.thenApply( this::processRoutingResponse );
82+
.thenApply( this::processRoutingResponse );
7383
}
7484

7585
private ClusterComposition processRoutingResponse( RoutingProcedureResponse response )

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingProcedureRunner.java

Lines changed: 13 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -18,112 +18,24 @@
1818
*/
1919
package org.neo4j.driver.internal.cluster;
2020

21-
import java.util.List;
22-
import java.util.concurrent.CompletionException;
2321
import java.util.concurrent.CompletionStage;
2422

25-
import org.neo4j.driver.AccessMode;
2623
import org.neo4j.driver.Bookmark;
27-
import org.neo4j.driver.Query;
28-
import org.neo4j.driver.Record;
29-
import org.neo4j.driver.TransactionConfig;
30-
import org.neo4j.driver.async.ResultCursor;
31-
import org.neo4j.driver.exceptions.ClientException;
32-
import org.neo4j.driver.exceptions.FatalDiscoveryException;
33-
import org.neo4j.driver.internal.BookmarkHolder;
3424
import org.neo4j.driver.internal.DatabaseName;
35-
import org.neo4j.driver.internal.async.connection.DirectConnection;
3625
import org.neo4j.driver.internal.spi.Connection;
37-
import org.neo4j.driver.internal.util.Futures;
38-
import org.neo4j.driver.internal.util.ServerVersion;
3926

40-
import static org.neo4j.driver.Values.parameters;
41-
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
42-
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;
43-
44-
public class RoutingProcedureRunner
27+
/**
28+
* Interface which defines the standard way to get the routing table
29+
*/
30+
public interface RoutingProcedureRunner
4531
{
46-
static final String ROUTING_CONTEXT = "context";
47-
static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")";
48-
49-
final RoutingContext context;
50-
51-
public RoutingProcedureRunner( RoutingContext context )
52-
{
53-
this.context = context;
54-
}
55-
56-
public CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark )
57-
{
58-
DirectConnection delegate = connection( connection );
59-
Query procedure = procedureQuery( connection.serverVersion(), databaseName );
60-
BookmarkHolder bookmarkHolder = bookmarkHolder( bookmark );
61-
return runProcedure( delegate, procedure, bookmarkHolder )
62-
.thenCompose( records -> releaseConnection( delegate, records ) )
63-
.handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) );
64-
}
65-
66-
DirectConnection connection( Connection connection )
67-
{
68-
return new DirectConnection( connection, defaultDatabase(), AccessMode.WRITE );
69-
}
70-
71-
Query procedureQuery(ServerVersion serverVersion, DatabaseName databaseName )
72-
{
73-
if ( databaseName.databaseName().isPresent() )
74-
{
75-
throw new FatalDiscoveryException( String.format(
76-
"Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " +
77-
"Current server version: %s. Database name: '%s'", serverVersion, databaseName.description() ) );
78-
}
79-
return new Query( GET_ROUTING_TABLE, parameters( ROUTING_CONTEXT, context.toMap() ) );
80-
}
81-
82-
BookmarkHolder bookmarkHolder( Bookmark ignored )
83-
{
84-
return BookmarkHolder.NO_OP;
85-
}
86-
87-
CompletionStage<List<Record>> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder )
88-
{
89-
return connection.protocol()
90-
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
91-
.asyncResult().thenCompose( ResultCursor::listAsync );
92-
}
93-
94-
private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )
95-
{
96-
// It is not strictly required to release connection after routing procedure invocation because it'll
97-
// be released by the PULL_ALL response handler after result is fully fetched. Such release will happen
98-
// in background. However, releasing it early as part of whole chain makes it easier to reason about
99-
// rediscovery in stub server tests. Some of them assume connections to instances not present in new
100-
// routing table will be closed immediately.
101-
return connection.release().thenApply( ignore -> records );
102-
}
103-
104-
private static RoutingProcedureResponse processProcedureResponse(Query procedure, List<Record> records,
105-
Throwable error )
106-
{
107-
Throwable cause = Futures.completionExceptionCause( error );
108-
if ( cause != null )
109-
{
110-
return handleError( procedure, cause );
111-
}
112-
else
113-
{
114-
return new RoutingProcedureResponse( procedure, records );
115-
}
116-
}
117-
118-
private static RoutingProcedureResponse handleError(Query procedure, Throwable error )
119-
{
120-
if ( error instanceof ClientException )
121-
{
122-
return new RoutingProcedureResponse( procedure, error );
123-
}
124-
else
125-
{
126-
throw new CompletionException( error );
127-
}
128-
}
32+
/**
33+
* Run the calls to the server
34+
*
35+
* @param connection The connection which will be used to call the server
36+
* @param databaseName The database name
37+
* @param bookmark The bookmark used to query the routing information
38+
* @return The routing table
39+
*/
40+
CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark );
12941
}

0 commit comments

Comments
 (0)