diff --git a/testkit-backend/pom.xml b/testkit-backend/pom.xml index 4e25d949d5..16add06f4c 100644 --- a/testkit-backend/pom.xml +++ b/testkit-backend/pom.xml @@ -25,6 +25,10 @@ neo4j-java-driver ${project.version} + + io.netty + netty-handler + com.fasterxml.jackson.core jackson-core diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java new file mode 100644 index 0000000000..495624e5a0 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncBackendServer.java @@ -0,0 +1,63 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import neo4j.org.testkit.backend.channel.handler.TestkitMessageInboundHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitMessageOutboundHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestProcessorHandler; +import neo4j.org.testkit.backend.channel.handler.TestkitRequestResponseMapperHandler; + +public class AsyncBackendServer +{ + public void run() throws InterruptedException + { + EventLoopGroup group = new NioEventLoopGroup(); + try + { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group( group ) + .channel( NioServerSocketChannel.class ) + .localAddress( 9876 ) + .childHandler( new ChannelInitializer() + { + @Override + protected void initChannel( SocketChannel channel ) + { + channel.pipeline().addLast( new TestkitMessageInboundHandler() ); + channel.pipeline().addLast( new TestkitMessageOutboundHandler() ); + channel.pipeline().addLast( new TestkitRequestResponseMapperHandler() ); + channel.pipeline().addLast( new TestkitRequestProcessorHandler() ); + } + } ); + ChannelFuture server = bootstrap.bind().sync(); + server.channel().closeFuture().sync(); + } + finally + { + group.shutdownGracefully().sync(); + } + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java new file mode 100644 index 0000000000..2cde77bef6 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/AsyncSessionState.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import lombok.Getter; +import lombok.Setter; + +import java.util.concurrent.CompletableFuture; + +import org.neo4j.driver.async.AsyncSession; + +@Getter +@Setter +public class AsyncSessionState +{ + public AsyncSession session; + public CompletableFuture txWorkFuture; + + public AsyncSessionState( AsyncSession session ) + { + this.session = session; + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java new file mode 100644 index 0000000000..d35f955705 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/BackendServer.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.UncheckedIOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.CompletableFuture; + +public class BackendServer +{ + public void run() throws IOException + { + ServerSocket serverSocket = new ServerSocket( 9876 ); + + System.out.println( "Java TestKit Backend Started on port: " + serverSocket.getLocalPort() ); + + while ( true ) + { + final Socket clientSocket = serverSocket.accept(); + CompletableFuture.runAsync( () -> handleClient( clientSocket ) ); + } + } + + private void handleClient( Socket clientSocket ) + { + try + { + System.out.println( "Handling connection from: " + clientSocket.getRemoteSocketAddress() ); + BufferedReader in = new BufferedReader( new InputStreamReader( clientSocket.getInputStream() ) ); + BufferedWriter out = new BufferedWriter( new OutputStreamWriter( clientSocket.getOutputStream() ) ); + CommandProcessor commandProcessor = new CommandProcessor( in, out ); + + boolean cont = true; + while ( cont ) + { + try + { + cont = commandProcessor.process(); + } + catch ( Exception e ) + { + e.printStackTrace(); + clientSocket.close(); + cont = false; + } + } + } + catch ( IOException ex ) + { + throw new UncheckedIOException( ex ); + } + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java index faf6b53df9..2c60fb8cd2 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/Runner.java @@ -18,58 +18,19 @@ */ package neo4j.org.testkit.backend; -import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.UncheckedIOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.CompletableFuture; public class Runner { - public static void main( String[] args ) throws IOException + public static void main( String[] args ) throws IOException, InterruptedException { - ServerSocket serverSocket = new ServerSocket( 9876 ); - - System.out.println( "Java TestKit Backend Started on port: " + serverSocket.getLocalPort() ); - - while ( true ) - { - final Socket clientSocket = serverSocket.accept(); - CompletableFuture.runAsync( () -> handleClient( clientSocket ) ); - } - } - - private static void handleClient( Socket clientSocket ) - { - try + if ( args.length > 0 && args[0].equals( "async" ) ) { - System.out.println( "Handling connection from: " + clientSocket.getRemoteSocketAddress() ); - BufferedReader in = new BufferedReader( new InputStreamReader( clientSocket.getInputStream() ) ); - BufferedWriter out = new BufferedWriter( new OutputStreamWriter( clientSocket.getOutputStream() ) ); - CommandProcessor commandProcessor = new CommandProcessor( in, out ); - - boolean cont = true; - while ( cont ) - { - try - { - cont = commandProcessor.process(); - } - catch ( Exception e ) - { - e.printStackTrace(); - clientSocket.close(); - cont = false; - } - } + new AsyncBackendServer().run(); } - catch ( IOException ex ) + else { - throw new UncheckedIOException( ex ); + new BackendServer().run(); } } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java index b42c72cfd9..2d14850180 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/TestkitState.java @@ -31,6 +31,8 @@ import org.neo4j.driver.Driver; import org.neo4j.driver.Result; import org.neo4j.driver.Transaction; +import org.neo4j.driver.async.AsyncTransaction; +import org.neo4j.driver.async.ResultCursor; import org.neo4j.driver.exceptions.Neo4jException; import org.neo4j.driver.internal.cluster.RoutingTableRegistry; import org.neo4j.driver.net.ServerAddress; @@ -41,8 +43,11 @@ public class TestkitState private final Map drivers = new HashMap<>(); private final Map routingTableRegistry = new HashMap<>(); private final Map sessionStates = new HashMap<>(); + private final Map asyncSessionStates = new HashMap<>(); private final Map results = new HashMap<>(); + private final Map resultCursors = new HashMap<>(); private final Map transactions = new HashMap<>(); + private final Map asyncTransactions = new HashMap<>(); private final Map errors = new HashMap<>(); private int idGenerator = 0; private final Consumer responseWriter; diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageInboundHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageInboundHandler.java new file mode 100644 index 0000000000..f1c06a815c --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageInboundHandler.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.CharsetUtil; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class TestkitMessageInboundHandler extends SimpleChannelInboundHandler +{ + private final StringBuilder requestBuffer = new StringBuilder(); + + @Override + public void channelRead0( ChannelHandlerContext ctx, ByteBuf byteBuf ) + { + String requestStr = byteBuf.toString( CharsetUtil.UTF_8 ); + requestBuffer.append( requestStr ); + + List testkitMessages = new ArrayList<>(); + Optional testkitMessageOpt = extractTestkitMessage(); + while ( testkitMessageOpt.isPresent() ) + { + testkitMessages.add( testkitMessageOpt.get() ); + testkitMessageOpt = extractTestkitMessage(); + } + + testkitMessages.forEach( ctx::fireChannelRead ); + } + + private Optional extractTestkitMessage() + { + String requestEndMarker = "#request end\n"; + int endMarkerIndex = requestBuffer.indexOf( requestEndMarker ); + if ( endMarkerIndex < 0 ) + { + return Optional.empty(); + } + String requestBeginMarker = "#request begin\n"; + int beginMarkerIndex = requestBuffer.indexOf( requestBeginMarker ); + if ( beginMarkerIndex != 0 ) + { + throw new RuntimeException( "Unexpected data in message buffer" ); + } + // extract Testkit message without markers + String testkitMessage = requestBuffer.substring( requestBeginMarker.length(), endMarkerIndex ); + if ( testkitMessage.contains( requestBeginMarker ) || testkitMessage.contains( requestEndMarker ) ) + { + throw new RuntimeException( "Testkit message contains request markers" ); + } + // remove Testkit message from buffer + requestBuffer.delete( 0, endMarkerIndex + requestEndMarker.length() + 1 ); + return Optional.of( testkitMessage ); + } + + @Override + public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) + { + ctx.close(); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageOutboundHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageOutboundHandler.java new file mode 100644 index 0000000000..905b69f754 --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitMessageOutboundHandler.java @@ -0,0 +1,39 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +import java.nio.charset.StandardCharsets; + +public class TestkitMessageOutboundHandler extends ChannelOutboundHandlerAdapter +{ + @Override + public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) + { + String testkitResponseStr = (String) msg; + String testkitMessage = String.format( "#response begin\n%s\n#response end\n", testkitResponseStr ); + ByteBuf byteBuf = Unpooled.copiedBuffer( testkitMessage, StandardCharsets.UTF_8 ); + ctx.writeAndFlush( byteBuf, promise ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java new file mode 100644 index 0000000000..ccac59c4fb --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestProcessorHandler.java @@ -0,0 +1,121 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import neo4j.org.testkit.backend.TestkitState; +import neo4j.org.testkit.backend.messages.requests.TestkitRequest; +import neo4j.org.testkit.backend.messages.responses.BackendError; +import neo4j.org.testkit.backend.messages.responses.DriverError; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; + +import java.util.concurrent.CompletionException; + +import org.neo4j.driver.exceptions.Neo4jException; +import org.neo4j.driver.exceptions.UntrustedServerException; +import org.neo4j.driver.internal.async.pool.ConnectionPoolImpl; + +public class TestkitRequestProcessorHandler extends ChannelInboundHandlerAdapter +{ + private final TestkitState testkitState = new TestkitState( this::writeAndFlush, () -> true ); + private Channel channel; + + @Override + public void channelRegistered( ChannelHandlerContext ctx ) throws Exception + { + channel = ctx.channel(); + super.channelRegistered( ctx ); + } + + @Override + public void channelRead( ChannelHandlerContext ctx, Object msg ) + { + TestkitRequest testkitRequest = (TestkitRequest) msg; + try + { + testkitRequest.processAsync( testkitState ) + .thenAccept( responseOpt -> responseOpt.ifPresent( ctx::writeAndFlush ) ) + .exceptionally( throwable -> + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + return null; + } ); + } + catch ( Throwable throwable ) + { + ctx.writeAndFlush( createErrorResponse( throwable ) ); + } + } + + private TestkitResponse createErrorResponse( Throwable throwable ) + { + if ( throwable instanceof CompletionException ) + { + throwable = throwable.getCause(); + } + if ( throwable instanceof Neo4jException ) + { + String id = testkitState.newId(); + Neo4jException e = (Neo4jException) throwable; + testkitState.getErrors().put( id, e ); + return DriverError.builder() + .data( DriverError.DriverErrorBody.builder() + .id( id ) + .errorType( e.getClass().getName() ) + .code( e.code() ) + .msg( e.getMessage() ) + .build() ) + .build(); + } + else if ( isConnectionPoolClosedException( throwable ) || throwable instanceof UntrustedServerException ) + { + String id = testkitState.newId(); + return DriverError.builder() + .data( + DriverError.DriverErrorBody.builder() + .id( id ) + .errorType( throwable.getClass().getName() ) + .msg( throwable.getMessage() ) + .build() + ) + .build(); + } + else + { + return BackendError.builder().data( BackendError.BackendErrorBody.builder().msg( throwable.toString() ).build() ).build(); + } + } + + private boolean isConnectionPoolClosedException( Throwable throwable ) + { + return throwable instanceof IllegalStateException && throwable.getMessage() != null && + throwable.getMessage().equals( ConnectionPoolImpl.CONNECTION_POOL_CLOSED_ERROR_MESSAGE ); + } + + private void writeAndFlush( TestkitResponse response ) + { + if ( channel == null ) + { + throw new IllegalStateException( "Called before channel is initialized" ); + } + channel.writeAndFlush( response ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java new file mode 100644 index 0000000000..f627bc546b --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/channel/handler/TestkitRequestResponseMapperHandler.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.channel.handler; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import neo4j.org.testkit.backend.messages.TestkitModule; +import neo4j.org.testkit.backend.messages.requests.TestkitRequest; +import neo4j.org.testkit.backend.messages.responses.TestkitResponse; + +public class TestkitRequestResponseMapperHandler extends ChannelDuplexHandler +{ + private final ObjectMapper objectMapper; + + public TestkitRequestResponseMapperHandler() + { + objectMapper = new ObjectMapper(); + TestkitModule testkitModule = new TestkitModule(); + this.objectMapper.registerModule( testkitModule ); + this.objectMapper.disable( DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES ); + } + + @Override + public void channelRead( ChannelHandlerContext ctx, Object msg ) + { + String testkitMessage = (String) msg; + TestkitRequest testkitRequest; + try + { + testkitRequest = objectMapper.readValue( testkitMessage, TestkitRequest.class ); + } + catch ( JsonProcessingException e ) + { + throw new RuntimeException( "Failed to deserialize Testkit message", e ); + } + ctx.fireChannelRead( testkitRequest ); + } + + @Override + public void write( ChannelHandlerContext ctx, Object msg, ChannelPromise promise ) throws Exception + { + TestkitResponse testkitResponse = (TestkitResponse) msg; + String responseStr = objectMapper.writeValueAsString( testkitResponse ); + ctx.writeAndFlush( responseStr, promise ); + } +} diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java index e16baa9523..9e75b62845 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/CheckMultiDBSupport.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.MultiDBSupport; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -37,7 +40,25 @@ public TestkitResponse process( TestkitState testkitState ) { String driverId = data.getDriverId(); boolean available = testkitState.getDrivers().get( driverId ).supportsMultiDb(); - return MultiDBSupport.builder().data( MultiDBSupport.MultiDBSupportBody.builder().available( available ).build() ).build(); + return createResponse( available ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getDrivers().get( data.getDriverId() ) + .supportsMultiDbAsync() + .thenApply( this::createResponse ) + .thenApply( Optional::of ); + } + + private MultiDBSupport createResponse( boolean available ) + { + return MultiDBSupport.builder() + .data( MultiDBSupport.MultiDBSupportBody.builder() + .available( available ) + .build() ) + .build(); } @Setter diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java index b5605a64d4..cde9a111b3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DomainNameResolutionCompleted.java @@ -27,6 +27,8 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionStage; @Setter @Getter @@ -58,6 +60,12 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + throw new UnsupportedOperationException(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java index f4906e98c3..16a4a0dfe4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/DriverClose.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -36,6 +39,20 @@ public class DriverClose implements TestkitRequest public TestkitResponse process( TestkitState testkitState ) { testkitState.getDrivers().get( data.getDriverId() ).close(); + return createResponse(); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getDrivers().get( data.getDriverId() ) + .closeAsync() + .thenApply( ignored -> createResponse() ) + .thenApply( Optional::of ); + } + + private Driver createResponse() + { return Driver.builder().data( Driver.DriverBody.builder().id( data.getDriverId() ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java index 211f07f665..c25313fb22 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetFeatures.java @@ -26,26 +26,46 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; @Setter @Getter @NoArgsConstructor public class GetFeatures implements TestkitRequest { - private static final Set FEATURES = new HashSet<>( Arrays.asList( + private static final Set COMMON_FEATURES = new HashSet<>( Arrays.asList( "AuthorizationExpiredTreatment", "Optimization:PullPipelining", "ConfHint:connection.recv_timeout_seconds", - "Temporary:TransactionClose", "Temporary:DriverFetchSize", "Temporary:DriverMaxTxRetryTime" ) ); + private static final Set SYNC_FEATURES = new HashSet<>( Collections.singletonList( + "Temporary:TransactionClose" + ) ); + @Override public TestkitResponse process( TestkitState testkitState ) { - return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( FEATURES ).build() ).build(); + Set features = new HashSet<>( COMMON_FEATURES ); + features.addAll( SYNC_FEATURES ); + return createResponse( features ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( Optional.of( createResponse( COMMON_FEATURES ) ) ); + } + + private FeatureList createResponse( Set features ) + { + return FeatureList.builder().data( FeatureList.FeatureListBody.builder().features( features ).build() ).build(); } } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java index 64457bf42f..9327a53895 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/GetRoutingTable.java @@ -27,6 +27,9 @@ import java.util.Arrays; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.function.Function; import java.util.stream.Collectors; @@ -75,6 +78,12 @@ public TestkitResponse process( TestkitState testkitState ) ).build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( Optional.of( process( testkitState ) ) ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java index 14bca56974..86ab421d19 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewDriver.java @@ -32,6 +32,8 @@ import java.net.URI; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import org.neo4j.driver.AuthToken; @@ -106,6 +108,12 @@ public TestkitResponse process( TestkitState testkitState ) return Driver.builder().data( Driver.DriverBody.builder().id( id ).build() ).build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( Optional.of( process( testkitState ) ) ); + } + private ServerAddressResolver callbackResolver( TestkitState testkitState ) { return address -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java index b9bcd26874..4b4089f1e9 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/NewSession.java @@ -21,13 +21,18 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.List; +import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.function.BiFunction; import java.util.stream.Collectors; import org.neo4j.driver.AccessMode; @@ -44,6 +49,19 @@ public class NewSession implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) + { + return createSessionStateAndResponse( testkitState, this::createSessionState, testkitState.getSessionStates() ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return CompletableFuture.completedFuture( + Optional.of( createSessionStateAndResponse( testkitState, this::createAsyncSessionState, testkitState.getAsyncSessionStates() ) ) ); + } + + private TestkitResponse createSessionStateAndResponse( TestkitState testkitState, BiFunction sessionStateProducer, + Map sessionStateContainer ) { Driver driver = testkitState.getDrivers().get( data.getDriverId() ); AccessMode formattedAccessMode = data.getAccessMode().equals( "r" ) ? AccessMode.READ : AccessMode.WRITE; @@ -53,7 +71,7 @@ public TestkitResponse process( TestkitState testkitState ) Optional.ofNullable( data.bookmarks ) .map( bookmarks -> bookmarks.stream().map( InternalBookmark::parse ).collect( Collectors.toList() ) ) .ifPresent( builder::withBookmarks ); - + Optional.ofNullable( data.database ).ifPresent( builder::withDatabase ); if ( data.getFetchSize() != 0 ) @@ -61,13 +79,23 @@ public TestkitResponse process( TestkitState testkitState ) builder.withFetchSize( data.getFetchSize() ); } - org.neo4j.driver.Session session = driver.session( builder.build() ); String newId = testkitState.newId(); - testkitState.getSessionStates().put( newId, new SessionState( session ) ); + T sessionState = sessionStateProducer.apply( driver, builder.build() ); + sessionStateContainer.put( newId, sessionState ); return Session.builder().data( Session.SessionBody.builder().id( newId ).build() ).build(); } + private SessionState createSessionState( Driver driver, SessionConfig sessionConfig ) + { + return new SessionState( driver.session( sessionConfig ) ); + } + + private AsyncSessionState createAsyncSessionState( Driver driver, SessionConfig sessionConfig ) + { + return new AsyncSessionState( driver.asyncSession( sessionConfig ) ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java index 9408cea5f4..86810f63f2 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResolverResolutionCompleted.java @@ -26,6 +26,8 @@ import java.util.LinkedHashSet; import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletionStage; import java.util.stream.Collectors; import org.neo4j.driver.internal.BoltServerAddress; @@ -45,6 +47,12 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + throw new UnsupportedOperationException(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java index 6722ebd5ab..fe672b7d4c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultConsume.java @@ -26,6 +26,9 @@ import neo4j.org.testkit.backend.messages.responses.Summary; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; @@ -42,17 +45,7 @@ public TestkitResponse process( TestkitState testkitState ) try { Result result = testkitState.getResults().get( data.getResultId() ); - org.neo4j.driver.summary.ResultSummary summary = result.consume(); - Summary.ServerInfo serverInfo = Summary.ServerInfo.builder() - .protocolVersion( summary.server().protocolVersion() ) - .agent( summary.server().agent() ) - .build(); - Summary.SummaryBody data = Summary.SummaryBody.builder() - .serverInfo( serverInfo ) - .build(); - return Summary.builder() - .data( data ) - .build(); + return createResponse( result.consume() ); } catch ( NoSuchRecordException ignored ) { @@ -60,6 +53,29 @@ public TestkitResponse process( TestkitState testkitState ) } } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getResultCursors().get( data.getResultId() ) + .consumeAsync() + .thenApply( this::createResponse ) + .thenApply( Optional::of ); + } + + private Summary createResponse( org.neo4j.driver.summary.ResultSummary summary ) + { + Summary.ServerInfo serverInfo = Summary.ServerInfo.builder() + .protocolVersion( summary.server().protocolVersion() ) + .agent( summary.server().agent() ) + .build(); + Summary.SummaryBody data = Summary.SummaryBody.builder() + .serverInfo( serverInfo ) + .build(); + return Summary.builder() + .data( data ) + .build(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java index ef35fd99f7..cbdaae3070 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/ResultNext.java @@ -26,6 +26,9 @@ import neo4j.org.testkit.backend.messages.responses.Record; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + import org.neo4j.driver.Result; import org.neo4j.driver.exceptions.NoSuchRecordException; @@ -42,8 +45,7 @@ public TestkitResponse process( TestkitState testkitState ) try { Result result = testkitState.getResults().get( data.getResultId() ); - org.neo4j.driver.Record record = result.next(); - return Record.builder().data( Record.RecordBody.builder().values( record ).build() ).build(); + return createResponse( result.next() ); } catch ( NoSuchRecordException ignored ) { @@ -51,6 +53,20 @@ public TestkitResponse process( TestkitState testkitState ) } } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getResultCursors().get( data.getResultId() ) + .nextAsync() + .thenApply( record -> record != null ? createResponse( record ) : NullRecord.builder().build() ) + .thenApply( Optional::of ); + } + + private Record createResponse( org.neo4j.driver.Record record ) + { + return Record.builder().data( Record.RecordBody.builder().values( record ).build() ).build(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java index f1b3d692c9..e7c63c6d37 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryableNegative.java @@ -21,10 +21,15 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -45,6 +50,23 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + Throwable throwable; + if ( !"".equals( data.getErrorId() ) ) + { + throwable = testkitState.getErrors().get( data.getErrorId() ); + } + else + { + throwable = new RuntimeException( "Error from client in retryable tx" ); + } + sessionState.getTxWorkFuture().completeExceptionally( throwable ); + return CompletableFuture.completedFuture( Optional.empty() ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java index d56f8c21f5..474720251b 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/RetryablePositive.java @@ -25,6 +25,10 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -44,6 +48,13 @@ public TestkitResponse process( TestkitState testkitState ) return null; } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + testkitState.getAsyncSessionStates().get( data.getSessionId() ).getTxWorkFuture().complete( null ); + return CompletableFuture.completedFuture( Optional.empty() ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java index 93d0811a1d..66227f2b6f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionBeginTransaction.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; @@ -29,8 +30,10 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.async.AsyncSession; @Setter @Getter @@ -62,6 +65,36 @@ public TestkitResponse process( TestkitState testkitState ) .orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + if ( sessionState != null ) + { + AsyncSession session = sessionState.getSession(); + TransactionConfig.Builder builder = TransactionConfig.builder(); + Optional.ofNullable( data.txMeta ).ifPresent( builder::withMetadata ); + + if ( data.getTimeout() != null ) + { + builder.withTimeout( Duration.ofMillis( data.getTimeout() ) ); + } + + String txId = testkitState.newId(); + return session.beginTransactionAsync( builder.build() ) + .thenApply( tx -> + { + testkitState.getAsyncTransactions().put( txId, tx ); + return transaction( txId ); + } ) + .thenApply( Optional::of ); + } + else + { + return null; + } + } + private Transaction transaction( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java index ec117640ea..ba96189e05 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionClose.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.Session; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -36,6 +39,20 @@ public class SessionClose implements TestkitRequest public TestkitResponse process( TestkitState testkitState ) { testkitState.getSessionStates().get( data.getSessionId() ).getSession().close(); + return createResponse(); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession() + .closeAsync() + .thenApply( ignored -> createResponse() ) + .thenApply( Optional::of ); + } + + private Session createResponse() + { return Session.builder().data( Session.SessionBody.builder().id( data.getSessionId() ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java index 51666cc46f..0a69142894 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionLastBookmarks.java @@ -27,6 +27,8 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Bookmark; @@ -45,11 +47,23 @@ public TestkitResponse process( TestkitState testkitState ) .map( session -> { Bookmark bookmark = testkitState.getSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); - return Bookmarks.builder().data( Bookmarks.BookmarksBody.builder().bookmarks( bookmark ).build() ).build(); + return createResponse( bookmark ); } ) .orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + Bookmark bookmark = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession().lastBookmark(); + return CompletableFuture.completedFuture( Optional.of( createResponse( bookmark ) ) ); + } + + private Bookmarks createResponse( Bookmark bookmark ) + { + return Bookmarks.builder().data( Bookmarks.BookmarksBody.builder().bookmarks( bookmark ).build() ).build(); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java index b57ff63bcf..605fe247e3 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionReadTransaction.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; @@ -28,9 +29,13 @@ import neo4j.org.testkit.backend.messages.responses.TestkitResponse; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionWork; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.AsyncTransactionWork; @Setter @Getter @@ -42,7 +47,7 @@ public class SessionReadTransaction implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.sessionId, null ) ) + return Optional.ofNullable( testkitState.getSessionStates().getOrDefault( data.getSessionId(), null ) ) .map( sessionState -> { Session session = sessionState.getSession(); @@ -51,6 +56,27 @@ public TestkitResponse process( TestkitState testkitState ) } ).orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + AsyncSession session = sessionState.getSession(); + + AsyncTransactionWork> workWrapper = tx -> + { + String txId = testkitState.newId(); + testkitState.getAsyncTransactions().put( txId, tx ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture txWorkFuture = new CompletableFuture<>(); + sessionState.setTxWorkFuture( txWorkFuture ); + return txWorkFuture; + }; + + return session.readTransactionAsync( workWrapper ) + .thenApply( nothing -> retryableDone() ) + .thenApply( Optional::of ); + } + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java index d119e4c552..2d2f0d0807 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionRun.java @@ -30,10 +30,12 @@ import java.time.Duration; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Query; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionConfig; +import org.neo4j.driver.async.AsyncSession; @Setter @Getter @@ -59,6 +61,27 @@ public TestkitResponse process( TestkitState testkitState ) return Result.builder().data( Result.ResultBody.builder().id( newId ).build() ).build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSession session = testkitState.getAsyncSessionStates().get( data.getSessionId() ).getSession(); + Query query = Optional.ofNullable( data.params ) + .map( params -> new Query( data.cypher, data.params ) ) + .orElseGet( () -> new Query( data.cypher ) ); + TransactionConfig.Builder transactionConfig = TransactionConfig.builder(); + Optional.ofNullable( data.getTxMeta() ).ifPresent( transactionConfig::withMetadata ); + Optional.ofNullable( data.getTimeout() ).ifPresent( to -> transactionConfig.withTimeout( Duration.ofMillis( to ) ) ); + + return session.runAsync( query, transactionConfig.build() ) + .thenApply( resultCursor -> + { + String newId = testkitState.newId(); + testkitState.getResultCursors().put( newId, resultCursor ); + return Result.builder().data( Result.ResultBody.builder().id( newId ).build() ).build(); + } ) + .thenApply( Optional::of ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java index 64c053591c..077a0076ce 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/SessionWriteTransaction.java @@ -21,6 +21,7 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; +import neo4j.org.testkit.backend.AsyncSessionState; import neo4j.org.testkit.backend.SessionState; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RetryableDone; @@ -29,9 +30,13 @@ import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import org.neo4j.driver.Session; import org.neo4j.driver.TransactionWork; +import org.neo4j.driver.async.AsyncSession; +import org.neo4j.driver.async.AsyncTransactionWork; @Setter @Getter @@ -52,6 +57,28 @@ public TestkitResponse process( TestkitState testkitState ) } ).orElseThrow( () -> new RuntimeException( "Could not find session" ) ); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + AsyncSessionState sessionState = testkitState.getAsyncSessionStates().get( data.getSessionId() ); + AsyncSession session = sessionState.getSession(); + + AsyncTransactionWork> workWrapper = + tx -> + { + String txId = testkitState.newId(); + testkitState.getAsyncTransactions().put( txId, tx ); + testkitState.getResponseWriter().accept( retryableTry( txId ) ); + CompletableFuture tryResult = new CompletableFuture<>(); + sessionState.setTxWorkFuture( tryResult ); + return tryResult; + }; + + return session.writeTransactionAsync( workWrapper ) + .thenApply( nothing -> retryableDone() ) + .thenApply( Optional::of ); + } + private TransactionWork handle( TestkitState testkitState, SessionState sessionState ) { return tx -> diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java index 2637c3ec7b..af45836492 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/StartTest.java @@ -23,13 +23,33 @@ import lombok.Setter; import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.RunTest; +import neo4j.org.testkit.backend.messages.responses.SkipTest; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor public class StartTest implements TestkitRequest { + private static final Map ASYNC_SKIP_PATTERN_TO_REASON = new HashMap<>(); + + static + { + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_fail_when_driver_closed_using_session_run$", "Does not throw error" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_read_successfully_on_empty_discovery_result_using_session_run$", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_request_rt_from_all_initial_routers_until_successful", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_revert_to_initial_router_if_known_router_throws_protocol_errors", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_successfully_acquire_rt_when_router_ip_changes$", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_use_resolver_during_rediscovery_when_existing_routers_fail$", "Resolver not implemented" ); + ASYNC_SKIP_PATTERN_TO_REASON.put( "^.*.test_should_reject_server_using_verify_connectivity_bolt_3x0", "Does not error as expected" ); + } + private StartTestBody data; @Override @@ -38,6 +58,24 @@ public TestkitResponse process( TestkitState testkitState ) return RunTest.builder().build(); } + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + TestkitResponse testkitResponse = ASYNC_SKIP_PATTERN_TO_REASON + .entrySet() + .stream() + .filter( entry -> data.getTestName().matches( entry.getKey() ) ) + .findFirst() + .map( entry -> (TestkitResponse) SkipTest.builder() + .data( SkipTest.SkipTestBody.builder() + .reason( entry.getValue() ) + .build() ) + .build() ) + .orElseGet( () -> RunTest.builder().build() ); + + return CompletableFuture.completedFuture( Optional.of( testkitResponse ) ); + } + @Setter @Getter @NoArgsConstructor diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java index c2a22a7d6e..52cd16590c 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TestkitRequest.java @@ -23,6 +23,9 @@ import neo4j.org.testkit.backend.TestkitState; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "name" ) @JsonSubTypes( { @JsonSubTypes.Type( NewDriver.class ), @JsonSubTypes.Type( NewSession.class ), @@ -41,4 +44,6 @@ public interface TestkitRequest { TestkitResponse process( TestkitState testkitState ); + + CompletionStage> processAsync( TestkitState testkitState ); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java index d047e49d19..7bab055f97 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionClose.java @@ -26,6 +26,7 @@ import neo4j.org.testkit.backend.messages.responses.Transaction; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Setter @Getter @@ -37,16 +38,22 @@ public class TransactionClose implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> { tx.close(); - return transaction( data.txId ); + return createResponse( data.getTxId() ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Transaction transaction( String txId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + throw new UnsupportedOperationException(); + } + + private Transaction createResponse( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java index b1f825fc66..ebe5c18c6a 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionCommit.java @@ -26,6 +26,7 @@ import neo4j.org.testkit.backend.messages.responses.Transaction; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Getter @NoArgsConstructor @@ -37,16 +38,25 @@ public class TransactionCommit implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> { tx.commit(); - return transaction( data.txId ); + return createResponse( data.getTxId() ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Transaction transaction( String txId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncTransactions().get( data.getTxId() ) + .commitAsync() + .thenApply( ignored -> createResponse( data.getTxId() ) ) + .thenApply( Optional::of ); + } + + private Transaction createResponse( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java index 148a515886..75e24e2b57 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRollback.java @@ -26,6 +26,7 @@ import neo4j.org.testkit.backend.messages.responses.Transaction; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Getter @NoArgsConstructor @@ -37,16 +38,25 @@ public class TransactionRollback implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> { tx.rollback(); - return transaction( data.txId ); + return createResponse( data.getTxId() ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Transaction transaction( String txId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncTransactions().get( data.getTxId() ) + .rollbackAsync() + .thenApply( ignored -> createResponse( data.getTxId() ) ) + .thenApply( Optional::of ); + } + + private Transaction createResponse( String txId ) { return Transaction.builder().data( Transaction.TransactionBody.builder().id( txId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java index 7d797147d2..2e993e320f 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/TransactionRun.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CompletionStage; @Setter @Getter @@ -41,19 +42,33 @@ public class TransactionRun implements TestkitRequest @Override public TestkitResponse process( TestkitState testkitState ) { - return Optional.ofNullable( testkitState.getTransactions().get( data.txId ) ) + return Optional.ofNullable( testkitState.getTransactions().get( data.getTxId() ) ) .map( tx -> - tx.run( data.cypher, data.getParams() != null ? data.getParams() : Collections.emptyMap() ) ) + tx.run( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) ) .map( result -> { String resultId = testkitState.newId(); testkitState.getResults().put( resultId, result ); - return result( resultId ); + return createResponse( resultId ); } ) .orElseThrow( () -> new RuntimeException( "Could not find transaction" ) ); } - private Result result( String resultId ) + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + return testkitState.getAsyncTransactions().get( data.getTxId() ) + .runAsync( data.getCypher(), data.getParams() != null ? data.getParams() : Collections.emptyMap() ) + .thenApply( resultCursor -> + { + String resultId = testkitState.newId(); + testkitState.getResultCursors().put( resultId, resultCursor ); + return createResponse( resultId ); + } ) + .thenApply( Optional::of ); + } + + private Result createResponse( String resultId ) { return Result.builder().data( Result.ResultBody.builder().id( resultId ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java index e985452acc..acd3907aa4 100644 --- a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/requests/VerifyConnectivity.java @@ -25,6 +25,9 @@ import neo4j.org.testkit.backend.messages.responses.Driver; import neo4j.org.testkit.backend.messages.responses.TestkitResponse; +import java.util.Optional; +import java.util.concurrent.CompletionStage; + @Setter @Getter @NoArgsConstructor @@ -37,6 +40,21 @@ public TestkitResponse process( TestkitState testkitState ) { String id = data.getDriverId(); testkitState.getDrivers().get( id ).verifyConnectivity(); + return createResponse( id ); + } + + @Override + public CompletionStage> processAsync( TestkitState testkitState ) + { + String id = data.getDriverId(); + return testkitState.getDrivers().get( id ) + .verifyConnectivityAsync() + .thenApply( ignored -> createResponse( id ) ) + .thenApply( Optional::of ); + } + + private Driver createResponse( String id ) + { return Driver.builder().data( Driver.DriverBody.builder().id( id ).build() ).build(); } diff --git a/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/SkipTest.java b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/SkipTest.java new file mode 100644 index 0000000000..7f2762c66a --- /dev/null +++ b/testkit-backend/src/main/java/neo4j/org/testkit/backend/messages/responses/SkipTest.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) "Neo4j" + * Neo4j Sweden AB [http://neo4j.com] + * + * This file is part of Neo4j. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package neo4j.org.testkit.backend.messages.responses; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@Setter +@Getter +@Builder +public class SkipTest implements TestkitResponse +{ + private SkipTestBody data; + + @Override + public String testkitName() + { + return "SkipTest"; + } + + @Setter + @Getter + @Builder + public static class SkipTestBody + { + private final String reason; + } +} diff --git a/testkit-tests/pom.xml b/testkit-tests/pom.xml index 9c52bfc907..da3c9ae51f 100644 --- a/testkit-tests/pom.xml +++ b/testkit-tests/pom.xml @@ -23,6 +23,7 @@ --tests TESTKIT_TESTS INTEGRATION_TESTS STUB_TESTS STRESS_TESTS TLS_TESTS 7200000 + %a-async 0.36.1 true @@ -46,6 +47,7 @@ + tklnchr testkit-launcher:%v @@ -54,11 +56,11 @@ - %n + %a 0 @@ -109,11 +111,37 @@ start + + + run-testkit-async + integration-test + + + start + + + + + tklnchr + + ${testkit.async.name.pattern} + + ${project.build.directory}/testkit-async + async + + + ${testkit.async.name.pattern}> + + + + + + remove-testkit-launcher post-integration-test - + stop diff --git a/testkit/backend.py b/testkit/backend.py index beeb957468..ccce69d5cf 100644 --- a/testkit/backend.py +++ b/testkit/backend.py @@ -10,5 +10,5 @@ err = open("/artifacts/backenderr.log", "w") out = open("/artifacts/backendout.log", "w") subprocess.check_call( - ["java", "-jar", "testkit-backend/target/testkit-backend.jar"], stdout=out, stderr=err) + ["java", "-jar", "testkit-backend/target/testkit-backend.jar", os.getenv('TEST_BACKEND_SERVER', '')], stdout=out, stderr=err)