diff --git a/.evergreen/scripts/run-tests.sh b/.evergreen/scripts/run-tests.sh index 4c3c2948e5..a52e8c89ee 100755 --- a/.evergreen/scripts/run-tests.sh +++ b/.evergreen/scripts/run-tests.sh @@ -110,7 +110,12 @@ fi # Sanitizer environment variables. export ASAN_OPTIONS="detect_leaks=1 abort_on_error=1 symbolize=1" -export ASAN_SYMBOLIZER_PATH="/opt/mongodbtoolchain/v3/bin/llvm-symbolizer" +export ASAN_SYMBOLIZER_PATH +if command -v "/opt/mongodbtoolchain/v4/bin/llvm-symbolizer" > /dev/null; then + ASAN_SYMBOLIZER_PATH="/opt/mongodbtoolchain/v4/bin/llvm-symbolizer" +elif command -v "/opt/mongodbtoolchain/v3/bin/llvm-symbolizer" > /dev/null; then + ASAN_SYMBOLIZER_PATH="/opt/mongodbtoolchain/v3/bin/llvm-symbolizer" +fi export TSAN_OPTIONS="suppressions=.tsan-suppressions" ubsan_opts=( diff --git a/src/libmongoc/CMakeLists.txt b/src/libmongoc/CMakeLists.txt index 1df9ec4993..38dc6f3fbe 100644 --- a/src/libmongoc/CMakeLists.txt +++ b/src/libmongoc/CMakeLists.txt @@ -1032,6 +1032,7 @@ set (test-libmongoc-sources ${PROJECT_SOURCE_DIR}/tests/mock_server/mock-server.c ${PROJECT_SOURCE_DIR}/tests/mock_server/request.c ${PROJECT_SOURCE_DIR}/tests/mock_server/sync-queue.c + ${PROJECT_SOURCE_DIR}/tests/stream-tracker.c ${PROJECT_SOURCE_DIR}/tests/test-conveniences.c ${PROJECT_SOURCE_DIR}/tests/test-happy-eyeballs.c ${PROJECT_SOURCE_DIR}/tests/test-libmongoc.c diff --git a/src/libmongoc/src/mongoc/mongoc-client-pool.c b/src/libmongoc/src/mongoc/mongoc-client-pool.c index 93a056b9f4..275747b6a7 100644 --- a/src/libmongoc/src/mongoc/mongoc-client-pool.c +++ b/src/libmongoc/src/mongoc/mongoc-client-pool.c @@ -506,6 +506,9 @@ _mongoc_client_pool_set_stream_initiator(mongoc_client_pool_t *pool, mongoc_stre { BSON_ASSERT_PARAM(pool); + // Do not permit overriding initializer after calls to `mongoc_client_pool_pop`. + BSON_ASSERT(!pool->client_initialized); + mongoc_topology_scanner_set_stream_initiator(pool->topology->scanner, si, context); } diff --git a/src/libmongoc/tests/TestSuite.c b/src/libmongoc/tests/TestSuite.c index 11f7778f7c..c82bc33836 100644 --- a/src/libmongoc/tests/TestSuite.c +++ b/src/libmongoc/tests/TestSuite.c @@ -1232,35 +1232,3 @@ test_bulkwriteexception_str(const mongoc_bulkwriteexception_t *bwe) tmp_json(mongoc_bulkwriteexception_writeconcernerrors(bwe)), tmp_json(mongoc_bulkwriteexception_errorreply(bwe))); } - -int32_t -get_current_connection_count(const char *host_and_port) -{ - char *uri_str = bson_strdup_printf("mongodb://%s", host_and_port); - char *uri_str_with_auth = test_framework_add_user_password_from_env(uri_str); - mongoc_client_t *client = mongoc_client_new(uri_str_with_auth); - test_framework_set_ssl_opts(client); - bson_t *cmd = BCON_NEW("serverStatus", BCON_INT32(1)); - bson_t reply; - bson_error_t error; - bool ok = mongoc_client_command_simple(client, "admin", cmd, NULL, &reply, &error); - if (!ok) { - printf("serverStatus failed: %s\n", error.message); - abort(); - } - int32_t conns; - // Get `connections.current` from the reply. - { - bson_iter_t iter; - BSON_ASSERT(bson_iter_init_find(&iter, &reply, "connections")); - BSON_ASSERT(bson_iter_recurse(&iter, &iter)); - BSON_ASSERT(bson_iter_find(&iter, "current")); - conns = bson_iter_int32(&iter); - } - bson_destroy(&reply); - bson_destroy(cmd); - mongoc_client_destroy(client); - bson_free(uri_str_with_auth); - bson_free(uri_str); - return conns; -} diff --git a/src/libmongoc/tests/TestSuite.h b/src/libmongoc/tests/TestSuite.h index ef040efff0..06b5b8fd11 100644 --- a/src/libmongoc/tests/TestSuite.h +++ b/src/libmongoc/tests/TestSuite.h @@ -623,43 +623,6 @@ test_bulkwriteexception_str(const mongoc_bulkwriteexception_t *bwe); } else \ (void)0 -// `get_current_connection_count` returns the server reported connection count. -int32_t -get_current_connection_count(const char *host_and_port); - -#define ASSERT_CONN_COUNT(host, expect) \ - if (1) { \ - int32_t _got = get_current_connection_count(host); \ - if (_got != expect) { \ - test_error("Got unexpected connection count to %s:\n" \ - " Expected %" PRId32 ", got %" PRId32 "\n", \ - host, \ - expect, \ - _got); \ - } \ - } else \ - (void)0 - -#define ASSERT_EVENTUAL_CONN_COUNT(host, expect) \ - if (1) { \ - int64_t _start = bson_get_monotonic_time(); \ - while (true) { \ - int32_t _got = get_current_connection_count(host); \ - if (_got == expect) { \ - break; \ - } \ - int64_t _now = bson_get_monotonic_time(); \ - if (_now - _start > 5 * 1000 * 1000 /* five seconds */) { \ - test_error("Timed out waiting for expected connection count to %s:\n" \ - " Expected %" PRId32 ", got %" PRId32 "\n", \ - host, \ - expect, \ - _got); \ - } \ - } \ - } else \ - (void)0 - #define MAX_TEST_NAME_LENGTH 500 #define MAX_TEST_CHECK_FUNCS 10 diff --git a/src/libmongoc/tests/stream-tracker.c b/src/libmongoc/tests/stream-tracker.c new file mode 100644 index 0000000000..5f4107d2b9 --- /dev/null +++ b/src/libmongoc/tests/stream-tracker.c @@ -0,0 +1,446 @@ +/* + * Copyright 2009-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include // _mongoc_client_pool_set_stream_initiator +#include // mongoc_client_default_stream_initiator +#include + +#include // ASSERT_OR_PRINT +#include // tmp_bson +#include // test_framework_* + +typedef struct { + mongoc_host_list_t host; + int count_active; + int count_total; +} stream_tracker_entry; + +struct stream_tracker_t { +#define STREAM_TRACKER_MAX_ENTRIES 10 // Arbitrary + stream_tracker_entry entries[STREAM_TRACKER_MAX_ENTRIES]; + bson_mutex_t lock; + mongoc_client_pool_t *pool; + mongoc_client_t *client; +}; + +stream_tracker_t * +stream_tracker_new(void) +{ + stream_tracker_t *st = bson_malloc0(sizeof(stream_tracker_t)); + bson_mutex_init(&st->lock); + return st; +} + +static mongoc_stream_t * +stream_tracker_initiator(const mongoc_uri_t *uri, const mongoc_host_list_t *host, void *user_data, bson_error_t *error); + +void +stream_tracker_track_client(stream_tracker_t *st, mongoc_client_t *client) +{ + BSON_ASSERT_PARAM(st); + BSON_ASSERT_PARAM(client); + + // Can only track one pool or single-threaded client: + BSON_ASSERT(!st->pool); + BSON_ASSERT(!st->client); + + st->client = client; + mongoc_client_set_stream_initiator(client, stream_tracker_initiator, st); +} + +void +stream_tracker_track_pool(stream_tracker_t *st, mongoc_client_pool_t *pool) +{ + BSON_ASSERT_PARAM(st); + BSON_ASSERT_PARAM(pool); + + // Can only track one pool or single-threaded client: + BSON_ASSERT(!st->pool); + BSON_ASSERT(!st->client); + + st->pool = pool; + _mongoc_client_pool_set_stream_initiator(pool, stream_tracker_initiator, st); +} + +int +stream_tracker_count_active(stream_tracker_t *st, const char *host_) +{ + BSON_ASSERT_PARAM(st); + BSON_ASSERT_PARAM(host_); + + bson_error_t error; + mongoc_host_list_t host; + ASSERT_OR_PRINT(_mongoc_host_list_from_string_with_err(&host, host_, &error), error); + + int count = 0; + + // Find matching entry (if present): + { + bson_mutex_lock(&st->lock); + for (size_t i = 0; i < STREAM_TRACKER_MAX_ENTRIES; i++) { + if (_mongoc_host_list_compare_one(&st->entries[i].host, &host)) { + count = st->entries[i].count_active; + break; + } + } + bson_mutex_unlock(&st->lock); + } + + return count; +} + +int +stream_tracker_count_total(stream_tracker_t *st, const char *host_) +{ + BSON_ASSERT_PARAM(st); + BSON_ASSERT_PARAM(host_); + + bson_error_t error; + mongoc_host_list_t host; + ASSERT_OR_PRINT(_mongoc_host_list_from_string_with_err(&host, host_, &error), error); + + int count = 0; + + // Find matching entry (if present): + { + bson_mutex_lock(&st->lock); + for (size_t i = 0; i < STREAM_TRACKER_MAX_ENTRIES; i++) { + if (_mongoc_host_list_compare_one(&st->entries[i].host, &host)) { + count = st->entries[i].count_total; + break; + } + } + bson_mutex_unlock(&st->lock); + } + + return count; +} + +static void +stream_tracker_increment(stream_tracker_t *st, const mongoc_host_list_t *host) +{ + BSON_ASSERT_PARAM(st); + BSON_ASSERT_PARAM(host); + + bson_mutex_lock(&st->lock); + // Find (or create) matching entry. + for (size_t i = 0; i < STREAM_TRACKER_MAX_ENTRIES; i++) { + if (0 == strlen(st->entries[i].host.host_and_port)) { + // No matching entry. Create one. + st->entries[i].host = *host; + st->entries[i].count_active = 1; + st->entries[i].count_total = 1; + bson_mutex_unlock(&st->lock); + return; + } + if (_mongoc_host_list_compare_one(&st->entries[i].host, host)) { + st->entries[i].count_active++; + st->entries[i].count_total++; + bson_mutex_unlock(&st->lock); + return; + } + } + + test_error("No room to add %s. Increase STREAM_TRACKER_MAX_ENTRIES.", host->host_and_port); +} + +static void +stream_tracker_decrement(stream_tracker_t *st, const mongoc_host_list_t *host) +{ + BSON_ASSERT_PARAM(st); + BSON_ASSERT_PARAM(host); + + bson_mutex_lock(&st->lock); + // Find matching entry. + for (size_t i = 0; i < STREAM_TRACKER_MAX_ENTRIES; i++) { + if (0 == strlen(st->entries[i].host.host_and_port)) { + test_error("Unexpected: no matching entry for %s", st->entries[i].host.host_and_port); + } + if (_mongoc_host_list_compare_one(&st->entries[i].host, host)) { + ASSERT(st->entries[i].count_active > 0); + st->entries[i].count_active--; + bson_mutex_unlock(&st->lock); + return; + } + } + + test_error("Unexpected. No matching entry to decrement!"); +} + +void +stream_tracker_destroy(stream_tracker_t *st) +{ + if (!st) { + return; + } + bson_mutex_destroy(&st->lock); + bson_free(st); +} + +// tracked_stream_t wraps a mongoc_stream_t and updates a linked stream_tracker. +#define MONGOC_STREAM_TRACKED 8 +typedef struct { + mongoc_stream_t vtable; + mongoc_stream_t *wrapped; + mongoc_host_list_t host; + stream_tracker_t *st; +} tracked_stream_t; + +static int +tracked_stream_close(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_close(((tracked_stream_t *)stream)->wrapped); +} + + +static void +tracked_stream_destroy(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + tracked_stream_t *ts = (tracked_stream_t *)stream; + stream_tracker_decrement(ts->st, &ts->host); + mongoc_stream_destroy(ts->wrapped); + bson_free(ts); +} + + +static void +tracked_stream_failed(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + tracked_stream_t *ts = (tracked_stream_t *)stream; + stream_tracker_decrement(ts->st, &ts->host); + mongoc_stream_failed(ts->wrapped); + bson_free(ts); +} + + +static int +tracked_stream_setsockopt(mongoc_stream_t *stream, int level, int optname, void *optval, mongoc_socklen_t optlen) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_setsockopt(((tracked_stream_t *)stream)->wrapped, level, optname, optval, optlen); +} + + +static int +tracked_stream_flush(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_flush(((tracked_stream_t *)stream)->wrapped); +} + + +static ssize_t +tracked_stream_readv( + mongoc_stream_t *stream, mongoc_iovec_t *iov, size_t iovcnt, size_t min_bytes, int32_t timeout_msec) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_readv(((tracked_stream_t *)stream)->wrapped, iov, iovcnt, min_bytes, timeout_msec); +} + + +static ssize_t +tracked_stream_writev(mongoc_stream_t *stream, mongoc_iovec_t *iov, size_t iovcnt, int32_t timeout_msec) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_writev(((tracked_stream_t *)stream)->wrapped, iov, iovcnt, timeout_msec); +} + + +static bool +tracked_stream_check_closed(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_check_closed(((tracked_stream_t *)stream)->wrapped); +} + + +static bool +tracked_stream_timed_out(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_timed_out(((tracked_stream_t *)stream)->wrapped); +} + + +static bool +tracked_stream_should_retry(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + return mongoc_stream_should_retry(((tracked_stream_t *)stream)->wrapped); +} + + +static mongoc_stream_t * +tracked_stream_get_base_stream(mongoc_stream_t *stream) +{ + BSON_ASSERT_PARAM(stream); + mongoc_stream_t *wrapped = ((tracked_stream_t *)stream)->wrapped; + + if (wrapped->get_base_stream) { + return wrapped->get_base_stream(wrapped); + } + + return wrapped; +} + + +static mongoc_stream_t * +tracked_stream_new(mongoc_stream_t *stream, stream_tracker_t *st, const mongoc_host_list_t *host) +{ + BSON_ASSERT_PARAM(stream); + BSON_ASSERT_PARAM(st); + BSON_ASSERT_PARAM(host); + + tracked_stream_t *ts = (tracked_stream_t *)bson_malloc0(sizeof(tracked_stream_t)); + + // Set vtable to wrapper functions: + ts->vtable.type = MONGOC_STREAM_TRACKED; + ts->vtable.close = tracked_stream_close; + ts->vtable.destroy = tracked_stream_destroy; + ts->vtable.failed = tracked_stream_failed; + ts->vtable.flush = tracked_stream_flush; + ts->vtable.readv = tracked_stream_readv; + ts->vtable.writev = tracked_stream_writev; + ts->vtable.setsockopt = tracked_stream_setsockopt; + ts->vtable.check_closed = tracked_stream_check_closed; + ts->vtable.timed_out = tracked_stream_timed_out; + ts->vtable.should_retry = tracked_stream_should_retry; + ts->vtable.get_base_stream = tracked_stream_get_base_stream; + + // Wrap base stream: + ts->wrapped = stream; + + // Set data for tracking: + ts->st = st; + ts->host = *host; + + // Record a new stream created to host: + stream_tracker_increment(ts->st, &ts->host); + + return (mongoc_stream_t *)ts; +} + +mongoc_stream_t * +stream_tracker_initiator(const mongoc_uri_t *uri, const mongoc_host_list_t *host, void *user_data, bson_error_t *error) +{ + BSON_ASSERT_PARAM(uri); + BSON_ASSERT_PARAM(host); + BSON_ASSERT_PARAM(user_data); + BSON_ASSERT_PARAM(error); + + stream_tracker_t *st = (stream_tracker_t *)user_data; + + // mongoc_client_default_stream_initiator expects a client context. If tracking a pool, pop a temporary client: + mongoc_client_t *client = (st->pool) ? mongoc_client_pool_pop(st->pool) : st->client; + ASSERT(client); + + mongoc_stream_t *base_stream = mongoc_client_default_stream_initiator(uri, host, client, error); + ASSERT_OR_PRINT(base_stream, (*error)); + + if (st->pool) { + mongoc_client_pool_push(st->pool, client); + } + return tracked_stream_new(base_stream, st, host); +} + +static void +test_stream_tracker(void) +{ + // Get first host+port from test environment. Example: "localhost:27017" or "[::1]:27017" + char *first_host_and_port = test_framework_get_host_and_port(); + + // Test single-threaded client: + { + stream_tracker_t *st = stream_tracker_new(); + mongoc_client_t *client = test_framework_new_default_client(); + stream_tracker_track_client(st, client); + + // Expect initial count is 0: + stream_tracker_assert_active_count(st, first_host_and_port, 0); + + // Do operation requiring a stream. Target first host: + bson_error_t error; + ASSERT_OR_PRINT(mongoc_client_command_simple_with_server_id( + client, "admin", tmp_bson("{'ping': 1}"), NULL, 1 /* server ID */, NULL, &error), + error); + + // Expect active and total count incremented: + stream_tracker_assert_active_count(st, first_host_and_port, 1); + stream_tracker_assert_total_count(st, first_host_and_port, 1); + + // Destroy stream: + mongoc_client_destroy(client); + + // Expect active count decremented: + stream_tracker_assert_active_count(st, first_host_and_port, 0); + // Expect total count unchanged: + stream_tracker_assert_total_count(st, first_host_and_port, 1); + + stream_tracker_destroy(st); + } + + // Test client-pool: + { + stream_tracker_t *st = stream_tracker_new(); + mongoc_client_pool_t *pool = test_framework_new_default_client_pool(); + stream_tracker_track_pool(st, pool); + + // Expect initial count is 0: + stream_tracker_assert_active_count(st, first_host_and_port, 0); + + // Pop a client, triggering background connections to be created: + mongoc_client_t *client = mongoc_client_pool_pop(pool); + + // Server 4.4 added support for streaming monitoring and has 2 monitoring connections. + int monitor_count = test_framework_get_server_version() >= test_framework_str_to_version("4.4") ? 2 : 1; + stream_tracker_assert_eventual_active_count(st, first_host_and_port, monitor_count); + + // Do operation requiring a stream. Target first host: + bson_error_t error; + ASSERT_OR_PRINT(mongoc_client_command_simple_with_server_id( + client, "admin", tmp_bson("{'ping': 1}"), NULL, 1 /* server ID */, NULL, &error), + error); + + // Expect active and total count incremented: + stream_tracker_assert_active_count(st, first_host_and_port, monitor_count + 1); + stream_tracker_assert_total_count(st, first_host_and_port, monitor_count + 1); + + // Destroy pool. + mongoc_client_pool_push(pool, client); + mongoc_client_pool_destroy(pool); + + // Expect active count decremented: + stream_tracker_assert_active_count(st, first_host_and_port, 0); + // Expect total count unchanged: + stream_tracker_assert_total_count(st, first_host_and_port, monitor_count + 1); + + stream_tracker_destroy(st); + } + + bson_free(first_host_and_port); +} + +void +test_stream_tracker_install(TestSuite *suite) +{ + TestSuite_AddLive(suite, "/stream_tracker/selftest", test_stream_tracker); +} diff --git a/src/libmongoc/tests/stream-tracker.h b/src/libmongoc/tests/stream-tracker.h new file mode 100644 index 0000000000..29b28b2354 --- /dev/null +++ b/src/libmongoc/tests/stream-tracker.h @@ -0,0 +1,98 @@ +/* + * Copyright 2009-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef STREAM_TRACKER_H +#define STREAM_TRACKER_H + +#include +#include + +#include + +#include + +// stream_tracker_t is a test utility to count streams created to servers. +typedef struct stream_tracker_t stream_tracker_t; + +stream_tracker_t * +stream_tracker_new(void); + +// stream_tracker_track_client tracks streams in a single-threaded client. +void +stream_tracker_track_client(stream_tracker_t *st, mongoc_client_t *client); + +// stream_tracker_track_pool tracks streams in a pool. Call before calling mongoc_client_pool_pop. +void +stream_tracker_track_pool(stream_tracker_t *st, mongoc_client_pool_t *pool); + +// stream_tracker_count_active returns a count of active streams. +int +stream_tracker_count_active(stream_tracker_t *st, const char *host); + +// stream_tracker_count_total returns a cumulative count of streams. +int +stream_tracker_count_total(stream_tracker_t *st, const char *host); + +void +stream_tracker_destroy(stream_tracker_t *st); + +#define stream_tracker_assert_active_count(st, host, expect) \ + if (1) { \ + int _got = stream_tracker_count_active(st, host); \ + if (_got != expect) { \ + test_error("Got unexpected active stream count to %s:\n" \ + " Expected %d, got %d", \ + host, \ + expect, \ + _got); \ + } \ + } else \ + ((void)0) + +#define stream_tracker_assert_total_count(st, host, expect) \ + if (1) { \ + int _got = stream_tracker_count_total(st, host); \ + if (_got != expect) { \ + test_error("Got unexpected total stream count to %s:\n" \ + " Expected %d, got %d", \ + host, \ + expect, \ + _got); \ + } \ + } else \ + ((void)0) + +#define stream_tracker_assert_eventual_active_count(st, host, expect) \ + if (1) { \ + mlib_timer _timer = mlib_expires_after(5, s); \ + while (true) { \ + int _got = stream_tracker_count_active(st, host); \ + if (_got == expect) { \ + break; \ + } \ + if (mlib_timer_is_expired(_timer)) { \ + test_error("Timed out waiting for expected active stream count to %s:\n" \ + " Expected %d, got %d", \ + host, \ + expect, \ + _got); \ + } \ + mlib_sleep_for(100, ms); \ + } \ + } else \ + ((void)0) + +#endif // STREAM_TRACKER_H diff --git a/src/libmongoc/tests/test-libmongoc-main.c b/src/libmongoc/tests/test-libmongoc-main.c index 8fec46534d..1f17ea656b 100644 --- a/src/libmongoc/tests/test-libmongoc-main.c +++ b/src/libmongoc/tests/test-libmongoc-main.c @@ -163,6 +163,7 @@ main(int argc, char *argv[]) TEST_INSTALL(test_bulkwrite_install); TEST_INSTALL(test_mongoc_oidc_callback_install); TEST_INSTALL(test_secure_channel_install); + TEST_INSTALL(test_stream_tracker_install); const int ret = TestSuite_Run(&suite); diff --git a/src/libmongoc/tests/test-mongoc-client-pool.c b/src/libmongoc/tests/test-mongoc-client-pool.c index ad443429ac..3878b21511 100644 --- a/src/libmongoc/tests/test-mongoc-client-pool.c +++ b/src/libmongoc/tests/test-mongoc-client-pool.c @@ -10,6 +10,8 @@ #include #include +#include + static void test_mongoc_client_pool_basic(void) @@ -387,20 +389,28 @@ disconnects_removed_servers_on_push(void *unused) bool ok; bson_t *ping = BCON_NEW("ping", BCON_INT32(1)); + const char *host0 = "localhost:27017"; + const char *host1 = "localhost:27018"; + stream_tracker_t *st = stream_tracker_new(); + // Create a client pool to two servers. mongoc_client_pool_t *pool; { - mongoc_uri_t *uri = mongoc_uri_new("mongodb://localhost:27017,localhost:27018"); + char *uristr = bson_strdup_printf("mongodb://%s,%s", host0, host1); + mongoc_uri_t *uri = mongoc_uri_new(uristr); // Set a short heartbeat so server monitors get quick responses. mongoc_uri_set_option_as_int32(uri, MONGOC_URI_HEARTBEATFREQUENCYMS, MONGOC_TOPOLOGY_MIN_HEARTBEAT_FREQUENCY_MS); pool = mongoc_client_pool_new(uri); test_framework_set_pool_ssl_opts(pool); mongoc_uri_destroy(uri); + bson_free(uristr); } - // Count connections to both servers. - int32_t conns_27017_before = get_current_connection_count("localhost:27017"); - int32_t conns_27018_before = get_current_connection_count("localhost:27018"); + stream_tracker_track_pool(st, pool); + + // Expect no streams created yet: + stream_tracker_assert_active_count(st, host0, 0); + stream_tracker_assert_active_count(st, host1, 0); // Pop (and push) a client to start background monitoring. { @@ -408,8 +418,8 @@ disconnects_removed_servers_on_push(void *unused) mongoc_client_pool_push(pool, client); // Wait for monitoring connections to be created. // Expect two monitoring connections per server to be created in background. - ASSERT_EVENTUAL_CONN_COUNT("localhost:27017", conns_27017_before + 2); - ASSERT_EVENTUAL_CONN_COUNT("localhost:27018", conns_27018_before + 2); + stream_tracker_assert_eventual_active_count(st, host0, 2); + stream_tracker_assert_eventual_active_count(st, host1, 2); } // Send 'ping' commands on a client to each server to create operation connections. @@ -421,8 +431,8 @@ disconnects_removed_servers_on_push(void *unused) ASSERT_OR_PRINT(ok, error); mongoc_client_pool_push(pool, client); // Expect an operation connection is created. - ASSERT_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 1); - ASSERT_CONN_COUNT("localhost:27018", conns_27018_before + 2 + 1); + stream_tracker_assert_active_count(st, host0, 2 + 1); + stream_tracker_assert_active_count(st, host1, 2 + 1); } // Mock removal of server 27018 from topology. @@ -436,18 +446,19 @@ disconnects_removed_servers_on_push(void *unused) // Expect connections are closed to removed server. { // Expect monitoring connections to be closed in background. - ASSERT_EVENTUAL_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 1); - ASSERT_EVENTUAL_CONN_COUNT("localhost:27018", conns_27018_before + 1); + stream_tracker_assert_eventual_active_count(st, host0, 2 + 1); + stream_tracker_assert_eventual_active_count(st, host1, 1); // Pop and push the client to "prune" the stale operation connections. mongoc_client_t *client = mongoc_client_pool_pop(pool); mongoc_client_pool_push(pool, client); - ASSERT_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 1); - ASSERT_CONN_COUNT("localhost:27018", conns_27018_before); + stream_tracker_assert_active_count(st, host0, 2 + 1); + stream_tracker_assert_active_count(st, host1, 0); } mongoc_client_pool_destroy(pool); bson_destroy(ping); + stream_tracker_destroy(st); } // Test that connections are closed to servers removed from the topology on clients checked into the pool. @@ -459,20 +470,28 @@ disconnects_removed_servers_in_pool(void *unused) bool ok; bson_t *ping = BCON_NEW("ping", BCON_INT32(1)); + const char *host0 = "localhost:27017"; + const char *host1 = "localhost:27018"; + stream_tracker_t *st = stream_tracker_new(); + // Create a client pool to two servers. mongoc_client_pool_t *pool; { - mongoc_uri_t *uri = mongoc_uri_new("mongodb://localhost:27017,localhost:27018"); + char *uristr = bson_strdup_printf("mongodb://%s,%s", host0, host1); + mongoc_uri_t *uri = mongoc_uri_new(uristr); // Set a short heartbeat so server monitors get quick responses. mongoc_uri_set_option_as_int32(uri, MONGOC_URI_HEARTBEATFREQUENCYMS, MONGOC_TOPOLOGY_MIN_HEARTBEAT_FREQUENCY_MS); pool = mongoc_client_pool_new(uri); test_framework_set_pool_ssl_opts(pool); mongoc_uri_destroy(uri); + bson_free(uristr); } - // Count connections to both servers. - int32_t conns_27017_before = get_current_connection_count("localhost:27017"); - int32_t conns_27018_before = get_current_connection_count("localhost:27018"); + stream_tracker_track_pool(st, pool); + + // Expect no streams created yet: + stream_tracker_assert_active_count(st, host0, 0); + stream_tracker_assert_active_count(st, host1, 0); // Pop (and push) a client to start background monitoring. { @@ -480,8 +499,8 @@ disconnects_removed_servers_in_pool(void *unused) mongoc_client_pool_push(pool, client); // Wait for monitoring connections to be created. // Expect two monitoring connections per server to be created in background. - ASSERT_EVENTUAL_CONN_COUNT("localhost:27017", conns_27017_before + 2); - ASSERT_EVENTUAL_CONN_COUNT("localhost:27018", conns_27018_before + 2); + stream_tracker_assert_eventual_active_count(st, host0, 2); + stream_tracker_assert_eventual_active_count(st, host1, 2); } // Send 'ping' commands on two clients to each server to create operation connections. @@ -503,8 +522,8 @@ disconnects_removed_servers_in_pool(void *unused) mongoc_client_pool_push(pool, client1); // Expect an operation connection is created per client. - ASSERT_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 2); - ASSERT_CONN_COUNT("localhost:27018", conns_27018_before + 2 + 2); + stream_tracker_assert_active_count(st, host0, 2 + 2); + stream_tracker_assert_active_count(st, host1, 2 + 2); } // Mock removal of server 27018 from topology. @@ -518,17 +537,18 @@ disconnects_removed_servers_in_pool(void *unused) // Expect connections are closed to removed server. { // Expect monitoring connections to be closed in background. - ASSERT_EVENTUAL_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 2); - ASSERT_EVENTUAL_CONN_COUNT("localhost:27018", conns_27018_before + 2); + stream_tracker_assert_eventual_active_count(st, host0, 2 + 2); + stream_tracker_assert_eventual_active_count(st, host1, 2); // Pop and push one client to "prune" the stale operation connections for both clients. mongoc_client_t *client = mongoc_client_pool_pop(pool); mongoc_client_pool_push(pool, client); - ASSERT_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 2); - ASSERT_CONN_COUNT("localhost:27018", conns_27018_before); + stream_tracker_assert_active_count(st, host0, 2 + 2); + stream_tracker_assert_active_count(st, host1, 0); } mongoc_client_pool_destroy(pool); + stream_tracker_destroy(st); bson_destroy(ping); } @@ -581,12 +601,6 @@ test_mongoc_client_set_stream_initiator(void) mongoc_client_pool_destroy(pool); } -static int -test_framework_skip_due_to_cdriver6080(void) -{ - return 0; // CDRIVER-6080 -} - void test_client_pool_install(TestSuite *suite) { @@ -617,7 +631,6 @@ test_client_pool_install(TestSuite *suite) disconnects_removed_servers_on_push, NULL, NULL, - test_framework_skip_due_to_cdriver6080, test_framework_skip_if_not_mongos /* require mongos to ensure two servers available */, test_framework_skip_if_max_wire_version_less_than_9 /* require server 4.4+ for streaming monitoring protocol */); @@ -627,7 +640,6 @@ test_client_pool_install(TestSuite *suite) disconnects_removed_servers_in_pool, NULL, NULL, - test_framework_skip_due_to_cdriver6080, test_framework_skip_if_not_mongos /* require mongos to ensure two servers available */, test_framework_skip_if_max_wire_version_less_than_9 /* require server 4.4+ for streaming monitoring protocol */); diff --git a/src/libmongoc/tests/test-mongoc-dns.c b/src/libmongoc/tests/test-mongoc-dns.c index 39eadbc0d9..4668d41d8c 100644 --- a/src/libmongoc/tests/test-mongoc-dns.c +++ b/src/libmongoc/tests/test-mongoc-dns.c @@ -20,6 +20,8 @@ #include #include +#include + static void _assert_options_match(const bson_t *test, mongoc_uri_t *uri) { @@ -1285,10 +1287,13 @@ test_removing_servers_closes_connections(void *unused) bson_error_t error; bool ok; bson_t *ping = BCON_NEW("ping", BCON_INT32(1)); + stream_tracker_t *st = stream_tracker_new(); // Create a client pool to mongodb+srv://test1.test.build.10gen.cc. The URI resolves to two SRV records: // - localhost.test.build.10gen.cc:27017 // - localhost.test.build.10gen.cc:27018 + char *host0 = "localhost.test.build.10gen.cc:27017"; + char *host1 = "localhost.test.build.10gen.cc:27018"; mongoc_client_pool_t *pool; { mongoc_uri_t *uri = mongoc_uri_new("mongodb+srv://test1.test.build.10gen.cc"); @@ -1303,16 +1308,17 @@ test_removing_servers_closes_connections(void *unused) // Override the SRV polling callback: mongoc_topology_t *topology = _mongoc_client_pool_get_topology(pool); bson_mutex_init(&rr_override.lock); - rr_override.hosts = MAKE_HOSTS("localhost.test.build.10gen.cc:27017", "localhost.test.build.10gen.cc:27018"); + rr_override.hosts = MAKE_HOSTS(host0, host1); _mongoc_topology_set_rr_resolver(topology, _mock_rr_resolver_with_override); // Set a shorter SRV rescan interval. _mongoc_topology_set_srv_polling_rescan_interval_ms(topology, RESCAN_INTERVAL_MS); mongoc_uri_destroy(uri); } - // Count connections to both servers. - int32_t conns_27017_before = get_current_connection_count("localhost:27017"); - int32_t conns_27018_before = get_current_connection_count("localhost:27018"); + stream_tracker_track_pool(st, pool); + // Expect no streams created yet: + stream_tracker_assert_active_count(st, host0, 0); + stream_tracker_assert_active_count(st, host1, 0); // Pop (and push) a client to start background monitoring. { @@ -1320,8 +1326,8 @@ test_removing_servers_closes_connections(void *unused) mongoc_client_pool_push(pool, client); // Wait for monitoring connections to be created. // Expect two monitoring connections per server to be created in background. - ASSERT_EVENTUAL_CONN_COUNT("localhost:27017", conns_27017_before + 2); - ASSERT_EVENTUAL_CONN_COUNT("localhost:27018", conns_27018_before + 2); + stream_tracker_assert_eventual_active_count(st, host0, 2); + stream_tracker_assert_eventual_active_count(st, host1, 2); } // Send 'ping' commands on a client to each server to create operation connections. @@ -1333,33 +1339,34 @@ test_removing_servers_closes_connections(void *unused) ASSERT_OR_PRINT(ok, error); mongoc_client_pool_push(pool, client); // Expect an operation connection is created. - ASSERT_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 1); - ASSERT_CONN_COUNT("localhost:27018", conns_27018_before + 2 + 1); + stream_tracker_assert_active_count(st, host0, 2 + 1); + stream_tracker_assert_active_count(st, host1, 2 + 1); } - // Mock removal of localhost:27018. + // Mock removal of host1. { bson_mutex_lock(&rr_override.lock); _mongoc_host_list_destroy_all(rr_override.hosts); - rr_override.hosts = MAKE_HOSTS("localhost.test.build.10gen.cc:27017"); + rr_override.hosts = MAKE_HOSTS(host0); bson_mutex_unlock(&rr_override.lock); } // Expect connections are closed to removed server. { // Expect monitoring connections to be closed in background. - ASSERT_EVENTUAL_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 1); - ASSERT_EVENTUAL_CONN_COUNT("localhost:27018", conns_27018_before + 1); + stream_tracker_assert_eventual_active_count(st, host0, 2 + 1); + stream_tracker_assert_eventual_active_count(st, host1, 1); // Pop and push the client to "prune" the stale operation connections. mongoc_client_t *client = mongoc_client_pool_pop(pool); mongoc_client_pool_push(pool, client); - ASSERT_CONN_COUNT("localhost:27017", conns_27017_before + 2 + 1); - ASSERT_CONN_COUNT("localhost:27018", conns_27018_before); + stream_tracker_assert_active_count(st, host0, 2 + 1); + stream_tracker_assert_active_count(st, host1, 0); } mongoc_client_pool_destroy(pool); bson_mutex_destroy(&rr_override.lock); + stream_tracker_destroy(st); bson_destroy(ping); } diff --git a/src/libmongoc/tests/test-mongoc-exhaust.c b/src/libmongoc/tests/test-mongoc-exhaust.c index 0e1f64ef79..f34dbd20da 100644 --- a/src/libmongoc/tests/test-mongoc-exhaust.c +++ b/src/libmongoc/tests/test-mongoc-exhaust.c @@ -14,6 +14,8 @@ #include #include +#include + #include @@ -86,27 +88,6 @@ get_generation(mongoc_client_t *client, mongoc_cursor_t *cursor) return generation; } -static uint32_t -get_connection_count(mongoc_client_t *client) -{ - bson_error_t error; - bson_t cmd = BSON_INITIALIZER; - bson_t reply; - bool res; - int conns; - - ASSERT(client); - - BSON_APPEND_INT32(&cmd, "serverStatus", 1); - res = mongoc_client_command_simple(client, "admin", &cmd, NULL, &reply, &error); - ASSERT_OR_PRINT(res, error); - - conns = bson_lookup_int32(&reply, "connections.totalCreated"); - bson_destroy(&cmd); - bson_destroy(&reply); - return conns; -} - static void test_exhaust_cursor(bool pooled) { @@ -127,20 +108,19 @@ test_exhaust_cursor(bool pooled) bson_error_t error; bson_oid_t oid; int64_t generation1; - uint32_t connection_count1; - mongoc_client_t *audit_client; + int connection_count1; + stream_tracker_t *st = stream_tracker_new(); if (pooled) { pool = test_framework_new_default_client_pool(); + stream_tracker_track_pool(st, pool); client = mongoc_client_pool_pop(pool); } else { client = test_framework_new_default_client(); + stream_tracker_track_client(st, client); } BSON_ASSERT(client); - /* Use a separate client to count connections. */ - audit_client = test_framework_new_default_client(); - collection = get_test_collection(client, "test_exhaust_cursor"); BSON_ASSERT(collection); @@ -194,8 +174,9 @@ test_exhaust_cursor(bool pooled) /* destroy the cursor, make sure the connection pool was not cleared */ generation1 = get_generation(client, cursor); - /* Getting the connection count requires a new enough server. */ - connection_count1 = get_connection_count(audit_client); + mongoc_host_list_t host; + mongoc_cursor_get_host(cursor, &host); + connection_count1 = stream_tracker_count_total(st, host.host_and_port); mongoc_cursor_destroy(cursor); BSON_ASSERT(!client->in_exhaust); } @@ -216,7 +197,9 @@ test_exhaust_cursor(bool pooled) /* The pool was not cleared. */ ASSERT_CMPINT64(generation1, ==, get_generation(client, cursor2)); /* But a new connection was made. */ - ASSERT_CMPINT32(connection_count1 + 1, ==, get_connection_count(audit_client)); + mongoc_host_list_t host; + mongoc_cursor_get_host(cursor2, &host); + stream_tracker_assert_total_count(st, host.host_and_port, connection_count1 + 1); for (i = 0; i < 5; i++) { r = mongoc_cursor_next(cursor2, &doc); @@ -302,7 +285,7 @@ test_exhaust_cursor(bool pooled) } else { mongoc_client_destroy(client); } - mongoc_client_destroy(audit_client); + stream_tracker_destroy(st); } static void diff --git a/src/libmongoc/tests/test-mongoc-primary-stepdown.c b/src/libmongoc/tests/test-mongoc-primary-stepdown.c index c463f580a0..da17735162 100644 --- a/src/libmongoc/tests/test-mongoc-primary-stepdown.c +++ b/src/libmongoc/tests/test-mongoc-primary-stepdown.c @@ -16,6 +16,9 @@ #include #include +#include + + typedef struct { // If `use_pooled` is true, a test is run with a `mongoc_client_t` obtained // from a `mongoc_client_pool_t`. @@ -73,34 +76,15 @@ _setup_test_with_client(mongoc_client_t *client) } static int -_connection_count(mongoc_client_t *client, uint32_t server_id) +_connection_count(mongoc_client_t *client, stream_tracker_t *st, uint32_t server_id) { - bson_error_t error; - bson_iter_t iter; - bson_iter_t child; - bson_t cmd = BSON_INITIALIZER; - bson_t reply; - bool res; - int conns; - - ASSERT(client); - - BSON_APPEND_INT32(&cmd, "serverStatus", 1); - - res = mongoc_client_command_simple_with_server_id(client, "admin", &cmd, NULL, server_id, &reply, &error); - ASSERT_OR_PRINT(res, error); - - ASSERT(bson_iter_init(&iter, &reply)); - ASSERT(bson_iter_find_descendant(&iter, "connections.totalCreated", &child)); - conns = bson_iter_int32(&child); - - bson_destroy(&cmd); - bson_destroy(&reply); - + mongoc_server_description_t *sd = mongoc_client_get_server_description(client, server_id); + int conns = stream_tracker_count_total(st, sd->host.host_and_port); + mongoc_server_description_destroy(sd); return conns; } -typedef void (*_test_fn_t)(mongoc_client_t *); +typedef void (*_test_fn_t)(mongoc_client_t *, stream_tracker_t *); static void _run_test_single_or_pooled(_test_fn_t test, bool use_pooled) @@ -108,6 +92,7 @@ _run_test_single_or_pooled(_test_fn_t test, bool use_pooled) mongoc_uri_t *uri; mongoc_client_t *client; mongoc_client_pool_t *pool; + stream_tracker_t *st = stream_tracker_new(); uri = _get_test_uri(); @@ -116,27 +101,29 @@ _run_test_single_or_pooled(_test_fn_t test, bool use_pooled) client = test_framework_client_new_from_uri(uri, NULL); test_framework_set_ssl_opts(client); _setup_test_with_client(client); - test(client); + stream_tracker_track_client(st, client); + test(client, st); mongoc_client_destroy(client); } else { /* Run in pooled mode */ pool = test_framework_client_pool_new_from_uri(uri, NULL); test_framework_set_pool_ssl_opts(pool); + stream_tracker_track_pool(st, pool); client = mongoc_client_pool_pop(pool); _setup_test_with_client(client); /* Wait one second to be assured that the RTT connection has been * established as well. */ mlib_sleep_for(1, s); - test(client); + test(client, st); mongoc_client_pool_push(pool, client); mongoc_client_pool_destroy(pool); } - + stream_tracker_destroy(st); mongoc_uri_destroy(uri); } static void -test_getmore_iteration(mongoc_client_t *client) +test_getmore_iteration(mongoc_client_t *client, stream_tracker_t *st) { mongoc_database_t *db; mongoc_collection_t *coll; @@ -148,13 +135,13 @@ test_getmore_iteration(mongoc_client_t *client) uint32_t primary_id; ASSERT(client); + ASSERT(st); coll = mongoc_client_get_collection(client, "step-down", "step-down"); db = mongoc_client_get_database(client, "admin"); /* Store the primary ID. After step down, the primary may be a different - * server. We must execute serverStatus against the same server to check - * connection counts. */ + * server. Check connection counts on the same server. */ primary_id = mongoc_topology_select_server_id(client->topology, MONGOC_SS_WRITE, TEST_SS_LOG_CONTEXT, @@ -163,7 +150,7 @@ test_getmore_iteration(mongoc_client_t *client) NULL /* deprioritized servers */, &error); ASSERT_OR_PRINT(primary_id, error); - conn_count = _connection_count(client, primary_id); + conn_count = _connection_count(client, st, primary_id); /* Insert 5 documents */ { @@ -206,7 +193,7 @@ test_getmore_iteration(mongoc_client_t *client) ASSERT(mongoc_cursor_next(cursor, &doc)); /* Verify that no new connections have been created */ - ASSERT_CMPINT(conn_count, ==, _connection_count(client, primary_id)); + ASSERT_CMPINT(conn_count, ==, _connection_count(client, st, primary_id)); mongoc_cursor_destroy(cursor); mongoc_collection_destroy(coll); @@ -227,7 +214,7 @@ test_getmore_iteration_runner(void *ctx_void) } static void -test_not_primary_keep_pool(mongoc_client_t *client) +test_not_primary_keep_pool(mongoc_client_t *client, stream_tracker_t *st) { mongoc_database_t *db; mongoc_collection_t *coll; @@ -237,12 +224,12 @@ test_not_primary_keep_pool(mongoc_client_t *client) uint32_t primary_id; ASSERT(client); + ASSERT(st); /* Configure fail points */ db = mongoc_client_get_database(client, "admin"); /* Store the primary ID. After step down, the primary may be a different - * server. We must execute serverStatus against the same server to check - * connection counts. */ + * server. Check connection counts on the same server. */ primary_id = mongoc_topology_select_server_id(client->topology, MONGOC_SS_WRITE, TEST_SS_LOG_CONTEXT, @@ -251,7 +238,7 @@ test_not_primary_keep_pool(mongoc_client_t *client) NULL /* deprioritized servers */, &error); ASSERT_OR_PRINT(primary_id, error); - conn_count = _connection_count(client, primary_id); + conn_count = _connection_count(client, st, primary_id); res = mongoc_database_command_simple(db, tmp_bson("{'configureFailPoint': 'failCommand', " "'mode': {'times': 1}, " @@ -277,7 +264,7 @@ test_not_primary_keep_pool(mongoc_client_t *client) ASSERT(res); /* Verify that the connection pool has not been cleared */ - ASSERT_CMPINT(conn_count, ==, _connection_count(client, primary_id)); + ASSERT_CMPINT(conn_count, ==, _connection_count(client, st, primary_id)); mongoc_collection_destroy(coll); mongoc_database_destroy(db); @@ -297,7 +284,7 @@ test_not_primary_keep_pool_runner(void *ctx_void) } static void -test_shutdown_reset_pool(mongoc_client_t *client) +test_shutdown_reset_pool(mongoc_client_t *client, stream_tracker_t *st) { mongoc_database_t *db; mongoc_collection_t *coll; @@ -308,13 +295,13 @@ test_shutdown_reset_pool(mongoc_client_t *client) uint32_t primary_id; ASSERT(client); + ASSERT(st); /* Configure fail points */ read_prefs = mongoc_read_prefs_new(MONGOC_READ_PRIMARY); db = mongoc_client_get_database(client, "admin"); /* Store the primary ID. After step down, the primary may be a different - * server. We must execute serverStatus against the same server to check - * connection counts. */ + * server. Check connection counts on the same server. */ primary_id = mongoc_topology_select_server_id(client->topology, MONGOC_SS_WRITE, TEST_SS_LOG_CONTEXT, @@ -323,7 +310,7 @@ test_shutdown_reset_pool(mongoc_client_t *client) NULL /* deprioritized servers */, &error); ASSERT_OR_PRINT(primary_id, error); - conn_count = _connection_count(client, primary_id); + conn_count = _connection_count(client, st, primary_id); res = mongoc_database_command_simple(db, tmp_bson("{'configureFailPoint': 'failCommand', " "'mode': {'times': 1}, " @@ -341,13 +328,13 @@ test_shutdown_reset_pool(mongoc_client_t *client) ASSERT_CMPINT(error.code, ==, 91); ASSERT_CONTAINS(error.message, "failpoint"); - /* Verify that the pool has been cleared */ - ASSERT_CMPINT((conn_count + 1), ==, _connection_count(client, primary_id)); - /* Execute an insert into the test collection and verify it succeeds */ res = mongoc_collection_insert_one(coll, tmp_bson("{'test': 1}"), NULL, NULL, &error); ASSERT_OR_PRINT(res, error); + /* Expect the insert created a new connection. The error cleared the connection pool. */ + ASSERT_CMPINT((conn_count + 1), ==, _connection_count(client, st, primary_id)); + mongoc_read_prefs_destroy(read_prefs); mongoc_collection_destroy(coll); mongoc_database_destroy(db); @@ -362,7 +349,7 @@ test_shutdown_reset_pool_runner(void *ctx_void) } static void -test_interrupted_shutdown_reset_pool(mongoc_client_t *client) +test_interrupted_shutdown_reset_pool(mongoc_client_t *client, stream_tracker_t *st) { mongoc_database_t *db; mongoc_collection_t *coll; @@ -373,13 +360,13 @@ test_interrupted_shutdown_reset_pool(mongoc_client_t *client) uint32_t primary_id; ASSERT(client); + ASSERT(st); /* Configure fail points */ read_prefs = mongoc_read_prefs_new(MONGOC_READ_PRIMARY); db = mongoc_client_get_database(client, "admin"); /* Store the primary ID. After step down, the primary may be a different - * server. We must execute serverStatus against the same server to check - * connection counts. */ + * server. Check connection counts on the same server. */ primary_id = mongoc_topology_select_server_id(client->topology, MONGOC_SS_WRITE, TEST_SS_LOG_CONTEXT, @@ -388,7 +375,7 @@ test_interrupted_shutdown_reset_pool(mongoc_client_t *client) NULL /* deprioritized servers */, &error); ASSERT_OR_PRINT(primary_id, error); - conn_count = _connection_count(client, primary_id); + conn_count = _connection_count(client, st, primary_id); res = mongoc_database_command_simple(db, tmp_bson("{'configureFailPoint': 'failCommand', " "'mode': {'times': 1}, " @@ -406,13 +393,13 @@ test_interrupted_shutdown_reset_pool(mongoc_client_t *client) ASSERT_CMPINT(error.code, ==, 11600); ASSERT_CONTAINS(error.message, "failpoint"); - /* Verify that the pool has been cleared */ - ASSERT_CMPINT((conn_count + 1), ==, _connection_count(client, primary_id)); - /* Execute an insert into the test collection and verify it succeeds */ res = mongoc_collection_insert_one(coll, tmp_bson("{'test': 1}"), NULL, NULL, &error); ASSERT_OR_PRINT(res, error); + /* Expect the insert created a new connection. The error cleared the connection pool. */ + ASSERT_CMPINT((conn_count + 1), ==, _connection_count(client, st, primary_id)); + mongoc_read_prefs_destroy(read_prefs); mongoc_collection_destroy(coll); mongoc_database_destroy(db);