diff --git a/src/admin.rs b/src/admin.rs index d794b86a..d16bb6ac 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -277,7 +277,7 @@ where for server in 0..pool.servers(shard) { let address = pool.address(shard, server); let pool_state = pool.pool_state(shard, server); - let banned = pool.is_banned(address, Some(address.role)); + let banned = pool.is_banned(address); res.put(data_row(&vec![ address.name(), // name diff --git a/src/pool.rs b/src/pool.rs index bb452537..f07287e6 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -329,9 +329,9 @@ impl ConnectionPool { /// Get a connection from the pool. pub async fn get( &self, - shard: usize, // shard number - role: Option, // primary or replica - process_id: i32, // client id + shard: usize, // shard number + role: Option, // primary or replica + client_process_id: i32, // client id ) -> Result<(PooledConnection<'_, ServerPool>, Address), Error> { let now = Instant::now(); let mut candidates: Vec<&Address> = self.addresses[shard] @@ -352,13 +352,19 @@ impl ConnectionPool { None => break, }; - if self.is_banned(&address, role) { - debug!("Address {:?} is banned", address); - continue; + let mut force_healthcheck = false; + + if self.is_banned(&address) { + if self.can_unban(&address).await { + force_healthcheck = true; + } else { + debug!("Address {:?} is banned", address); + continue; + } } // Indicate we're waiting on a server connection from a pool. - self.stats.client_waiting(process_id); + self.stats.client_waiting(client_process_id); // Check if we can connect let mut conn = match self.databases[address.shard][address.address_index] @@ -368,8 +374,9 @@ impl ConnectionPool { Ok(conn) => conn, Err(err) => { error!("Banning instance {:?}, error: {:?}", address, err); - self.ban(&address, process_id); - self.stats.client_checkout_error(process_id, address.id); + self.ban(&address, client_process_id); + self.stats + .client_checkout_error(client_process_id, address.id); continue; } }; @@ -378,84 +385,105 @@ impl ConnectionPool { let server = &mut *conn; // Will return error if timestamp is greater than current system time, which it should never be set to - let require_healthcheck = - server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay; + let require_healthcheck = force_healthcheck + || server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay; // Do not issue a health check unless it's been a little while // since we last checked the server is ok. // Health checks are pretty expensive. if !require_healthcheck { + self.stats.checkout_time( + now.elapsed().as_micros(), + client_process_id, + server.server_id(), + ); self.stats - .checkout_time(now.elapsed().as_micros(), process_id, server.server_id()); - self.stats.server_active(process_id, server.server_id()); + .server_active(client_process_id, server.server_id()); return Ok((conn, address.clone())); } - debug!("Running health check on server {:?}", address); - - self.stats.server_tested(server.server_id()); - - match tokio::time::timeout( - tokio::time::Duration::from_millis(healthcheck_timeout), - server.query(";"), // Cheap query (query parser not used in PG) - ) - .await + if self + .run_health_check(address, server, healthcheck_timeout, now, client_process_id) + .await { - // Check if health check succeeded. - Ok(res) => match res { - Ok(_) => { - self.stats.checkout_time( - now.elapsed().as_micros(), - process_id, - conn.server_id(), - ); - self.stats.server_active(process_id, conn.server_id()); - return Ok((conn, address.clone())); - } - - // Health check failed. - Err(err) => { - error!( - "Banning instance {:?} because of failed health check, {:?}", - address, err - ); + return Ok((conn, address.clone())); + } else { + continue; + } + } - // Don't leave a bad connection in the pool. - server.mark_bad(); + Err(Error::AllServersDown) + } - self.ban(&address, process_id); - continue; - } - }, + async fn run_health_check( + &self, + address: &Address, + server: &mut Server, + healthcheck_timeout: u64, + start: Instant, + client_process_id: i32, + ) -> bool { + debug!("Running health check on server {:?}", address); + + self.stats.server_tested(server.server_id()); + + match tokio::time::timeout( + tokio::time::Duration::from_millis(healthcheck_timeout), + server.query(";"), // Cheap query (query parser not used in PG) + ) + .await + { + // Check if health check succeeded. + Ok(res) => match res { + Ok(_) => { + self.stats.checkout_time( + start.elapsed().as_micros(), + client_process_id, + server.server_id(), + ); + self.stats + .server_active(client_process_id, server.server_id()); + return true; + } - // Health check timed out. + // Health check failed. Err(err) => { error!( - "Banning instance {:?} because of health check timeout, {:?}", + "Banning instance {:?} because of failed health check, {:?}", address, err ); - // Don't leave a bad connection in the pool. - server.mark_bad(); - - self.ban(&address, process_id); - continue; } + }, + + // Health check timed out. + Err(err) => { + error!( + "Banning instance {:?} because of health check timeout, {:?}", + address, err + ); } } - Err(Error::AllServersDown) + // Don't leave a bad connection in the pool. + server.mark_bad(); + + self.ban(&address, client_process_id); + return false; } /// Ban an address (i.e. replica). It no longer will serve /// traffic for any new transactions. Existing transactions on that replica /// will finish successfully or error out to the clients. - pub fn ban(&self, address: &Address, client_id: i32) { + pub fn ban(&self, address: &Address, client_process_id: i32) { error!("Banning {:?}", address); - self.stats.client_ban_error(client_id, address.id); + self.stats.client_ban_error(client_process_id, address.id); - let now = chrono::offset::Utc::now().naive_utc(); - let mut guard = self.banlist.write(); - guard[address.shard].insert(address.clone(), now); + // Only ban replicas + if address.role == Role::Replica { + let now = chrono::offset::Utc::now().naive_utc(); + let mut guard = self.banlist.write(); + guard[address.shard].insert(address.clone(), now); + } } /// Clear the replica to receive traffic again. Takes effect immediately @@ -465,56 +493,71 @@ impl ConnectionPool { guard[address.shard].remove(address); } + /// Check if address is banned + /// true if banned, false otherwise + pub fn is_banned(&self, address: &Address) -> bool { + let guard = self.banlist.read(); + + match guard[address.shard].get(address) { + Some(_) => true, + None => false, + } + } + /// Check if a replica can serve traffic. If all replicas are banned, /// we unban all of them. Better to try then not to. - pub fn is_banned(&self, address: &Address, role: Option) -> bool { - let replicas_available = match role { - Some(Role::Replica) => self.addresses[address.shard] - .iter() - .filter(|addr| addr.role == Role::Replica) - .count(), - None => self.addresses[address.shard].len(), - Some(Role::Primary) => return false, // Primary cannot be banned. - }; + /// returns true if can unban, false otherwise + async fn can_unban(&self, address: &Address) -> bool { + // If somehow primary ends up being banned we should return true here + if address.role == Role::Primary { + return true; + } + + let replicas_available = self.addresses[address.shard] + .iter() + .filter(|addr| addr.role == Role::Replica) + .count(); - debug!("Available targets for {:?}: {}", role, replicas_available); + debug!("Available targets: {}", replicas_available); - let guard = self.banlist.read(); + let banlist_guard = self.banlist.read(); // Everything is banned = nothing is banned. - if guard[address.shard].len() == replicas_available { - drop(guard); + if banlist_guard[address.shard].len() == replicas_available { + drop(banlist_guard); let mut guard = self.banlist.write(); guard[address.shard].clear(); drop(guard); warn!("Unbanning all replicas."); - return false; + return true; } - // I expect this to miss 99.9999% of the time. - match guard[address.shard].get(address) { + // Check if instance is banned and past the ban timer + match banlist_guard[address.shard].get(address) { Some(timestamp) => { let now = chrono::offset::Utc::now().naive_utc(); let config = get_config(); - // Ban expired. - if now.timestamp() - timestamp.timestamp() > config.general.ban_time { - drop(guard); - warn!("Unbanning {:?}", address); - let mut guard = self.banlist.write(); - guard[address.shard].remove(address); - false - } else { + // If ban time hasn't elapsed, can't unban + if !(now.timestamp() - timestamp.timestamp() > config.general.ban_time) { debug!("{:?} is banned", address); - true + return false; } } - + // address is not banned so unbanning will always "succeed" at unbanning None => { debug!("{:?} is ok", address); - false + return true; } - } + }; + + drop(banlist_guard); + + warn!("Unbanning {:?}", address); + let mut guard = self.banlist.write(); + guard[address.shard].remove(address); + + return true; } /// Get the number of configured shards.