Skip to content

Creating protocol 4.3 and uses the Route Message to get the routing table #770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.neo4j.driver.internal.messaging.v4.BoltProtocolV4;
import org.neo4j.driver.internal.messaging.v41.BoltProtocolV41;
import org.neo4j.driver.internal.messaging.v42.BoltProtocolV42;
import org.neo4j.driver.internal.messaging.v43.BoltProtocolV43;

import static io.netty.buffer.Unpooled.copyInt;
import static io.netty.buffer.Unpooled.unreleasableBuffer;
Expand All @@ -41,9 +42,9 @@ public final class BoltProtocolUtil

private static final ByteBuf HANDSHAKE_BUF = unreleasableBuffer( copyInt(
BOLT_MAGIC_PREAMBLE,
BoltProtocolV43.VERSION.toInt(),
BoltProtocolV42.VERSION.toInt(),
BoltProtocolV41.VERSION.toInt(),
BoltProtocolV4.VERSION.toInt(),
BoltProtocolV3.VERSION.toInt() ) ).asReadOnly();

private static final String HANDSHAKE_STRING = createHandshakeString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@
import static org.neo4j.driver.Values.value;
import static org.neo4j.driver.internal.DatabaseNameUtil.systemDatabase;

public class MultiDatabasesRoutingProcedureRunner extends RoutingProcedureRunner

