Skip to content

Configurable test-on-borrow for pooled connections #297

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jan 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions driver/src/main/java/org/neo4j/driver/internal/DriverFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.neo4j.driver.v1.Config;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.Logger;
import org.neo4j.driver.v1.Logging;
import org.neo4j.driver.v1.exceptions.ClientException;

import static java.lang.String.format;
Expand Down Expand Up @@ -91,40 +92,62 @@ private Driver createDriver( BoltServerAddress address, String scheme, Connectio
/**
* Creates new {@link DirectDriver}.
* <p>
* <b>This method is package-private only for testing</b>
* <b>This method is protected only for testing</b>
*/
DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
SecurityPlan securityPlan, SessionFactory sessionFactory )
protected DirectDriver createDirectDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, SecurityPlan securityPlan, SessionFactory sessionFactory )
{
return new DirectDriver( address, connectionPool, securityPlan, sessionFactory, config.logging() );
}

/**
* Creates new {@link RoutingDriver}.
* <p>
* <b>This method is package-private only for testing</b>
* <b>This method is protected only for testing</b>
*/
RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool, Config config,
RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
protected RoutingDriver createRoutingDriver( BoltServerAddress address, ConnectionPool connectionPool,
Config config, RoutingSettings routingSettings, SecurityPlan securityPlan, SessionFactory sessionFactory )
{
return new RoutingDriver( routingSettings, address, connectionPool, securityPlan, sessionFactory,
Clock.SYSTEM, config.logging() );
createClock(), config.logging() );
}

/**
* Creates new {@link ConnectionPool}.
* <p>
* <b>This method is package-private only for testing</b>
* <b>This method is protected only for testing</b>
*/
ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
protected ConnectionPool createConnectionPool( AuthToken authToken, SecurityPlan securityPlan, Config config )
{
authToken = authToken == null ? AuthTokens.none() : authToken;

ConnectionSettings connectionSettings = new ConnectionSettings( authToken, config.connectionTimeoutMillis() );
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize() );
Connector connector = new SocketConnector( connectionSettings, securityPlan, config.logging() );
PoolSettings poolSettings = new PoolSettings( config.maxIdleConnectionPoolSize(),
config.idleTimeBeforeConnectionTest() );
Connector connector = createConnector( connectionSettings, securityPlan, config.logging() );

return new SocketConnectionPool( poolSettings, connector, Clock.SYSTEM, config.logging() );
return new SocketConnectionPool( poolSettings, connector, createClock(), config.logging() );
}

/**
* Creates new {@link Clock}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected Clock createClock()
{
return Clock.SYSTEM;
}

/**
* Creates new {@link Connector}.
* <p>
* <b>This method is protected only for testing</b>
*/
protected Connector createConnector( ConnectionSettings connectionSettings, SecurityPlan securityPlan,
Logging logging )
{
return new SocketConnector( connectionSettings, securityPlan, logging );
}

private static SessionFactory createSessionFactory( Config config )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,37 @@

public class PoolSettings
{
public static final int NO_IDLE_CONNECTION_TEST = -1;

public static final int DEFAULT_MAX_IDLE_CONNECTION_POOL_SIZE = 10;
public static final int DEFAULT_IDLE_TIME_BEFORE_CONNECTION_TEST = NO_IDLE_CONNECTION_TEST;

/**
* Maximum number of idle connections per pool.
*/
private final int maxIdleConnectionPoolSize;
private final long idleTimeBeforeConnectionTest;

public PoolSettings( int maxIdleConnectionPoolSize )
public PoolSettings( int maxIdleConnectionPoolSize, long idleTimeBeforeConnectionTest )
{
this.maxIdleConnectionPoolSize = maxIdleConnectionPoolSize;
this.idleTimeBeforeConnectionTest = idleTimeBeforeConnectionTest;
}

public int maxIdleConnectionPoolSize()
{
return maxIdleConnectionPoolSize;
}

public long idleTimeBeforeConnectionTest()
{
if ( !idleTimeBeforeConnectionTestConfigured() )
{
throw new IllegalStateException(
"Idle time before connection test is not configured: " + idleTimeBeforeConnectionTest );
}
return idleTimeBeforeConnectionTest;
}

public boolean idleTimeBeforeConnectionTestConfigured()
{
return idleTimeBeforeConnectionTest >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,14 @@ public class PooledConnection implements Connection
private boolean unrecoverableErrorsOccurred = false;
private Runnable onError = null;
private final Clock clock;
private long lastUsed;
private long lastUsedTimestamp;

public PooledConnection( Connection delegate, Consumer<PooledConnection> release, Clock clock )
{
this.delegate = delegate;
this.release = release;
this.clock = clock;
this.lastUsed = clock.millis();
}

public void updateTimestamp()
{
lastUsed = clock.millis();
updateLastUsedTimestamp();
}

@Override
Expand Down Expand Up @@ -192,13 +187,14 @@ public void receiveOne()
}
}

