Skip to content

Commit e200e51

Browse files
committed
Merge pull request #195 from datastax/253-tests
CPP-253 Ensure entire pool is not removed with partial connections
2 parents 0f76a78 + 25f0107 commit e200e51

File tree

2 files changed

+105
-4
lines changed

2 files changed

+105
-4
lines changed

test/integration_tests/src/test_outage.cpp

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ struct OutageTests : public test_utils::MultipleNodesTest {
6262
test_utils::CassLog::set_output_log_level(CASS_LOG_DISABLED);
6363
printf("Warning! This test is going to take %d minutes\n", TEST_DURATION_SECS / 60);
6464
std::fill(nodes_states, nodes_states + NUM_NODES, UP);
65-
// TODO(mpenick): This is a stopgap. To be fixed in CPP-140
66-
#if !defined(WIN32) && !defined(_WIN32)
67-
signal(SIGPIPE, SIG_IGN);
68-
#endif
6965
}
7066

7167
int random_int(int s, int e) {

test/integration_tests/src/test_pool.cpp

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,34 @@
2222
#include "test_utils.hpp"
2323
#include "cluster.hpp"
2424

25+
#include <uv.h>
26+
2527
#include <boost/scoped_ptr.hpp>
28+
#include <boost/thread.hpp>
2629
#include <boost/test/unit_test.hpp>
2730

2831
struct TestPool : public test_utils::MultipleNodesTest {
2932
TestPool()
3033
: MultipleNodesTest(1, 0) {}
34+
35+
/**
36+
* Execute a select statement against the system tables for a specified amount
37+
* of time.
38+
*
39+
* NOTE: Results and errors are ignored
40+
*
41+
* @param duration Duration in seconds to execute queries
42+
* @param session Session instance
43+
*/
44+
void execute_system_query(int duration, test_utils::CassSessionPtr session) {
45+
boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time();
46+
while ((boost::posix_time::second_clock::universal_time() - start).total_seconds() < duration) {
47+
test_utils::CassStatementPtr statement(cass_statement_new("SELECT * FROM system.local", 0));
48+
cass_statement_set_consistency(statement.get(), CASS_CONSISTENCY_ONE);
49+
test_utils::CassFuturePtr future(cass_session_execute(session.get(), statement.get()));
50+
cass_future_wait_timed(future.get(), test_utils::ONE_SECOND_IN_MICROS);
51+
}
52+
}
3153
};
3254

3355
BOOST_FIXTURE_TEST_SUITE(pool, TestPool)
@@ -125,4 +147,87 @@ BOOST_AUTO_TEST_CASE(connection_spawn)
125147
}
126148
BOOST_CHECK_EQUAL(test_utils::CassLog::message_count(), 2u);
127149
}
150+
151+
/**
152+
* Data for performing the connection interruption
153+
*/
154+
struct ConnectionInterruptionData {
155+
cql::cql_ccm_bridge_t* ccm_ptr;
156+
int node;
157+
int duration;
158+
int delay;
159+
};
160+
161+
/**
162+
* Create connection interruptions using CCM
163+
*
164+
* @param data Connection interruption data structure
165+
*/
166+
static void connection_interruptions(void *data) {
167+
boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time();
168+
ConnectionInterruptionData* ci_data = static_cast<ConnectionInterruptionData*>(data);
169+
while ((boost::posix_time::second_clock::universal_time() - start).total_seconds() < ci_data->duration) {
170+
ci_data->ccm_ptr->pause(ci_data->node);
171+
boost::this_thread::sleep(boost::posix_time::seconds(ci_data->delay));
172+
ci_data->ccm_ptr->resume(ci_data->node);
173+
}
174+
}
175+
176+
/**
177+
* Don't Recycle Pool On Connection Timeout
178+
*
179+
* This test ensures that a pool does not completely remove itself while
180+
* allowing partial connections to remain and reconnection attempts to use the
181+
* existing pool.
182+
*
183+
* @since 2.1.0
184+
* @test_category connection:connection_pool
185+
* @jira_ticket CPP-253 [https://datastax-oss.atlassian.net/browse/CPP-253]
186+
*/
187+
BOOST_AUTO_TEST_CASE(dont_recycle_pool_on_timeout) {
188+
// Add a second node
189+
ccm->bootstrap(2);
190+
191+
// Create the connection interruption data
192+
ConnectionInterruptionData ci_data = { ccm.get(), 2, 0, 0 };
193+
194+
test_utils::initialize_contact_points(cluster, conf.ip_prefix(), 2, 0);
195+
cass_cluster_set_connect_timeout(cluster, 1000);
196+
cass_cluster_set_num_threads_io(cluster, 32);
197+
cass_cluster_set_core_connections_per_host(cluster, 4);
198+
cass_cluster_set_load_balance_round_robin(cluster);
199+
200+
// Create session during "connection interruptions"
201+
test_utils::CassLog::reset("Host " + conf.ip_prefix() + "2 already present attempting to initiate immediate connection");
202+
{
203+
uv_thread_t connection_interruptions_thread;
204+
ci_data.duration = 5;
205+
uv_thread_create(&connection_interruptions_thread, connection_interruptions, &ci_data);
206+
test_utils::CassSessionPtr session(test_utils::create_session(cluster));
207+
uv_thread_join(&connection_interruptions_thread);
208+
boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time();
209+
execute_system_query(60, session);
210+
}
211+
BOOST_CHECK_GE(test_utils::CassLog::message_count(), 1);
212+
213+
// Handle partial reconnects
214+
cass_cluster_set_connection_idle_timeout(cluster, 1);
215+
cass_cluster_set_connection_heartbeat_interval(cluster, 2);
216+
test_utils::CassLog::reset("already present attempting to initiate immediate connection");
217+
{
218+
// Create the session ignore all connection errors
219+
test_utils::CassSessionPtr session(cass_session_new());
220+
test_utils::CassFuturePtr future(cass_session_connect(session.get(), cluster));
221+
cass_future_wait_timed(future.get(), test_utils::ONE_SECOND_IN_MICROS);
222+
boost::posix_time::ptime start = boost::posix_time::second_clock::universal_time();
223+
uv_thread_t connection_interruptions_thread;
224+
ci_data.delay = 5;
225+
ci_data.duration = 45;
226+
uv_thread_create(&connection_interruptions_thread, connection_interruptions, &ci_data);
227+
execute_system_query(60, session);
228+
uv_thread_join(&connection_interruptions_thread);
229+
}
230+
BOOST_CHECK_GE(test_utils::CassLog::message_count(), 1);
231+
}
232+
128233
BOOST_AUTO_TEST_SUITE_END()

0 commit comments

Comments
 (0)