/**
* This implementation of the {@link RoutingProcedureRunner} works with multi database versions of Neo4j calling
* the procedure `dbms.routing.getRoutingTable`
*/
public class MultiDatabasesRoutingProcedureRunner extends SingleDatabaseRoutingProcedureRunner
{
static final String DATABASE_NAME = "database";
static final String MULTI_DB_GET_ROUTING_TABLE = String.format( "CALL dbms.routing.getRoutingTable($%s, $%s)", ROUTING_CONTEXT, DATABASE_NAME );
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright (c) 2002-2020 "Neo4j,"
* Neo4j Sweden AB [http://neo4j.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.cluster;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.async.connection.DirectConnection;
import org.neo4j.driver.internal.handlers.RouteMessageResponseHandler;
import org.neo4j.driver.internal.messaging.request.RouteMessage;
import org.neo4j.driver.internal.spi.Connection;

import static java.util.Collections.singletonList;

/**
* This implementation of the {@link RoutingProcedureRunner} access the routing procedure
* through the bolt's ROUTE message.
*/
public class RouteMessageRoutingProcedureRunner implements RoutingProcedureRunner
{
private final Map<String,Value> routingContext;
private final Supplier<CompletableFuture<Map<String,Value>>> createCompletableFuture;

public RouteMessageRoutingProcedureRunner( RoutingContext routingContext )
{
this( routingContext, CompletableFuture::new );
}

protected RouteMessageRoutingProcedureRunner( RoutingContext routingContext, Supplier<CompletableFuture<Map<String,Value>>> createCompletableFuture )
{
this.routingContext = routingContext
.toMap()
.entrySet()
.stream()
.collect( Collectors.toMap( Map.Entry::getKey, entry -> Values.value( entry.getValue() ) ) );
this.createCompletableFuture = createCompletableFuture;
}

@Override
public CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark )
{
CompletableFuture<Map<String,Value>> completableFuture = createCompletableFuture.get();

DirectConnection directConnection = toDirectConnection( connection, databaseName );
directConnection.writeAndFlush( new RouteMessage( routingContext, databaseName.databaseName().orElse( null ) ),
new RouteMessageResponseHandler( completableFuture ) );
return completableFuture
.thenApply( routingTable -> new RoutingProcedureResponse( getQuery( databaseName ), singletonList( toRecord( routingTable ) ) ) )
.exceptionally( throwable -> new RoutingProcedureResponse( getQuery( databaseName ), throwable.getCause() ) )
.thenCompose( routingProcedureResponse -> directConnection.release().thenApply( ignore -> routingProcedureResponse ) );
}

private Record toRecord( Map<String,Value> routingTable )
{
return new InternalRecord( new ArrayList<>( routingTable.keySet() ), routingTable.values().toArray( new Value[0] ) );
}

private DirectConnection toDirectConnection( Connection connection, DatabaseName databaseName )
{
return new DirectConnection( connection, databaseName, AccessMode.READ );
}

private Query getQuery( DatabaseName databaseName )
{
Map<String,Object> params = new HashMap<>();
params.put( "routingContext", routingContext );
params.put( "databaseName", databaseName.databaseName().orElse( null ) );
return new Query( "ROUTE $routingContext $databaseName", params );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,43 +33,53 @@

import static java.lang.String.format;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsMultiDatabase;
import static org.neo4j.driver.internal.messaging.request.MultiDatabaseUtil.supportsRouteMessage;

public class RoutingProcedureClusterCompositionProvider implements ClusterCompositionProvider
{
private static final String PROTOCOL_ERROR_MESSAGE = "Failed to parse '%s' result received from server due to ";

private final Clock clock;
private final RoutingProcedureRunner routingProcedureRunner;
private final RoutingProcedureRunner singleDatabaseRoutingProcedureRunner;
private final RoutingProcedureRunner multiDatabaseRoutingProcedureRunner;
private final RoutingProcedureRunner routeMessageRoutingProcedureRunner;

public RoutingProcedureClusterCompositionProvider( Clock clock, RoutingContext routingContext )
{
this( clock, new RoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ) );
this( clock, new SingleDatabaseRoutingProcedureRunner( routingContext ), new MultiDatabasesRoutingProcedureRunner( routingContext ),
new RouteMessageRoutingProcedureRunner( routingContext ) );
}

RoutingProcedureClusterCompositionProvider( Clock clock, RoutingProcedureRunner routingProcedureRunner,
MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner )
RoutingProcedureClusterCompositionProvider( Clock clock, SingleDatabaseRoutingProcedureRunner singleDatabaseRoutingProcedureRunner,
MultiDatabasesRoutingProcedureRunner multiDatabaseRoutingProcedureRunner,
RouteMessageRoutingProcedureRunner routeMessageRoutingProcedureRunner )
{
this.clock = clock;
this.routingProcedureRunner = routingProcedureRunner;
this.singleDatabaseRoutingProcedureRunner = singleDatabaseRoutingProcedureRunner;
this.multiDatabaseRoutingProcedureRunner = multiDatabaseRoutingProcedureRunner;
this.routeMessageRoutingProcedureRunner = routeMessageRoutingProcedureRunner;
}

@Override
public CompletionStage<ClusterComposition> getClusterComposition( Connection connection, DatabaseName databaseName, Bookmark bookmark )
{
RoutingProcedureRunner runner;
if ( supportsMultiDatabase( connection ) )

if ( supportsRouteMessage( connection ) )
{
runner = routeMessageRoutingProcedureRunner;
}
else if ( supportsMultiDatabase( connection ) )
{
runner = multiDatabaseRoutingProcedureRunner;
}
else
{
runner = routingProcedureRunner;
runner = singleDatabaseRoutingProcedureRunner;
}

return runner.run( connection, databaseName, bookmark )
.thenApply( this::processRoutingResponse );
.thenApply( this::processRoutingResponse );
}

private ClusterComposition processRoutingResponse( RoutingProcedureResponse response )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,112 +18,24 @@
*/
package org.neo4j.driver.internal.cluster;

import java.util.List;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.async.ResultCursor;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.FatalDiscoveryException;
import org.neo4j.driver.internal.BookmarkHolder;
import org.neo4j.driver.internal.DatabaseName;
import org.neo4j.driver.internal.async.connection.DirectConnection;
import org.neo4j.driver.internal.spi.Connection;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.ServerVersion;

import static org.neo4j.driver.Values.parameters;
import static org.neo4j.driver.internal.DatabaseNameUtil.defaultDatabase;
import static org.neo4j.driver.internal.handlers.pulln.FetchSizeUtil.UNLIMITED_FETCH_SIZE;

public class RoutingProcedureRunner
/**
* Interface which defines the standard way to get the routing table
*/
public interface RoutingProcedureRunner
{
static final String ROUTING_CONTEXT = "context";
static final String GET_ROUTING_TABLE = "CALL dbms.cluster.routing.getRoutingTable($" + ROUTING_CONTEXT + ")";

final RoutingContext context;

public RoutingProcedureRunner( RoutingContext context )
{
this.context = context;
}

public CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark )
{
DirectConnection delegate = connection( connection );
Query procedure = procedureQuery( connection.serverVersion(), databaseName );
BookmarkHolder bookmarkHolder = bookmarkHolder( bookmark );
return runProcedure( delegate, procedure, bookmarkHolder )
.thenCompose( records -> releaseConnection( delegate, records ) )
.handle( ( records, error ) -> processProcedureResponse( procedure, records, error ) );
}

DirectConnection connection( Connection connection )
{
return new DirectConnection( connection, defaultDatabase(), AccessMode.WRITE );
}

Query procedureQuery(ServerVersion serverVersion, DatabaseName databaseName )
{
if ( databaseName.databaseName().isPresent() )
{
throw new FatalDiscoveryException( String.format(
"Refreshing routing table for multi-databases is not supported in server version lower than 4.0. " +
"Current server version: %s. Database name: '%s'", serverVersion, databaseName.description() ) );
}
return new Query( GET_ROUTING_TABLE, parameters( ROUTING_CONTEXT, context.toMap() ) );
}

BookmarkHolder bookmarkHolder( Bookmark ignored )
{
return BookmarkHolder.NO_OP;
}

CompletionStage<List<Record>> runProcedure(Connection connection, Query procedure, BookmarkHolder bookmarkHolder )
{
return connection.protocol()
.runInAutoCommitTransaction( connection, procedure, bookmarkHolder, TransactionConfig.empty(), true, UNLIMITED_FETCH_SIZE )
.asyncResult().thenCompose( ResultCursor::listAsync );
}

private CompletionStage<List<Record>> releaseConnection( Connection connection, List<Record> records )
{
// It is not strictly required to release connection after routing procedure invocation because it'll
// be released by the PULL_ALL response handler after result is fully fetched. Such release will happen
// in background. However, releasing it early as part of whole chain makes it easier to reason about
// rediscovery in stub server tests. Some of them assume connections to instances not present in new
// routing table will be closed immediately.
return connection.release().thenApply( ignore -> records );
}

private static RoutingProcedureResponse processProcedureResponse(Query procedure, List<Record> records,
Throwable error )
{
Throwable cause = Futures.completionExceptionCause( error );
if ( cause != null )
{
return handleError( procedure, cause );
}
else
{
return new RoutingProcedureResponse( procedure, records );
}
}

private static RoutingProcedureResponse handleError(Query procedure, Throwable error )
{
if ( error instanceof ClientException )
{
return new RoutingProcedureResponse( procedure, error );
}
else
{
throw new CompletionException( error );
}
}
/**
* Run the calls to the server
*
* @param connection The connection which will be used to call the server
* @param databaseName The database name
* @param bookmark The bookmark used to query the routing information
* @return The routing table
*/
CompletionStage<RoutingProcedureResponse> run( Connection connection, DatabaseName databaseName, Bookmark bookmark );
}
Loading