diff --git a/test/integration_tests/src/test_outage.cpp b/test/integration_tests/src/test_outage.cpp index 067a64449..df4ac24c3 100644 --- a/test/integration_tests/src/test_outage.cpp +++ b/test/integration_tests/src/test_outage.cpp @@ -62,10 +62,6 @@ struct OutageTests : public test_utils::MultipleNodesTest { test_utils::CassLog::set_output_log_level(CASS_LOG_DISABLED); printf("Warning! This test is going to take %d minutes\n", TEST_DURATION_SECS / 60); std::fill(nodes_states, nodes_states + NUM_NODES, UP); - // TODO(mpenick): This is a stopgap. To be fixed in CPP-140 -#if !defined(WIN32) && !defined(_WIN32) - signal(SIGPIPE, SIG_IGN); -#endif } int random_int(int s, int e) { diff --git a/test/integration_tests/src/test_pool.cpp b/test/integration_tests/src/test_pool.cpp index 7aebf63d2..a3d8bf331 100644 --- a/test/integration_tests/src/test_pool.cpp +++ b/test/integration_tests/src/test_pool.cpp @@ -22,12 +22,34 @@ #include "test_utils.hpp" #include "cluster.hpp" +#include + #include +#include #include struct TestPool : public test_utils::MultipleNodesTest { TestPool() : MultipleNodesTest(1, 0) {} + + /** + * Execute a select statement against the system tables for a specified amount + * of time. + * + * NOTE: Results and errors are ignored + * + * @param duration Duration in seconds to execute queries + * @param session Session instance + */ + void execute_system_query(int duration, test_utils::CassSessionPtr session) { + boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time(); + while ((boost::posix_time::second_clock::universal_time() - start).total_seconds() < duration) { + test_utils::CassStatementPtr statement(cass_statement_new("SELECT * FROM system.local", 0)); + cass_statement_set_consistency(statement.get(), CASS_CONSISTENCY_ONE); + test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get())); + cass_future_wait_timed(future.get(), test_utils::ONE_SECOND_IN_MICROS); + } + } }; BOOST_FIXTURE_TEST_SUITE(pool, TestPool) @@ -125,4 +147,87 @@ BOOST_AUTO_TEST_CASE(connection_spawn) } BOOST_CHECK_EQUAL(test_utils::CassLog::message_count(), 2u); } + +/** + * Data for performing the connection interruption + */ +struct ConnectionInterruptionData { + cql::cql_ccm_bridge_t* ccm_ptr; + int node; + int duration; + int delay; +}; + +/** + * Create connection interruptions using CCM + * + * @param data Connection interruption data structure + */ +static void connection_interruptions(void *data) { + boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time(); + ConnectionInterruptionData* ci_data = static_cast(data); + while ((boost::posix_time::second_clock::universal_time() - start).total_seconds() < ci_data->duration) { + ci_data->ccm_ptr->pause(ci_data->node); + boost::this_thread::sleep(boost::posix_time::seconds(ci_data->delay)); + ci_data->ccm_ptr->resume(ci_data->node); + } +} + +/** + * Don't Recycle Pool On Connection Timeout + * + * This test ensures that a pool does not completely remove itself while + * allowing partial connections to remain and reconnection attempts to use the + * existing pool. + * + * @since 2.1.0 + * @test_category connection:connection_pool + * @jira_ticket CPP-253 [https://datastax-oss.atlassian.net/browse/CPP-253] + */ +BOOST_AUTO_TEST_CASE(dont_recycle_pool_on_timeout) { + // Add a second node + ccm->bootstrap(2); + + // Create the connection interruption data + ConnectionInterruptionData ci_data = { ccm.get(), 2, 0, 0 }; + + test_utils::initialize_contact_points(cluster, conf.ip_prefix(), 2, 0); + cass_cluster_set_connect_timeout(cluster, 1000); + cass_cluster_set_num_threads_io(cluster, 32); + cass_cluster_set_core_connections_per_host(cluster, 4); + cass_cluster_set_load_balance_round_robin(cluster); + + // Create session during "connection interruptions" + test_utils::CassLog::reset("Host " + conf.ip_prefix() + "2 already present attempting to initiate immediate connection"); + { + uv_thread_t connection_interruptions_thread; + ci_data.duration = 5; + uv_thread_create(&connection_interruptions_thread, connection_interruptions, &ci_data); + test_utils::CassSessionPtr session(test_utils::create_session(cluster)); + uv_thread_join(&connection_interruptions_thread); + boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time(); + execute_system_query(60, session); + } + BOOST_CHECK_GE(test_utils::CassLog::message_count(), 1); + + // Handle partial reconnects + cass_cluster_set_connection_idle_timeout(cluster, 1); + cass_cluster_set_connection_heartbeat_interval(cluster, 2); + test_utils::CassLog::reset("already present attempting to initiate immediate connection"); + { + // Create the session ignore all connection errors + test_utils::CassSessionPtr session(cass_session_new()); + test_utils::CassFuturePtr future(cass_session_connect(session.get(), cluster)); + cass_future_wait_timed(future.get(), test_utils::ONE_SECOND_IN_MICROS); + boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time(); + uv_thread_t connection_interruptions_thread; + ci_data.delay = 5; + ci_data.duration = 45; + uv_thread_create(&connection_interruptions_thread, connection_interruptions, &ci_data); + execute_system_query(60, session); + uv_thread_join(&connection_interruptions_thread); + } + BOOST_CHECK_GE(test_utils::CassLog::message_count(), 1); +} + BOOST_AUTO_TEST_SUITE_END()