From b816997c07e6ae75b82e28bc0d3527bbc9b295ec Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Tue, 20 Dec 2022 19:12:02 +0100 Subject: [PATCH 1/7] Add rack-aware load balancing policy We need to prefer local rack as there are higher network costs when associated with nodes in remote rack. This is initial version, some things are still missing: * no tests yet * fallback to racks in local DC is not implemented yet (i.e. this version can connect to remote DC even if there is another rack in the local DC that we haven't tried yet) --- src/cluster.cpp | 3 + src/cluster.hpp | 4 + src/cluster_config.cpp | 27 +++ src/cluster_connector.cpp | 11 +- src/cluster_connector.hpp | 1 + src/cluster_metadata_resolver.hpp | 2 + src/dc_aware_policy.cpp | 2 +- src/dc_aware_policy.hpp | 2 +- src/execution_profile.hpp | 1 + src/latency_aware_policy.cpp | 4 +- src/latency_aware_policy.hpp | 2 +- src/list_policy.cpp | 4 +- src/list_policy.hpp | 2 +- src/load_balancing.hpp | 6 +- src/rack_aware_policy.cpp | 290 ++++++++++++++++++++++++++ src/rack_aware_policy.hpp | 129 ++++++++++++ src/request_processor.cpp | 4 +- src/request_processor.hpp | 3 +- src/request_processor_initializer.cpp | 5 +- src/request_processor_initializer.hpp | 4 +- src/round_robin_policy.cpp | 2 +- src/round_robin_policy.hpp | 2 +- src/session.cpp | 9 +- src/session.hpp | 2 +- src/token_aware_policy.cpp | 4 +- src/token_aware_policy.hpp | 2 +- 26 files changed, 498 insertions(+), 29 deletions(-) create mode 100644 src/rack_aware_policy.cpp create mode 100644 src/rack_aware_policy.hpp diff --git a/src/cluster.cpp b/src/cluster.cpp index a46f3ffc1..21ab3eafb 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -18,6 +18,7 @@ #include "constants.hpp" #include "dc_aware_policy.hpp" +#include "rack_aware_policy.hpp" #include "external.hpp" #include "logger.hpp" #include "resolver.hpp" @@ -240,6 +241,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list const ControlConnectionSchema& schema, const LoadBalancingPolicy::Ptr& load_balancing_policy, const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc, + const String& local_rack, const StringMultimap& supported_options, const ClusterSettings& settings) : connection_(connection) , listener_(listener ? listener : &nop_cluster_listener__) @@ -251,6 +253,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list , connected_host_(connected_host) , hosts_(hosts) , local_dc_(local_dc) + , local_rack_(local_rack) , supported_options_(supported_options) , is_recording_events_(settings.disable_events_on_startup) { static const auto optimized_msg = "===== Using optimized driver!!! =====\n"; diff --git a/src/cluster.hpp b/src/cluster.hpp index 4fe36ff5d..dec6043b6 100644 --- a/src/cluster.hpp +++ b/src/cluster.hpp @@ -257,6 +257,7 @@ class Cluster * determining the next control connection host. * @param load_balancing_policies * @param local_dc The local datacenter determined by the metadata service for initializing the + * @param local_rack The local rack determined by the metadata service for initializing the * load balancing policies. * @param supported_options Supported options discovered during control connection. 
* @param settings The control connection settings to use for reconnecting the @@ -267,6 +268,7 @@ class Cluster const ControlConnectionSchema& schema, const LoadBalancingPolicy::Ptr& load_balancing_policy, const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc, + const String& local_rack, const StringMultimap& supported_options, const ClusterSettings& settings); /** @@ -361,6 +363,7 @@ class Cluster const Host::Ptr& connected_host() const { return connected_host_; } const TokenMap::Ptr& token_map() const { return token_map_; } const String& local_dc() const { return local_dc_; } + const String& local_rack() const { return local_rack_; } const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); } const StringMultimap& supported_options() const { return supported_options_; } const ShardPortCalculator* shard_port_calculator() const { return shard_port_calculator_.get(); } @@ -449,6 +452,7 @@ class Cluster PreparedMetadata prepared_metadata_; TokenMap::Ptr token_map_; String local_dc_; + String local_rack_; StringMultimap supported_options_; Timer timer_; bool is_recording_events_; diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp index ab357ed52..932a6b2e9 100644 --- a/src/cluster_config.cpp +++ b/src/cluster_config.cpp @@ -300,7 +300,34 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c String(local_dc, local_dc_length), used_hosts_per_remote_dc, !allow_remote_dcs_for_local_cl)); return CASS_OK; } +/* +CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc, + const char* local_rack, + unsigned used_hosts_per_remote_dc, + cass_bool_t allow_remote_dcs_for_local_cl) { + if (local_dc == NULL || local_rack == NULL) { + return CASS_ERROR_LIB_BAD_PARAMS; + } + return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc), + local_rack, SAFE_STRLEN(local_rack), + used_hosts_per_remote_dc, + allow_remote_dcs_for_local_cl); +} +CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc, + size_t local_dc_length, + const char* local_rack, + size_t local_rack_length, + unsigned used_hosts_per_remote_dc, + cass_bool_t allow_remote_dcs_for_local_cl) { + if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) { + return CASS_ERROR_LIB_BAD_PARAMS; + } + cluster->config().set_load_balancing_policy(new RackAwarePolicy( + String(local_dc, local_dc_length), String(local_rack, local_rack_length), used_hosts_per_remote_dc, !allow_remote_dcs_for_local_cl)); + return CASS_OK; +} +*/ void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) { cluster->config().set_token_aware_routing(enabled == cass_true); } diff --git a/src/cluster_connector.cpp b/src/cluster_connector.cpp index e0415e151..eb959e17f 100644 --- a/src/cluster_connector.cpp +++ b/src/cluster_connector.cpp @@ -16,6 +16,7 @@ #include "cluster_connector.hpp" #include "dc_aware_policy.hpp" +#include "rack_aware_policy.hpp" #include "protocol.hpp" #include "random.hpp" #include "round_robin_policy.hpp" @@ -177,6 +178,7 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) { } local_dc_ = resolver->local_dc(); + local_rack_ = resolver->local_rack(); remaining_connector_count_ = resolved_contact_points.size(); for (AddressVec::const_iterator it = resolved_contact_points.begin(), end = resolved_contact_points.end(); @@ -231,7 +233,7 @@ void 
ClusterConnector::on_connect(ControlConnector* connector) { for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end(); it != end; ++it) { LoadBalancingPolicy::Ptr policy(*it); - policy->init(connected_host, hosts, random_, local_dc_); + policy->init(connected_host, hosts, random_, local_dc_, local_rack_); policy->register_handles(event_loop_->loop()); } @@ -248,6 +250,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) { message = "No hosts available for the control connection using the " "DC-aware load balancing policy. " "Check to see if the configured local datacenter is valid"; + } else if (dynamic_cast(query_plan.get()) != + NULL) { // Check if Rack-aware + message = "No hosts available for the control connection using the " + "Rack-aware load balancing policy. " + "Check to see if the configured local datacenter and rack is valid"; } else { message = "No hosts available for the control connection using the " "configured load balancing policy"; @@ -258,7 +265,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) { cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_, connected_host, hosts, connector->schema(), default_policy, policies, - local_dc_, connector->supported_options(), settings_)); + local_dc_, local_rack_, connector->supported_options(), settings_)); // Clear any connection errors and set the final negotiated protocol version. error_code_ = CLUSTER_OK; diff --git a/src/cluster_connector.hpp b/src/cluster_connector.hpp index e960fa058..8c8572322 100644 --- a/src/cluster_connector.hpp +++ b/src/cluster_connector.hpp @@ -169,6 +169,7 @@ class ClusterConnector : public RefCounted { Random* random_; Metrics* metrics_; String local_dc_; + String local_rack_; ClusterSettings settings_; Callback callback_; diff --git a/src/cluster_metadata_resolver.hpp b/src/cluster_metadata_resolver.hpp index 90e91acbd..bcca03bf3 100644 --- a/src/cluster_metadata_resolver.hpp +++ b/src/cluster_metadata_resolver.hpp @@ -48,6 +48,7 @@ class ClusterMetadataResolver : public RefCounted { const AddressVec& resolved_contact_points() const { return resolved_contact_points_; } const String& local_dc() const { return local_dc_; } + const String& local_rack() const { return local_rack_; } protected: virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) = 0; @@ -57,6 +58,7 @@ class ClusterMetadataResolver : public RefCounted { protected: AddressVec resolved_contact_points_; String local_dc_; + String local_rack_; Callback callback_; }; diff --git a/src/dc_aware_policy.cpp b/src/dc_aware_policy.cpp index d68bec1a2..5c1550227 100644 --- a/src/dc_aware_policy.cpp +++ b/src/dc_aware_policy.cpp @@ -43,7 +43,7 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remot DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); } void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { if (local_dc_.empty()) { // Only override if no local DC was specified. 
local_dc_ = local_dc; } diff --git a/src/dc_aware_policy.hpp b/src/dc_aware_policy.hpp index f76b7307b..526338c29 100644 --- a/src/dc_aware_policy.hpp +++ b/src/dc_aware_policy.hpp @@ -37,7 +37,7 @@ class DCAwarePolicy : public LoadBalancingPolicy { ~DCAwarePolicy(); virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual CassHostDistance distance(const Host::Ptr& host) const; diff --git a/src/execution_profile.hpp b/src/execution_profile.hpp index 2b5645149..cb4d34f61 100644 --- a/src/execution_profile.hpp +++ b/src/execution_profile.hpp @@ -23,6 +23,7 @@ #include "cassandra.h" #include "constants.hpp" #include "dc_aware_policy.hpp" +#include "rack_aware_policy.hpp" #include "dense_hash_map.hpp" #include "latency_aware_policy.hpp" #include "speculative_execution.hpp" diff --git a/src/latency_aware_policy.cpp b/src/latency_aware_policy.cpp index 9f77a384f..29541bbb8 100644 --- a/src/latency_aware_policy.cpp +++ b/src/latency_aware_policy.cpp @@ -27,13 +27,13 @@ using namespace datastax::internal; using namespace datastax::internal::core; void LatencyAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { hosts_->reserve(hosts.size()); std::transform(hosts.begin(), hosts.end(), std::back_inserter(*hosts_), GetHost()); for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { i->second->enable_latency_tracking(settings_.scale_ns, settings_.min_measured); } - ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc); + ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack); } void LatencyAwarePolicy::register_handles(uv_loop_t* loop) { start_timer(loop); } diff --git a/src/latency_aware_policy.hpp b/src/latency_aware_policy.hpp index 178752a4a..c04430c1a 100644 --- a/src/latency_aware_policy.hpp +++ b/src/latency_aware_policy.hpp @@ -51,7 +51,7 @@ class LatencyAwarePolicy : public ChainedLoadBalancingPolicy { virtual ~LatencyAwarePolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual void register_handles(uv_loop_t* loop); virtual void close_handles(); diff --git a/src/list_policy.cpp b/src/list_policy.cpp index 7dc9357d4..fa38c838e 100644 --- a/src/list_policy.cpp +++ b/src/list_policy.cpp @@ -23,7 +23,7 @@ using namespace datastax::internal; using namespace datastax::internal::core; void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { HostMap valid_hosts; for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { const Host::Ptr& host = i->second; @@ -36,7 +36,7 @@ void ListPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Ran LOG_ERROR("No valid hosts available for list policy"); } - ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc); + ChainedLoadBalancingPolicy::init(connected_host, valid_hosts, random, local_dc, local_rack); } CassHostDistance ListPolicy::distance(const Host::Ptr& host) const { diff --git a/src/list_policy.hpp b/src/list_policy.hpp index bda75f5f5..99b13eb22 100644 --- a/src/list_policy.hpp +++ b/src/list_policy.hpp @@ -31,7 +31,7 @@ class 
ListPolicy : public ChainedLoadBalancingPolicy { virtual ~ListPolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual CassHostDistance distance(const Host::Ptr& host) const; diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp index ba60928a9..ff118ad4a 100644 --- a/src/load_balancing.hpp +++ b/src/load_balancing.hpp @@ -87,7 +87,7 @@ class LoadBalancingPolicy : public RefCounted { virtual ~LoadBalancingPolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) = 0; + const String& local_dc, const String &local_rack) = 0; virtual void register_handles(uv_loop_t* loop) {} virtual void close_handles() {} @@ -124,8 +124,8 @@ class ChainedLoadBalancingPolicy : public LoadBalancingPolicy { virtual ~ChainedLoadBalancingPolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { - return child_policy_->init(connected_host, hosts, random, local_dc); + const String& local_dc, const String& local_rack) { + return child_policy_->init(connected_host, hosts, random, local_dc, local_rack); } virtual const LoadBalancingPolicy::Ptr& child_policy() const { return child_policy_; } diff --git a/src/rack_aware_policy.cpp b/src/rack_aware_policy.cpp new file mode 100644 index 000000000..c3b992e00 --- /dev/null +++ b/src/rack_aware_policy.cpp @@ -0,0 +1,290 @@ +/* + Copyright (c) DataStax, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#include "rack_aware_policy.hpp" + +#include "logger.hpp" +#include "request_handler.hpp" +#include "scoped_lock.hpp" + +#include + +using namespace datastax; +using namespace datastax::internal; +using namespace datastax::internal::core; + +RackAwarePolicy::RackAwarePolicy(const String& local_dc, const String& local_rack, size_t used_hosts_per_remote_dc, + bool skip_remote_dcs_for_local_cl) + : local_dc_(local_dc) + , local_rack_(local_rack) + , used_hosts_per_remote_dc_(used_hosts_per_remote_dc) + , skip_remote_dcs_for_local_cl_(skip_remote_dcs_for_local_cl) + , local_dc_live_hosts_(new HostVec()) + , index_(0) { + uv_rwlock_init(&available_rwlock_); + if (used_hosts_per_remote_dc_ > 0 || !skip_remote_dcs_for_local_cl) { + LOG_WARN("Remote multi-dc settings have been deprecated and will be removed" + " in the next major release"); + } +} + +RackAwarePolicy::~RackAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); } + +void RackAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, + const String& local_dc, const String& local_rack) { + if (local_dc_.empty()) { // Only override if no local DC was specified. 
+ local_dc_ = local_dc; + } + + if (local_dc_.empty() && connected_host && !connected_host->dc().empty()) { + LOG_INFO("Using '%s' for the local data center " + "(if this is incorrect, please provide the correct data center)", + connected_host->dc().c_str()); + local_dc_ = connected_host->dc(); + } + + if (local_rack_.empty()) { // Only override if no local rack was specified. + local_rack_ = local_rack; + } + + if (local_rack_.empty() && connected_host && !connected_host->rack().empty()) { + LOG_INFO("Using '%s' for the local rack " + "(if this is incorrect, please provide the correct rack)", + connected_host->rack().c_str()); + local_rack_ = connected_host->rack(); + } + + available_.resize(hosts.size()); + std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()), + GetAddress()); + + for (HostMap::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) { + on_host_added(i->second); + } + if (random != NULL) { + index_ = random->next(std::max(static_cast(1), hosts.size())); + } +} + +CassHostDistance RackAwarePolicy::distance(const Host::Ptr& host) const { + if (local_dc_.empty() || local_rack_.empty() || (host->dc() == local_dc_ && host->rack() == local_rack_)) { + return CASS_HOST_DISTANCE_LOCAL; + } + + const CopyOnWriteHostVec& hosts = per_remote_dc_live_hosts_.get_hosts(host->rack()); + size_t num_hosts = std::min(hosts->size(), used_hosts_per_remote_dc_); + for (size_t i = 0; i < num_hosts; ++i) { + if ((*hosts)[i]->address() == host->address()) { + return CASS_HOST_DISTANCE_REMOTE; + } + } + + return CASS_HOST_DISTANCE_IGNORE; +} + +QueryPlan* RackAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler, + const TokenMap* token_map) { + CassConsistency cl = + request_handler != NULL ? 
request_handler->consistency() : CASS_DEFAULT_CONSISTENCY; + return new RackAwareQueryPlan(this, cl, index_++); +} + +bool RackAwarePolicy::is_host_up(const Address& address) const { + ScopedReadLock rl(&available_rwlock_); + return available_.count(address) > 0; +} + +void RackAwarePolicy::on_host_added(const Host::Ptr& host) { + const String& dc = host->dc(); + const String& rack = host->rack(); + if (local_dc_.empty() && !dc.empty()) { + LOG_INFO("Using '%s' for local data center " + "(if this is incorrect, please provide the correct data center)", + host->dc().c_str()); + local_dc_ = dc; + } + if (local_rack_.empty() && !rack.empty()) { + LOG_INFO("Using '%s' for local data center " + "(if this is incorrect, please provide the correct data center)", + host->rack().c_str()); + local_rack_ = rack; + } + + if (dc == local_dc_ && rack == local_rack_) { + add_host(local_dc_live_hosts_, host); + } else { + per_remote_dc_live_hosts_.add_host_to_dc(rack, host); + } +} + +void RackAwarePolicy::on_host_removed(const Host::Ptr& host) { + const String& dc = host->dc(); + const String& rack = host->rack(); + if (dc == local_dc_ && rack == local_rack_) { + remove_host(local_dc_live_hosts_, host); + } else { + per_remote_dc_live_hosts_.remove_host_from_dc(host->rack(), host); + } + + ScopedWriteLock wl(&available_rwlock_); + available_.erase(host->address()); +} + +void RackAwarePolicy::on_host_up(const Host::Ptr& host) { + on_host_added(host); + + ScopedWriteLock wl(&available_rwlock_); + available_.insert(host->address()); +} + +void RackAwarePolicy::on_host_down(const Address& address) { + if (!remove_host(local_dc_live_hosts_, address) && + !per_remote_dc_live_hosts_.remove_host(address)) { + LOG_DEBUG("Attempted to mark host %s as DOWN, but it doesn't exist", + address.to_string().c_str()); + } + + ScopedWriteLock wl(&available_rwlock_); + available_.erase(address); +} + +bool RackAwarePolicy::skip_remote_dcs_for_local_cl() const { + ScopedReadLock rl(&available_rwlock_); + return skip_remote_dcs_for_local_cl_; +} + +size_t RackAwarePolicy::used_hosts_per_remote_dc() const { + ScopedReadLock rl(&available_rwlock_); + return used_hosts_per_remote_dc_; +} + +const String& RackAwarePolicy::local_dc() const { + ScopedReadLock rl(&available_rwlock_); + return local_dc_; +} + +const String& RackAwarePolicy::local_rack() const { + ScopedReadLock rl(&available_rwlock_); + return local_rack_; +} + +void RackAwarePolicy::PerDCHostMap::add_host_to_dc(const String& dc, const Host::Ptr& host) { + ScopedWriteLock wl(&rwlock_); + Map::iterator i = map_.find(dc); + if (i == map_.end()) { + CopyOnWriteHostVec hosts(new HostVec()); + hosts->push_back(host); + map_.insert(Map::value_type(dc, hosts)); + } else { + add_host(i->second, host); + } +} + +void RackAwarePolicy::PerDCHostMap::remove_host_from_dc(const String& dc, const Host::Ptr& host) { + ScopedWriteLock wl(&rwlock_); + Map::iterator i = map_.find(dc); + if (i != map_.end()) { + core::remove_host(i->second, host); + } +} + +bool RackAwarePolicy::PerDCHostMap::remove_host(const Address& address) { + ScopedWriteLock wl(&rwlock_); + for (Map::iterator i = map_.begin(), end = map_.end(); i != end; ++i) { + if (core::remove_host(i->second, address)) { + return true; + } + } + return false; +} + +const CopyOnWriteHostVec& RackAwarePolicy::PerDCHostMap::get_hosts(const String& dc) const { + ScopedReadLock rl(&rwlock_); + Map::const_iterator i = map_.find(dc); + if (i == map_.end()) return no_hosts_; + + return i->second; +} + +void 
RackAwarePolicy::PerDCHostMap::copy_dcs(KeySet* dcs) const { + ScopedReadLock rl(&rwlock_); + for (Map::const_iterator i = map_.begin(), end = map_.end(); i != end; ++i) { + dcs->insert(i->first); + } +} + +// Helper functions to prevent copy (Notice: "const CopyOnWriteHostVec&") + +static const Host::Ptr& get_next_host(const CopyOnWriteHostVec& hosts, size_t index) { + return (*hosts)[index % hosts->size()]; +} + +static const Host::Ptr& get_next_host_bounded(const CopyOnWriteHostVec& hosts, size_t index, + size_t bound) { + return (*hosts)[index % std::min(hosts->size(), bound)]; +} + +static size_t get_hosts_size(const CopyOnWriteHostVec& hosts) { return hosts->size(); } + +RackAwarePolicy::RackAwareQueryPlan::RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl, + size_t start_index) + : policy_(policy) + , cl_(cl) + , hosts_(policy_->local_dc_live_hosts_) + , local_remaining_(get_hosts_size(hosts_)) + , remote_remaining_(0) + , index_(start_index) {} + +Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() { + while (local_remaining_ > 0) { + --local_remaining_; + const Host::Ptr& host(get_next_host(hosts_, index_++)); + if (policy_->is_host_up(host->address())) { + return host; + } + } + + if (policy_->skip_remote_dcs_for_local_cl_ && is_dc_local(cl_)) { + return Host::Ptr(); + } + + if (!remote_dcs_) { + remote_dcs_.reset(new PerDCHostMap::KeySet()); + policy_->per_remote_dc_live_hosts_.copy_dcs(remote_dcs_.get()); + } + + while (true) { + while (remote_remaining_ > 0) { + --remote_remaining_; + const Host::Ptr& host( + get_next_host_bounded(hosts_, index_++, policy_->used_hosts_per_remote_dc_)); + if (policy_->is_host_up(host->address())) { + return host; + } + } + + if (remote_dcs_->empty()) { + break; + } + + PerDCHostMap::KeySet::iterator i = remote_dcs_->begin(); + hosts_ = policy_->per_remote_dc_live_hosts_.get_hosts(*i); + remote_remaining_ = std::min(get_hosts_size(hosts_), policy_->used_hosts_per_remote_dc_); + remote_dcs_->erase(i); + } + + return Host::Ptr(); +} diff --git a/src/rack_aware_policy.hpp b/src/rack_aware_policy.hpp new file mode 100644 index 000000000..adbe138f2 --- /dev/null +++ b/src/rack_aware_policy.hpp @@ -0,0 +1,129 @@ +/* + Copyright (c) DataStax, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +#ifndef DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP +#define DATASTAX_INTERNAL_RACK_AWARE_POLICY_HPP + +#include "host.hpp" +#include "load_balancing.hpp" +#include "map.hpp" +#include "round_robin_policy.hpp" +#include "scoped_lock.hpp" +#include "scoped_ptr.hpp" +#include "set.hpp" + +#include + +namespace datastax { namespace internal { namespace core { + +class RackAwarePolicy : public LoadBalancingPolicy { +public: + RackAwarePolicy(const String& local_dc = "", const String &local_rack = "", size_t used_hosts_per_remote_dc = 0, + bool skip_remote_dcs_for_local_cl = true); + + ~RackAwarePolicy(); + + virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, + const String& local_dc, const String& local_rack); + + virtual CassHostDistance distance(const Host::Ptr& host) const; + + virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler, + const TokenMap* token_map); + + virtual bool is_host_up(const Address& address) const; + + virtual void on_host_added(const Host::Ptr& host); + virtual void on_host_removed(const Host::Ptr& host); + virtual void on_host_up(const Host::Ptr& host); + virtual void on_host_down(const Address& address); + + virtual bool skip_remote_dcs_for_local_cl() const; + virtual size_t used_hosts_per_remote_dc() const; + virtual const String& local_dc() const; + virtual const String& local_rack() const; + + virtual LoadBalancingPolicy* new_instance() { + return new RackAwarePolicy(local_dc_, local_rack_, used_hosts_per_remote_dc_, skip_remote_dcs_for_local_cl_); + } + +private: + class PerDCHostMap { + public: + typedef internal::Map Map; + typedef Set KeySet; + + PerDCHostMap() + : no_hosts_(new HostVec()) { + uv_rwlock_init(&rwlock_); + } + ~PerDCHostMap() { uv_rwlock_destroy(&rwlock_); } + + void add_host_to_dc(const String& dc, const Host::Ptr& host); + void remove_host_from_dc(const String& dc, const Host::Ptr& host); + bool remove_host(const Address& address); + const CopyOnWriteHostVec& get_hosts(const String& dc) const; + void copy_dcs(KeySet* dcs) const; + + private: + Map map_; + mutable uv_rwlock_t rwlock_; + const CopyOnWriteHostVec no_hosts_; + + private: + DISALLOW_COPY_AND_ASSIGN(PerDCHostMap); + }; + + const CopyOnWriteHostVec& get_local_dc_hosts() const; + void get_remote_dcs(PerDCHostMap::KeySet* remote_dcs) const; + +public: + class RackAwareQueryPlan : public QueryPlan { + public: + RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl, size_t start_index); + + virtual Host::Ptr compute_next(); + + private: + const RackAwarePolicy* policy_; + CassConsistency cl_; + CopyOnWriteHostVec hosts_; + ScopedPtr remote_dcs_; + size_t local_remaining_; + size_t remote_remaining_; + size_t index_; + }; + +private: + mutable uv_rwlock_t available_rwlock_; + AddressSet available_; + + String local_dc_; + String local_rack_; + size_t used_hosts_per_remote_dc_; + bool skip_remote_dcs_for_local_cl_; + + CopyOnWriteHostVec local_dc_live_hosts_; + PerDCHostMap per_remote_dc_live_hosts_; + size_t index_; + +private: + DISALLOW_COPY_AND_ASSIGN(RackAwarePolicy); +}; + +}}} // namespace datastax::internal::core + +#endif diff --git a/src/request_processor.cpp b/src/request_processor.cpp index d02ad9e7f..bda526df4 100644 --- a/src/request_processor.cpp +++ b/src/request_processor.cpp @@ -170,7 +170,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop const Host::Ptr& connected_host, const HostMap& hosts, const TokenMap::Ptr& token_map, const 
RequestProcessorSettings& settings, Random* random, - const String& local_dc) + const String& local_dc, const String& local_rack) : connection_pool_manager_(connection_pool_manager) , listener_(listener ? listener : &nop_request_processor_listener__) , event_loop_(event_loop) @@ -213,7 +213,7 @@ RequestProcessor::RequestProcessor(RequestProcessorListener* listener, EventLoop LoadBalancingPolicy::Vec policies = load_balancing_policies(); for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(); it != policies.end(); ++it) { // Initialize the load balancing policies - (*it)->init(connected_host, hosts, random, local_dc); + (*it)->init(connected_host, hosts, random, local_dc, local_rack); (*it)->register_handles(event_loop_->loop()); } diff --git a/src/request_processor.hpp b/src/request_processor.hpp index 67b0bf47c..253ac4828 100644 --- a/src/request_processor.hpp +++ b/src/request_processor.hpp @@ -166,12 +166,13 @@ class RequestProcessor * @param settings The current settings for the request processor. * @param random A RNG for randomizing hosts in the load balancing policies. * @param local_dc The local datacenter for initializing the load balancing policies. + * @param local_rack The local rack for initializing the load balancing policies. */ RequestProcessor(RequestProcessorListener* listener, EventLoop* event_loop, const ConnectionPoolManager::Ptr& connection_pool_manager, const Host::Ptr& connected_host, const HostMap& hosts, const TokenMap::Ptr& token_map, const RequestProcessorSettings& settings, - Random* random, const String& local_dc); + Random* random, const String& local_dc, const String& local_rack); /** * Close/Terminate the request request processor (thread-safe). diff --git a/src/request_processor_initializer.cpp b/src/request_processor_initializer.cpp index 6705b72e9..24d1a6d48 100644 --- a/src/request_processor_initializer.cpp +++ b/src/request_processor_initializer.cpp @@ -40,7 +40,7 @@ class RunInitializeProcessor : public Task { RequestProcessorInitializer::RequestProcessorInitializer( const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, - const TokenMap::Ptr& token_map, const String& local_dc, const Callback& callback) + const TokenMap::Ptr& token_map, const String& local_dc, const String& local_rack, const Callback& callback) : event_loop_(NULL) , listener_(NULL) , metrics_(NULL) @@ -51,6 +51,7 @@ RequestProcessorInitializer::RequestProcessorInitializer( , hosts_(hosts) , token_map_(token_map) , local_dc_(local_dc) + , local_rack_(local_rack) , error_code_(REQUEST_PROCESSOR_OK) , callback_(callback) { uv_mutex_init(&mutex_); @@ -166,7 +167,7 @@ void RequestProcessorInitializer::on_initialize(ConnectionPoolManagerInitializer } else { processor_.reset(new RequestProcessor(listener_, event_loop_, initializer->release_manager(), connected_host_, hosts_, token_map_, settings_, random_, - local_dc_)); + local_dc_, local_rack_)); int rc = processor_->init(RequestProcessor::Protected()); if (rc != 0) { diff --git a/src/request_processor_initializer.hpp b/src/request_processor_initializer.hpp index 8d63380d9..b685e5dd2 100644 --- a/src/request_processor_initializer.hpp +++ b/src/request_processor_initializer.hpp @@ -60,12 +60,13 @@ class RequestProcessorInitializer * @param hosts A mapping of available hosts in the cluster. * @param token_map A token map. * @param local_dc The local datacenter for initializing the load balancing policies. + * @param local_rack The local datacenter for initializing the load balancing policies. 
* @param callback A callback that is called when the processor is initialized * or if an error occurred. */ RequestProcessorInitializer(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc, const Callback& callback); + const String& local_dc, const String& local_rack, const Callback& callback); ~RequestProcessorInitializer(); /** @@ -176,6 +177,7 @@ class RequestProcessorInitializer HostMap hosts_; const TokenMap::Ptr token_map_; String local_dc_; + String local_rack_; RequestProcessorError error_code_; String error_message_; diff --git a/src/round_robin_policy.cpp b/src/round_robin_policy.cpp index dd7f2ecff..4c8ac62d7 100644 --- a/src/round_robin_policy.cpp +++ b/src/round_robin_policy.cpp @@ -33,7 +33,7 @@ RoundRobinPolicy::RoundRobinPolicy() RoundRobinPolicy::~RoundRobinPolicy() { uv_rwlock_destroy(&available_rwlock_); } void RoundRobinPolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { available_.resize(hosts.size()); std::transform(hosts.begin(), hosts.end(), std::inserter(available_, available_.begin()), GetAddress()); diff --git a/src/round_robin_policy.hpp b/src/round_robin_policy.hpp index f5b4f715d..aebc62deb 100644 --- a/src/round_robin_policy.hpp +++ b/src/round_robin_policy.hpp @@ -31,7 +31,7 @@ class RoundRobinPolicy : public LoadBalancingPolicy { ~RoundRobinPolicy(); virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual CassHostDistance distance(const Host::Ptr& host) const; diff --git a/src/session.cpp b/src/session.cpp index 34de94c13..9d725da46 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -195,13 +195,14 @@ class SessionInitializer : public RefCounted { SessionInitializer() { uv_mutex_destroy(&mutex_); } void initialize(const Host::Ptr& connected_host, ProtocolVersion protocol_version, - const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc) { + const HostMap& hosts, const TokenMap::Ptr& token_map, const String& local_dc, + const String& local_rack) { inc_ref(); const size_t thread_count_io = remaining_ = session_->config().thread_count_io(); for (size_t i = 0; i < thread_count_io; ++i) { RequestProcessorInitializer::Ptr initializer(new RequestProcessorInitializer( - connected_host, protocol_version, hosts, token_map, local_dc, + connected_host, protocol_version, hosts, token_map, local_dc, local_rack, bind_callback(&SessionInitializer::on_initialize, this))); RequestProcessorSettings settings(session_->config()); @@ -360,7 +361,7 @@ void Session::join() { void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc) { + const String& local_dc, const String& local_rack) { int rc = 0; if (hosts.empty()) { @@ -394,7 +395,7 @@ void Session::on_connect(const Host::Ptr& connected_host, ProtocolVersion protoc request_processor_count_ = 0; is_closing_ = false; SessionInitializer::Ptr initializer(new SessionInitializer(this)); - initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc); + initializer->initialize(connected_host, protocol_version, hosts, token_map, local_dc, local_rack); } void Session::on_close() { diff --git a/src/session.hpp b/src/session.hpp index 856833114..45efc5e89 
100644 --- a/src/session.hpp +++ b/src/session.hpp @@ -54,7 +54,7 @@ class Session virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual void on_close(); diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp index cc2cd8394..82805af43 100644 --- a/src/token_aware_policy.cpp +++ b/src/token_aware_policy.cpp @@ -37,7 +37,7 @@ static inline bool contains(const CopyOnWriteHostVec& replicas, const Address& a } void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc) { + const String& local_dc, const String& local_rack) { if (random != NULL) { if (shuffle_replicas_) { // Store random so that it can be used to shuffle replicas. @@ -48,7 +48,7 @@ void TokenAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& host index_ = random->next(std::max(static_cast(1), hosts.size())); } } - ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc); + ChainedLoadBalancingPolicy::init(connected_host, hosts, random, local_dc, local_rack); } QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandler* request_handler, diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp index 5a8cee903..54fc5dc48 100644 --- a/src/token_aware_policy.hpp +++ b/src/token_aware_policy.hpp @@ -35,7 +35,7 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { virtual ~TokenAwarePolicy() {} virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random, - const String& local_dc); + const String& local_dc, const String& local_rack); virtual QueryPlan* new_query_plan(const String& keyspace, RequestHandler* request_handler, const TokenMap* token_map); From 903db61090d0a3994d7a1165784eb4a6b2920675 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Wed, 21 Dec 2022 11:16:39 +0100 Subject: [PATCH 2/7] Export API to enable rack-awareness I forgot to uncomment this when debugging build yesteday. --- include/cassandra.h | 67 ++++++++++++++++++++++++++++++++++++++++++ src/cluster_config.cpp | 4 +-- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/include/cassandra.h b/include/cassandra.h index 49666cf20..c5288903f 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -2213,6 +2213,73 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, unsigned used_hosts_per_remote_dc, cass_bool_t allow_remote_dcs_for_local_cl); +/** + * Configures the cluster to use Rack-aware load balancing. + * For each query, all live nodes in a primary 'local' rack are tried first, + * followed by nodes from local DC and then any node from other DCs. + * + * Note: This is the default, and does not need to be called unless + * switching an existing from another policy or changing settings. + * Without further configuration, a default local_dc and local_rack + * is chosen from the first connected contact point, + * and no remote hosts are considered in query plans. + * If relying on this mechanism, be sure to use only contact + * points from the local DC. + * + * @deprecated The remote DC settings for DC-aware are not suitable for most + * scenarios that require DC failover. There is also unhandled gap between + * replication factor number of nodes failing and the full cluster failing. Only + * the remote DC settings are being deprecated. 
+ * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] local_dc The primary data center to try first + * @param[in] local_rack The primary rack to try first + * @param[in] used_hosts_per_remote_dc The number of hosts used in each remote + * DC if no hosts are available in the local dc (deprecated) + * @param[in] allow_remote_dcs_for_local_cl Allows remote hosts to be used if no + * local dc hosts are available and the consistency level is LOCAL_ONE or + * LOCAL_QUORUM (deprecated) + * @return CASS_OK if successful, otherwise an error occurred + */ +CASS_EXPORT CassError +cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, + const char* local_dc, + const char* local_rack, + unsigned used_hosts_per_remote_dc, + cass_bool_t allow_remote_dcs_for_local_cl); + + +/** + * Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string + * parameters. + * + * @deprecated The remote DC settings for DC-aware are not suitable for most + * scenarios that require DC failover. There is also unhandled gap between + * replication factor number of nodes failing and the full cluster failing. Only + * the remote DC settings are being deprecated. + * + * @public @memberof CassCluster + * + * @param[in] cluster + * @param[in] local_dc + * @param[in] local_dc_length + * @param[in] used_hosts_per_remote_dc (deprecated) + * @param[in] allow_remote_dcs_for_local_cl (deprecated) + * @return same as cass_cluster_set_load_balance_dc_aware() + * + * @see cass_cluster_set_load_balance_dc_aware() + */ +CASS_EXPORT CassError +cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, + const char* local_dc, + size_t local_dc_length, + const char* local_rack, + size_t local_rack_length, + unsigned used_hosts_per_remote_dc, + cass_bool_t allow_remote_dcs_for_local_cl); + /** * Configures the cluster to use token-aware request routing or not. * diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp index 932a6b2e9..81db785a3 100644 --- a/src/cluster_config.cpp +++ b/src/cluster_config.cpp @@ -300,7 +300,7 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c String(local_dc, local_dc_length), used_hosts_per_remote_dc, !allow_remote_dcs_for_local_cl)); return CASS_OK; } -/* + CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc, const char* local_rack, unsigned used_hosts_per_remote_dc, @@ -327,7 +327,7 @@ CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const String(local_dc, local_dc_length), String(local_rack, local_rack_length), used_hosts_per_remote_dc, !allow_remote_dcs_for_local_cl)); return CASS_OK; } -*/ + void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) { cluster->config().set_token_aware_routing(enabled == cass_true); } From 902fdea4847f23054081995fcb0126da7ff70963 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20Navr=C3=A1til?= Date: Wed, 21 Dec 2022 13:14:27 +0100 Subject: [PATCH 3/7] Add inflight request count getter --- include/cassandra.h | 10 ++++++++++ src/session.cpp | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/include/cassandra.h b/include/cassandra.h index c5288903f..5d5723a1f 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -3242,6 +3242,16 @@ CASS_EXPORT void cass_session_get_speculative_execution_metrics(const CassSession* session, CassSpeculativeExecutionMetrics* output); +/** + * Gets the current count of inflight request to all hosts. 
+ * + * @public @memberof CassSession + * + * @param[in] session + */ +CASS_EXPORT cass_uint64_t +cass_session_get_inflight_request_count(const CassSession* session); + /** * Get the client id. * diff --git a/src/session.cpp b/src/session.cpp index 9d725da46..e2a544f28 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -169,6 +169,16 @@ void cass_session_get_speculative_execution_metrics(const CassSession* session, CassUuid cass_session_get_client_id(CassSession* session) { return session->client_id(); } +cass_uint64_t cass_session_get_inflight_request_count(const CassSession* session) { + cass_uint64_t inflight_request_count = 0; + const HostMap hosts = session->cluster()->available_hosts(); + for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) { + const Host::Ptr& host = it->second; + inflight_request_count += host->inflight_request_count(); + } + return inflight_request_count; +} + } // extern "C" static inline bool least_busy_comp(const RequestProcessor::Ptr& a, const RequestProcessor::Ptr& b) { From a933412da46cec69a43e63716607a2da5fb48c81 Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Wed, 21 Dec 2022 12:58:55 +0100 Subject: [PATCH 4/7] Remove deprecated options --- include/cassandra.h | 25 ++----------------------- src/cluster_config.cpp | 14 ++++---------- src/rack_aware_policy.cpp | 33 ++++++--------------------------- src/rack_aware_policy.hpp | 9 ++------- 4 files changed, 14 insertions(+), 67 deletions(-) diff --git a/include/cassandra.h b/include/cassandra.h index 5d5723a1f..54b253ed1 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -2226,47 +2226,28 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, * If relying on this mechanism, be sure to use only contact * points from the local DC. * - * @deprecated The remote DC settings for DC-aware are not suitable for most - * scenarios that require DC failover. There is also unhandled gap between - * replication factor number of nodes failing and the full cluster failing. Only - * the remote DC settings are being deprecated. - * * @public @memberof CassCluster * * @param[in] cluster * @param[in] local_dc The primary data center to try first * @param[in] local_rack The primary rack to try first - * @param[in] used_hosts_per_remote_dc The number of hosts used in each remote - * DC if no hosts are available in the local dc (deprecated) - * @param[in] allow_remote_dcs_for_local_cl Allows remote hosts to be used if no - * local dc hosts are available and the consistency level is LOCAL_ONE or - * LOCAL_QUORUM (deprecated) * @return CASS_OK if successful, otherwise an error occurred */ CASS_EXPORT CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc, - const char* local_rack, - unsigned used_hosts_per_remote_dc, - cass_bool_t allow_remote_dcs_for_local_cl); + const char* local_rack); /** * Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string * parameters. * - * @deprecated The remote DC settings for DC-aware are not suitable for most - * scenarios that require DC failover. There is also unhandled gap between - * replication factor number of nodes failing and the full cluster failing. Only - * the remote DC settings are being deprecated. 
- * * @public @memberof CassCluster * * @param[in] cluster * @param[in] local_dc * @param[in] local_dc_length - * @param[in] used_hosts_per_remote_dc (deprecated) - * @param[in] allow_remote_dcs_for_local_cl (deprecated) * @return same as cass_cluster_set_load_balance_dc_aware() * * @see cass_cluster_set_load_balance_dc_aware() @@ -2276,9 +2257,7 @@ cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc, size_t local_dc_length, const char* local_rack, - size_t local_rack_length, - unsigned used_hosts_per_remote_dc, - cass_bool_t allow_remote_dcs_for_local_cl); + size_t local_rack_length); /** * Configures the cluster to use token-aware request routing or not. diff --git a/src/cluster_config.cpp b/src/cluster_config.cpp index 81db785a3..e536515b4 100644 --- a/src/cluster_config.cpp +++ b/src/cluster_config.cpp @@ -302,29 +302,23 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c } CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc, - const char* local_rack, - unsigned used_hosts_per_remote_dc, - cass_bool_t allow_remote_dcs_for_local_cl) { + const char* local_rack) { if (local_dc == NULL || local_rack == NULL) { return CASS_ERROR_LIB_BAD_PARAMS; } return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc), - local_rack, SAFE_STRLEN(local_rack), - used_hosts_per_remote_dc, - allow_remote_dcs_for_local_cl); + local_rack, SAFE_STRLEN(local_rack)); } CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc, size_t local_dc_length, const char* local_rack, - size_t local_rack_length, - unsigned used_hosts_per_remote_dc, - cass_bool_t allow_remote_dcs_for_local_cl) { + size_t local_rack_length) { if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) { return CASS_ERROR_LIB_BAD_PARAMS; } cluster->config().set_load_balancing_policy(new RackAwarePolicy( - String(local_dc, local_dc_length), String(local_rack, local_rack_length), used_hosts_per_remote_dc, !allow_remote_dcs_for_local_cl)); + String(local_dc, local_dc_length), String(local_rack, local_rack_length))); return CASS_OK; } diff --git a/src/rack_aware_policy.cpp b/src/rack_aware_policy.cpp index c3b992e00..00475fc93 100644 --- a/src/rack_aware_policy.cpp +++ b/src/rack_aware_policy.cpp @@ -26,19 +26,12 @@ using namespace datastax; using namespace datastax::internal; using namespace datastax::internal::core; -RackAwarePolicy::RackAwarePolicy(const String& local_dc, const String& local_rack, size_t used_hosts_per_remote_dc, - bool skip_remote_dcs_for_local_cl) +RackAwarePolicy::RackAwarePolicy(const String& local_dc, const String& local_rack) : local_dc_(local_dc) , local_rack_(local_rack) - , used_hosts_per_remote_dc_(used_hosts_per_remote_dc) - , skip_remote_dcs_for_local_cl_(skip_remote_dcs_for_local_cl) , local_dc_live_hosts_(new HostVec()) , index_(0) { uv_rwlock_init(&available_rwlock_); - if (used_hosts_per_remote_dc_ > 0 || !skip_remote_dcs_for_local_cl) { - LOG_WARN("Remote multi-dc settings have been deprecated and will be removed" - " in the next major release"); - } } RackAwarePolicy::~RackAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); } @@ -85,7 +78,7 @@ CassHostDistance RackAwarePolicy::distance(const Host::Ptr& host) const { } const CopyOnWriteHostVec& hosts = per_remote_dc_live_hosts_.get_hosts(host->rack()); - size_t num_hosts = std::min(hosts->size(), used_hosts_per_remote_dc_); + size_t num_hosts = 
hosts->size(); for (size_t i = 0; i < num_hosts; ++i) { if ((*hosts)[i]->address() == host->address()) { return CASS_HOST_DISTANCE_REMOTE; @@ -161,16 +154,6 @@ void RackAwarePolicy::on_host_down(const Address& address) { available_.erase(address); } -bool RackAwarePolicy::skip_remote_dcs_for_local_cl() const { - ScopedReadLock rl(&available_rwlock_); - return skip_remote_dcs_for_local_cl_; -} - -size_t RackAwarePolicy::used_hosts_per_remote_dc() const { - ScopedReadLock rl(&available_rwlock_); - return used_hosts_per_remote_dc_; -} - const String& RackAwarePolicy::local_dc() const { ScopedReadLock rl(&available_rwlock_); return local_dc_; @@ -232,11 +215,6 @@ static const Host::Ptr& get_next_host(const CopyOnWriteHostVec& hosts, size_t in return (*hosts)[index % hosts->size()]; } -static const Host::Ptr& get_next_host_bounded(const CopyOnWriteHostVec& hosts, size_t index, - size_t bound) { - return (*hosts)[index % std::min(hosts->size(), bound)]; -} - static size_t get_hosts_size(const CopyOnWriteHostVec& hosts) { return hosts->size(); } RackAwarePolicy::RackAwareQueryPlan::RackAwareQueryPlan(const RackAwarePolicy* policy, CassConsistency cl, @@ -257,7 +235,8 @@ Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() { } } - if (policy_->skip_remote_dcs_for_local_cl_ && is_dc_local(cl_)) { + // Skip remote DCs for LOCAL_ consistency levels. + if (is_dc_local(cl_)) { return Host::Ptr(); } @@ -270,7 +249,7 @@ Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() { while (remote_remaining_ > 0) { --remote_remaining_; const Host::Ptr& host( - get_next_host_bounded(hosts_, index_++, policy_->used_hosts_per_remote_dc_)); + get_next_host(hosts_, index_++)); if (policy_->is_host_up(host->address())) { return host; } @@ -282,7 +261,7 @@ Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() { PerDCHostMap::KeySet::iterator i = remote_dcs_->begin(); hosts_ = policy_->per_remote_dc_live_hosts_.get_hosts(*i); - remote_remaining_ = std::min(get_hosts_size(hosts_), policy_->used_hosts_per_remote_dc_); + remote_remaining_ = get_hosts_size(hosts_); remote_dcs_->erase(i); } diff --git a/src/rack_aware_policy.hpp b/src/rack_aware_policy.hpp index adbe138f2..1f68ae1a0 100644 --- a/src/rack_aware_policy.hpp +++ b/src/rack_aware_policy.hpp @@ -31,8 +31,7 @@ namespace datastax { namespace internal { namespace core { class RackAwarePolicy : public LoadBalancingPolicy { public: - RackAwarePolicy(const String& local_dc = "", const String &local_rack = "", size_t used_hosts_per_remote_dc = 0, - bool skip_remote_dcs_for_local_cl = true); + RackAwarePolicy(const String& local_dc = "", const String &local_rack = ""); ~RackAwarePolicy(); @@ -51,13 +50,11 @@ class RackAwarePolicy : public LoadBalancingPolicy { virtual void on_host_up(const Host::Ptr& host); virtual void on_host_down(const Address& address); - virtual bool skip_remote_dcs_for_local_cl() const; - virtual size_t used_hosts_per_remote_dc() const; virtual const String& local_dc() const; virtual const String& local_rack() const; virtual LoadBalancingPolicy* new_instance() { - return new RackAwarePolicy(local_dc_, local_rack_, used_hosts_per_remote_dc_, skip_remote_dcs_for_local_cl_); + return new RackAwarePolicy(local_dc_, local_rack_); } private: @@ -113,8 +110,6 @@ class RackAwarePolicy : public LoadBalancingPolicy { String local_dc_; String local_rack_; - size_t used_hosts_per_remote_dc_; - bool skip_remote_dcs_for_local_cl_; CopyOnWriteHostVec local_dc_live_hosts_; PerDCHostMap per_remote_dc_live_hosts_; From 
3e60f42f5774128a4eeb9a14c837230bc3c770db Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Wed, 21 Dec 2022 12:37:26 +0100 Subject: [PATCH 5/7] Add fallback to remote DC to rack-aware policy We want to try local rack, local dc and then remote dcs, in that order. --- include/cassandra.h | 8 ++-- src/load_balancing.hpp | 1 + src/rack_aware_policy.cpp | 82 ++++++++++++++++++++++++++++---------- src/rack_aware_policy.hpp | 27 +++++++------ src/token_aware_policy.cpp | 15 +++++-- src/token_aware_policy.hpp | 6 ++- 6 files changed, 96 insertions(+), 43 deletions(-) diff --git a/include/cassandra.h b/include/cassandra.h index 54b253ed1..06389f8bd 100644 --- a/include/cassandra.h +++ b/include/cassandra.h @@ -2216,15 +2216,13 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, /** * Configures the cluster to use Rack-aware load balancing. * For each query, all live nodes in a primary 'local' rack are tried first, - * followed by nodes from local DC and then any node from other DCs. + * followed by nodes from local DC and then nodes from other DCs. * - * Note: This is the default, and does not need to be called unless - * switching an existing from another policy or changing settings. - * Without further configuration, a default local_dc and local_rack + * With empty local_rack and local_dc, default local_dc and local_rack * is chosen from the first connected contact point, * and no remote hosts are considered in query plans. * If relying on this mechanism, be sure to use only contact - * points from the local DC. + * points from the local rack. * * @public @memberof CassCluster * diff --git a/src/load_balancing.hpp b/src/load_balancing.hpp index ff118ad4a..16259583e 100644 --- a/src/load_balancing.hpp +++ b/src/load_balancing.hpp @@ -43,6 +43,7 @@ typedef enum CassBalancingState_ { typedef enum CassHostDistance_ { CASS_HOST_DISTANCE_LOCAL, CASS_HOST_DISTANCE_REMOTE, + CASS_HOST_DISTANCE_REMOTE2, CASS_HOST_DISTANCE_IGNORE } CassHostDistance; diff --git a/src/rack_aware_policy.cpp b/src/rack_aware_policy.cpp index 00475fc93..626a080e4 100644 --- a/src/rack_aware_policy.cpp +++ b/src/rack_aware_policy.cpp @@ -29,7 +29,7 @@ using namespace datastax::internal::core; RackAwarePolicy::RackAwarePolicy(const String& local_dc, const String& local_rack) : local_dc_(local_dc) , local_rack_(local_rack) - , local_dc_live_hosts_(new HostVec()) + , local_rack_live_hosts_(new HostVec()) , index_(0) { uv_rwlock_init(&available_rwlock_); } @@ -77,11 +77,21 @@ CassHostDistance RackAwarePolicy::distance(const Host::Ptr& host) const { return CASS_HOST_DISTANCE_LOCAL; } - const CopyOnWriteHostVec& hosts = per_remote_dc_live_hosts_.get_hosts(host->rack()); + if (host->dc() == local_dc_) { + const CopyOnWriteHostVec& hosts = per_remote_rack_live_hosts_.get_hosts(host->rack()); + size_t num_hosts = hosts->size(); + for (size_t i = 0; i < num_hosts; ++i) { + if ((*hosts)[i]->address() == host->address()) { + return CASS_HOST_DISTANCE_REMOTE; + } + } + } + + const CopyOnWriteHostVec& hosts = per_remote_dc_live_hosts_.get_hosts(host->dc()); size_t num_hosts = hosts->size(); for (size_t i = 0; i < num_hosts; ++i) { if ((*hosts)[i]->address() == host->address()) { - return CASS_HOST_DISTANCE_REMOTE; + return CASS_HOST_DISTANCE_REMOTE2; } } @@ -117,9 +127,11 @@ void RackAwarePolicy::on_host_added(const Host::Ptr& host) { } if (dc == local_dc_ && rack == local_rack_) { - add_host(local_dc_live_hosts_, host); + add_host(local_rack_live_hosts_, host); + } else if (dc == local_dc_) { + 
per_remote_rack_live_hosts_.add_host_to_key(rack, host); } else { - per_remote_dc_live_hosts_.add_host_to_dc(rack, host); + per_remote_dc_live_hosts_.add_host_to_key(dc, host); } } @@ -127,9 +139,11 @@ void RackAwarePolicy::on_host_removed(const Host::Ptr& host) { const String& dc = host->dc(); const String& rack = host->rack(); if (dc == local_dc_ && rack == local_rack_) { - remove_host(local_dc_live_hosts_, host); + remove_host(local_rack_live_hosts_, host); + } else if (dc == local_dc_) { + per_remote_rack_live_hosts_.remove_host_from_key(host->rack(), host); } else { - per_remote_dc_live_hosts_.remove_host_from_dc(host->rack(), host); + per_remote_dc_live_hosts_.remove_host_from_key(host->dc(), host); } ScopedWriteLock wl(&available_rwlock_); @@ -144,7 +158,8 @@ void RackAwarePolicy::on_host_up(const Host::Ptr& host) { } void RackAwarePolicy::on_host_down(const Address& address) { - if (!remove_host(local_dc_live_hosts_, address) && + if (!remove_host(local_rack_live_hosts_, address) && + !per_remote_rack_live_hosts_.remove_host(address) && !per_remote_dc_live_hosts_.remove_host(address)) { LOG_DEBUG("Attempted to mark host %s as DOWN, but it doesn't exist", address.to_string().c_str()); @@ -164,27 +179,27 @@ const String& RackAwarePolicy::local_rack() const { return local_rack_; } -void RackAwarePolicy::PerDCHostMap::add_host_to_dc(const String& dc, const Host::Ptr& host) { +void RackAwarePolicy::PerKeyHostMap::add_host_to_key(const String& key, const Host::Ptr& host) { ScopedWriteLock wl(&rwlock_); - Map::iterator i = map_.find(dc); + Map::iterator i = map_.find(key); if (i == map_.end()) { CopyOnWriteHostVec hosts(new HostVec()); hosts->push_back(host); - map_.insert(Map::value_type(dc, hosts)); + map_.insert(Map::value_type(key, hosts)); } else { add_host(i->second, host); } } -void RackAwarePolicy::PerDCHostMap::remove_host_from_dc(const String& dc, const Host::Ptr& host) { +void RackAwarePolicy::PerKeyHostMap::remove_host_from_key(const String& key, const Host::Ptr& host) { ScopedWriteLock wl(&rwlock_); - Map::iterator i = map_.find(dc); + Map::iterator i = map_.find(key); if (i != map_.end()) { core::remove_host(i->second, host); } } -bool RackAwarePolicy::PerDCHostMap::remove_host(const Address& address) { +bool RackAwarePolicy::PerKeyHostMap::remove_host(const Address& address) { ScopedWriteLock wl(&rwlock_); for (Map::iterator i = map_.begin(), end = map_.end(); i != end; ++i) { if (core::remove_host(i->second, address)) { @@ -194,7 +209,7 @@ bool RackAwarePolicy::PerDCHostMap::remove_host(const Address& address) { return false; } -const CopyOnWriteHostVec& RackAwarePolicy::PerDCHostMap::get_hosts(const String& dc) const { +const CopyOnWriteHostVec& RackAwarePolicy::PerKeyHostMap::get_hosts(const String& dc) const { ScopedReadLock rl(&rwlock_); Map::const_iterator i = map_.find(dc); if (i == map_.end()) return no_hosts_; @@ -202,10 +217,10 @@ const CopyOnWriteHostVec& RackAwarePolicy::PerDCHostMap::get_hosts(const String& return i->second; } -void RackAwarePolicy::PerDCHostMap::copy_dcs(KeySet* dcs) const { +void RackAwarePolicy::PerKeyHostMap::copy_keys(KeySet* keys) const { ScopedReadLock rl(&rwlock_); for (Map::const_iterator i = map_.begin(), end = map_.end(); i != end; ++i) { - dcs->insert(i->first); + keys->insert(i->first); } } @@ -221,7 +236,7 @@ RackAwarePolicy::RackAwareQueryPlan::RackAwareQueryPlan(const RackAwarePolicy* p size_t start_index) : policy_(policy) , cl_(cl) - , hosts_(policy_->local_dc_live_hosts_) + , hosts_(policy_->local_rack_live_hosts_) , 
local_remaining_(get_hosts_size(hosts_)) , remote_remaining_(0) , index_(start_index) {} @@ -235,14 +250,39 @@ Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() { } } + if (!remote_racks_) { + remote_racks_.reset(new PerKeyHostMap::KeySet()); + policy_->per_remote_rack_live_hosts_.copy_keys(remote_racks_.get()); + } + + while (true) { + while (remote_remaining_ > 0) { + --remote_remaining_; + const Host::Ptr& host( + get_next_host(hosts_, index_++)); + if (policy_->is_host_up(host->address())) { + return host; + } + } + + if (remote_racks_->empty()) { + break; + } + + PerKeyHostMap::KeySet::iterator i = remote_racks_->begin(); + hosts_ = policy_->per_remote_rack_live_hosts_.get_hosts(*i); + remote_remaining_ = get_hosts_size(hosts_); + remote_racks_->erase(i); + } + // Skip remote DCs for LOCAL_ consistency levels. if (is_dc_local(cl_)) { return Host::Ptr(); } if (!remote_dcs_) { - remote_dcs_.reset(new PerDCHostMap::KeySet()); - policy_->per_remote_dc_live_hosts_.copy_dcs(remote_dcs_.get()); + remote_dcs_.reset(new PerKeyHostMap::KeySet()); + policy_->per_remote_dc_live_hosts_.copy_keys(remote_dcs_.get()); } while (true) { @@ -259,7 +299,7 @@ Host::Ptr RackAwarePolicy::RackAwareQueryPlan::compute_next() { break; } - PerDCHostMap::KeySet::iterator i = remote_dcs_->begin(); + PerKeyHostMap::KeySet::iterator i = remote_dcs_->begin(); hosts_ = policy_->per_remote_dc_live_hosts_.get_hosts(*i); remote_remaining_ = get_hosts_size(hosts_); remote_dcs_->erase(i); diff --git a/src/rack_aware_policy.hpp b/src/rack_aware_policy.hpp index 1f68ae1a0..68282563b 100644 --- a/src/rack_aware_policy.hpp +++ b/src/rack_aware_policy.hpp @@ -58,22 +58,22 @@ class RackAwarePolicy : public LoadBalancingPolicy { } private: - class PerDCHostMap { + class PerKeyHostMap { public: typedef internal::Map Map; typedef Set KeySet; - PerDCHostMap() + PerKeyHostMap() : no_hosts_(new HostVec()) { uv_rwlock_init(&rwlock_); } - ~PerDCHostMap() { uv_rwlock_destroy(&rwlock_); } + ~PerKeyHostMap() { uv_rwlock_destroy(&rwlock_); } - void add_host_to_dc(const String& dc, const Host::Ptr& host); - void remove_host_from_dc(const String& dc, const Host::Ptr& host); + void add_host_to_key(const String& key, const Host::Ptr& host); + void remove_host_from_key(const String& key, const Host::Ptr& host); bool remove_host(const Address& address); - const CopyOnWriteHostVec& get_hosts(const String& dc) const; - void copy_dcs(KeySet* dcs) const; + const CopyOnWriteHostVec& get_hosts(const String& key) const; + void copy_keys(KeySet* keys) const; private: Map map_; @@ -81,11 +81,11 @@ class RackAwarePolicy : public LoadBalancingPolicy { const CopyOnWriteHostVec no_hosts_; private: - DISALLOW_COPY_AND_ASSIGN(PerDCHostMap); + DISALLOW_COPY_AND_ASSIGN(PerKeyHostMap); }; const CopyOnWriteHostVec& get_local_dc_hosts() const; - void get_remote_dcs(PerDCHostMap::KeySet* remote_dcs) const; + void get_remote_dcs(PerKeyHostMap::KeySet* remote_dcs) const; public: class RackAwareQueryPlan : public QueryPlan { @@ -98,7 +98,8 @@ class RackAwarePolicy : public LoadBalancingPolicy { const RackAwarePolicy* policy_; CassConsistency cl_; CopyOnWriteHostVec hosts_; - ScopedPtr remote_dcs_; + ScopedPtr remote_racks_; + ScopedPtr remote_dcs_; size_t local_remaining_; size_t remote_remaining_; size_t index_; @@ -111,8 +112,10 @@ class RackAwarePolicy : public LoadBalancingPolicy { String local_dc_; String local_rack_; - CopyOnWriteHostVec local_dc_live_hosts_; - PerDCHostMap per_remote_dc_live_hosts_; + CopyOnWriteHostVec local_rack_live_hosts_; + // 
remote rack, local dc
+  PerKeyHostMap per_remote_rack_live_hosts_;
+  PerKeyHostMap per_remote_dc_live_hosts_;
   size_t index_;
 private:
diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp
index 82805af43..d63122736 100644
--- a/src/token_aware_policy.cpp
+++ b/src/token_aware_policy.cpp
@@ -87,8 +87,8 @@ QueryPlan* TokenAwarePolicy::new_query_plan(const String& keyspace, RequestHandl
 }
 Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() {
-  while (remaining_ > 0) {
-    --remaining_;
+  while (remaining_local_ > 0) {
+    --remaining_local_;
     const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
     if (child_policy_->is_host_up(host->address()) &&
         child_policy_->distance(host) == CASS_HOST_DISTANCE_LOCAL) {
@@ -96,10 +96,19 @@ Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() {
     }
   }
+  while (remaining_remote_ > 0) {
+    --remaining_remote_;
+    const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]);
+    if (child_policy_->is_host_up(host->address()) &&
+        child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE) {
+      return host;
+    }
+  }
+
   Host::Ptr host;
   while ((host = child_plan_->compute_next())) {
     if (!contains(replicas_, host->address()) ||
-        child_policy_->distance(host) != CASS_HOST_DISTANCE_LOCAL) {
+        child_policy_->distance(host) > CASS_HOST_DISTANCE_REMOTE) {
       return host;
     }
   }
diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp
index 54fc5dc48..811bfb906 100644
--- a/src/token_aware_policy.hpp
+++ b/src/token_aware_policy.hpp
@@ -53,7 +53,8 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
       , child_plan_(child_plan)
       , replicas_(replicas)
       , index_(start_index)
-      , remaining_(replicas->size()) {}
+      , remaining_local_(replicas->size())
+      , remaining_remote_(replicas->size()) {}
     Host::Ptr compute_next();
@@ -62,7 +63,8 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy {
     ScopedPtr child_plan_;
     CopyOnWriteHostVec replicas_;
     size_t index_;
-    size_t remaining_;
+    size_t remaining_local_;
+    size_t remaining_remote_;
   };
   Random* random_;
From 6b799bacb3dce1cc7a26d8032d2a95fd5480c34e Mon Sep 17 00:00:00 2001 From: Martin Sucha Date: Wed, 21 Dec 2022 12:37:26 +0100 Subject: [PATCH 6/7] Prefer replicas in remote DCs There is no reason not to try a replica first, even if it is in a remote DC, before falling back to non-replica hosts from the child plan.
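For illustration only (not part of the driver changes in this patch): a minimal
standalone C++ sketch of the ordering this produces. The Host struct, the
Distance enum and the sample hosts below are made up for the example; only the
three-pass idea mirrors the remaining_local_/remaining_remote_/remaining_remote2_
loops in this patch.

#include <cstdio>
#include <string>
#include <vector>

// Simplified stand-ins for the driver's host distances (illustrative only).
enum Distance { LOCAL, REMOTE, REMOTE2 };

struct Host {
  std::string name;
  Distance distance;
  bool up;
};

int main() {
  // Replicas for some query, listed in arbitrary order.
  std::vector<Host> replicas = {
      {"replica-remote-dc", REMOTE2, true},  // replica in a remote DC
      {"replica-local-rack", LOCAL, true},   // replica in the local rack
      {"replica-local-dc", REMOTE, true},    // replica in the local DC, other rack
  };

  // Three passes over the replicas, by increasing distance: every live replica
  // is tried before any non-replica host from the child plan would be used.
  const Distance order[] = {LOCAL, REMOTE, REMOTE2};
  for (Distance pass : order) {
    for (const Host& h : replicas) {
      if (h.up && h.distance == pass) {
        std::printf("try %s\n", h.name.c_str());
      }
    }
  }
  // Prints: replica-local-rack, replica-local-dc, replica-remote-dc.
  return 0;
}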
--- src/token_aware_policy.cpp | 11 ++++++++++- src/token_aware_policy.hpp | 4 +++- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/token_aware_policy.cpp b/src/token_aware_policy.cpp index d63122736..adb7a5f68 100644 --- a/src/token_aware_policy.cpp +++ b/src/token_aware_policy.cpp @@ -105,10 +105,19 @@ Host::Ptr TokenAwarePolicy::TokenAwareQueryPlan::compute_next() { } } + while (remaining_remote2_ > 0) { + --remaining_remote2_; + const Host::Ptr& host((*replicas_)[index_++ % replicas_->size()]); + if (child_policy_->is_host_up(host->address()) && + child_policy_->distance(host) == CASS_HOST_DISTANCE_REMOTE2) { + return host; + } + } + Host::Ptr host; while ((host = child_plan_->compute_next())) { if (!contains(replicas_, host->address()) || - child_policy_->distance(host) > CASS_HOST_DISTANCE_REMOTE) { + child_policy_->distance(host) > CASS_HOST_DISTANCE_REMOTE2) { return host; } } diff --git a/src/token_aware_policy.hpp b/src/token_aware_policy.hpp index 811bfb906..637f041cf 100644 --- a/src/token_aware_policy.hpp +++ b/src/token_aware_policy.hpp @@ -54,7 +54,8 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { , replicas_(replicas) , index_(start_index) , remaining_local_(replicas->size()) - , remaining_remote_(replicas->size()) {} + , remaining_remote_(replicas->size()) + , remaining_remote2_(replicas->size()) {} Host::Ptr compute_next(); @@ -65,6 +66,7 @@ class TokenAwarePolicy : public ChainedLoadBalancingPolicy { size_t index_; size_t remaining_local_; size_t remaining_remote_; + size_t remaining_remote2_; }; Random* random_; From 2d6469a014662b55fe6779fbcbaf8638047c94c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Peter=20Navr=C3=A1til?= Date: Wed, 21 Dec 2022 14:08:01 +0100 Subject: [PATCH 7/7] Fix session base on_connect --- src/session_base.cpp | 4 ++-- src/session_base.hpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/session_base.cpp b/src/session_base.cpp index 6ccb4f7c5..c1e7b6333 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -160,7 +160,7 @@ void SessionBase::notify_closed() { void SessionBase::on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc) { + const String& local_dc, const String& local_rack) { notify_connected(); } @@ -200,7 +200,7 @@ void SessionBase::on_initialize(ClusterConnector* connector) { } on_connect(cluster_->connected_host(), cluster_->protocol_version(), - cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc()); + cluster_->available_hosts(), cluster_->token_map(), cluster_->local_dc(), cluster_->local_rack()); } else { assert(!connector->is_canceled() && "Cluster connection process canceled"); switch (connector->error_code()) { diff --git a/src/session_base.hpp b/src/session_base.hpp index 1c3e6c68f..b0c3c7c16 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -117,7 +117,7 @@ class SessionBase : public ClusterListener { */ virtual void on_connect(const Host::Ptr& connected_host, ProtocolVersion protocol_version, const HostMap& hosts, const TokenMap::Ptr& token_map, - const String& local_dc); + const String& local_dc, const String& local_rack); /** * A callback called after the control connection fails to connect. By default