@Override
/**
* Make sure only close the connection once on each session to avoid releasing the connection twice, a.k.a.
* adding back the connection twice into the pool.
*/
@Override
public void close()
{
updateLastUsedTimestamp();
release.accept( this );
// put the full logic of deciding whether to dispose the connection or to put it back to
// the pool into the release object
Expand Down Expand Up @@ -286,6 +282,11 @@ public void onError( Runnable runnable )
this.onError = runnable;
}

public long lastUsedTimestamp()
{
return lastUsedTimestamp;
}

private boolean isProtocolViolationError(RuntimeException e )
{
return e instanceof Neo4jException
Expand All @@ -300,8 +301,8 @@ private boolean isClientOrTransientError( RuntimeException e )
|| ((Neo4jException) e).code().contains( "TransientError" ));
}

public long idleTime()
private void updateLastUsedTimestamp()
{
return clock.millis() - lastUsed;
this.lastUsedTimestamp = clock.millis();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
*/
package org.neo4j.driver.internal.net.pooling;

import java.util.concurrent.atomic.AtomicBoolean;

import org.neo4j.driver.internal.spi.ConnectionValidator;
import org.neo4j.driver.internal.util.Consumer;
import org.neo4j.driver.v1.util.Function;

/**
* The responsibility of the PooledConnectionReleaseConsumer is to release valid connections
Expand All @@ -30,19 +28,19 @@
class PooledConnectionReleaseConsumer implements Consumer<PooledConnection>
{
private final BlockingPooledConnectionQueue connections;
private final Function<PooledConnection, Boolean> validConnection;
private final ConnectionValidator<PooledConnection> connectionValidator;

PooledConnectionReleaseConsumer( BlockingPooledConnectionQueue connections,
Function<PooledConnection, Boolean> validConnection)
ConnectionValidator<PooledConnection> connectionValidator )
{
this.connections = connections;
this.validConnection = validConnection;
this.connectionValidator = connectionValidator;
}

@Override
public void accept( PooledConnection pooledConnection )
{
if ( validConnection.apply( pooledConnection ) )
if ( connectionValidator.isReusable( pooledConnection ) )
{
connections.offer( pooledConnection );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.neo4j.driver.internal.net.pooling;

import org.neo4j.driver.internal.spi.ConnectionPool;
import org.neo4j.driver.v1.util.Function;
import org.neo4j.driver.internal.spi.ConnectionValidator;

class PooledConnectionValidator implements Function<PooledConnection,Boolean>
class PooledConnectionValidator implements ConnectionValidator<PooledConnection>
{
private final ConnectionPool pool;

Expand All @@ -31,28 +31,25 @@ class PooledConnectionValidator implements Function<PooledConnection,Boolean>
}

@Override
public Boolean apply( PooledConnection pooledConnection )
public boolean isReusable( PooledConnection pooledConnection )
{
// once the pooledConn has marked to have unrecoverable errors, there is no way to remove the error
// and we should close the conn without bothering to reset the conn at all
return pool.hasAddress( pooledConnection.boltServerAddress() ) &&
!pooledConnection.hasUnrecoverableErrors() &&
reset( pooledConnection );
isConnected( pooledConnection );
}

/**
* In case this session has an open result or transaction or something,
* make sure it's reset to a nice state before we reuse it.
*
* @param conn the PooledConnection
* @return true if the connection is reset successfully without any error, otherwise false.
*/
private static boolean reset( PooledConnection conn )
@Override
public boolean isConnected( PooledConnection connection )
{
try
{
conn.reset();
conn.sync();
// try to use this connection for RESET message
// in case this session has an open result or transaction or something,
// make sure it's reset to a nice state before we reuse it.
connection.reset();
connection.sync();
return true;
}
catch ( Throwable e )
Expand Down
Loading