From c5ca61f74f713763789815133db2dfd0e9301193 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 10 Aug 2022 01:07:16 -0400 Subject: [PATCH 01/10] initial commit of server check delay implementation --- src/client.rs | 125 ++++++++++++++++++++++++++++++++++++++++---------- src/pool.rs | 35 +++++++++----- src/server.rs | 19 +++++++- 3 files changed, 142 insertions(+), 37 deletions(-) diff --git a/src/client.rs b/src/client.rs index 9d4f4038..019f18c2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -7,11 +7,11 @@ use tokio::net::TcpStream; use tokio::sync::broadcast::Receiver; use crate::admin::{generate_server_info_for_admin, handle_admin}; -use crate::config::get_config; +use crate::config::{get_config, Address}; use crate::constants::*; use crate::errors::Error; use crate::messages::*; -use crate::pool::{get_pool, ClientServerMap}; +use crate::pool::{get_pool, ClientServerMap, ConnectionPool}; use crate::query_router::{Command, QueryRouter}; use crate::server::Server; use crate::stats::{get_reporter, Reporter}; @@ -540,21 +540,21 @@ where // Get a pool instance referenced by the most up-to-date // pointer. This ensures we always read the latest config // when starting a query. 
- let mut pool = - match get_pool(self.target_pool_name.clone(), self.target_user_name.clone()) { - Some(pool) => pool, - None => { - error_response( - &mut self.write, - &format!( - "No pool configured for database: {:?}, user: {:?}", - self.target_pool_name, self.target_user_name - ), - ) - .await?; - return Err(Error::ClientError); - } - }; + let pool = match get_pool(self.target_pool_name.clone(), self.target_user_name.clone()) + { + Some(pool) => pool, + None => { + error_response( + &mut self.write, + &format!( + "No pool configured for database: {:?}, user: {:?}", + self.target_pool_name, self.target_user_name + ), + ) + .await?; + return Err(Error::ClientError); + } + }; query_router.update_pool_settings(pool.settings.clone()); let current_shard = query_router.shard(); @@ -731,12 +731,26 @@ where 'Q' => { debug!("Sending query to server"); - server.send(original).await?; + self.send_server_message( + server, + original, + &address, + query_router.shard(), + &pool, + ) + .await?; // Read all data the server has to offer, which can be multiple messages // buffered in 8196 bytes chunks. loop { - let response = server.recv().await?; + let response = self + .receive_server_message( + server, + &address, + query_router.shard(), + &pool, + ) + .await?; // Send server reply to the client. match write_all_half(&mut self.write, response).await { @@ -816,14 +830,28 @@ where self.buffer.put(&original[..]); - server.send(self.buffer.clone()).await?; + self.send_server_message( + server, + self.buffer.clone(), + &address, + query_router.shard(), + &pool, + ) + .await?; self.buffer.clear(); // Read all data the server has to offer, which can be multiple messages // buffered in 8196 bytes chunks. 
loop { - let response = server.recv().await?; + let response = self + .receive_server_message( + server, + &address, + query_router.shard(), + &pool, + ) + .await?; match write_all_half(&mut self.write, response).await { Ok(_) => (), @@ -857,15 +885,31 @@ where 'd' => { // Forward the data to the server, // don't buffer it since it can be rather large. - server.send(original).await?; + self.send_server_message( + server, + original, + &address, + query_router.shard(), + &pool, + ) + .await?; } // CopyDone or CopyFail // Copy is done, successfully or not. 'c' | 'f' => { - server.send(original).await?; + self.send_server_message( + server, + original, + &address, + query_router.shard(), + &pool, + ) + .await?; - let response = server.recv().await?; + let response = self + .receive_server_message(server, &address, query_router.shard(), &pool) + .await?; match write_all_half(&mut self.write, response).await { Ok(_) => (), @@ -907,6 +951,39 @@ where let mut guard = self.client_server_map.lock(); guard.remove(&(self.process_id, self.secret_key)); } + + async fn send_server_message( + &self, + server: &mut Server, + message: BytesMut, + address: &Address, + shard: usize, + pool: &ConnectionPool, + ) -> Result<(), Error> { + match server.send(message).await { + Ok(_) => Ok(()), + Err(err) => { + pool.ban(address, shard, self.process_id); + Err(err) + } + } + } + + async fn receive_server_message( + &self, + server: &mut Server, + address: &Address, + shard: usize, + pool: &ConnectionPool, + ) -> Result { + match server.recv().await { + Ok(message) => Ok(message), + Err(err) => { + pool.ban(address, shard, self.process_id); + Err(err) + } + } + } } impl Drop for Client { diff --git a/src/pool.rs b/src/pool.rs index 775c8d42..d4528a41 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -251,7 +251,7 @@ impl ConnectionPool { /// Get a connection from the pool. 
pub async fn get( - &mut self, + &self, shard: usize, // shard number role: Option, // primary or replica process_id: i32, // client id @@ -312,7 +312,7 @@ impl ConnectionPool { Ok(conn) => conn, Err(err) => { error!("Banning replica {}, error: {:?}", index, err); - self.ban(address, shard); + self.ban(address, shard, process_id); self.stats.client_disconnecting(process_id, address.id); self.stats .checkout_time(now.elapsed().as_micros(), process_id, address.id); @@ -326,6 +326,21 @@ impl ConnectionPool { self.stats.server_tested(server.process_id(), address.id); + // Will return error if timestamp is greater than current system time, which it should never be set to + let require_health_check = server + .latest_successful_server_interaction_timestamp() + .elapsed() + .unwrap() + .as_millis() + > 10000; // TODO: Make this configurable + + if !require_health_check { + self.stats + .checkout_time(now.elapsed().as_micros(), process_id, address.id); + self.stats.server_idle(conn.process_id(), address.id); + return Ok((conn, address.clone())); + } + match tokio::time::timeout( tokio::time::Duration::from_millis(healthcheck_timeout), server.query(";"), @@ -348,10 +363,7 @@ impl ConnectionPool { // Don't leave a bad connection in the pool. server.mark_bad(); - self.ban(address, shard); - self.stats.client_disconnecting(process_id, address.id); - self.stats - .checkout_time(now.elapsed().as_micros(), process_id, address.id); + self.ban(address, shard, process_id); continue; } }, @@ -362,10 +374,7 @@ impl ConnectionPool { // Don't leave a bad connection in the pool. server.mark_bad(); - self.ban(address, shard); - self.stats.client_disconnecting(process_id, address.id); - self.stats - .checkout_time(now.elapsed().as_micros(), process_id, address.id); + self.ban(address, shard, process_id); continue; } } @@ -377,7 +386,11 @@ impl ConnectionPool { /// Ban an address (i.e. replica). It no longer will serve /// traffic for any new transactions. 
Existing transactions on that replica /// will finish successfully or error out to the clients. - pub fn ban(&self, address: &Address, shard: usize) { + pub fn ban(&self, address: &Address, shard: usize, process_id: i32) { + self.stats.client_disconnecting(process_id, address.id); + self.stats + .checkout_time(Instant::now().elapsed().as_micros(), process_id, address.id); + error!("Banning {:?}", address); let now = chrono::offset::Utc::now().naive_utc(); let mut guard = self.banlist.write(); diff --git a/src/server.rs b/src/server.rs index d21696f0..1df1dd82 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,6 +2,7 @@ /// Here we are pretending to the a Postgres client. use bytes::{Buf, BufMut, BytesMut}; use log::{debug, error, info, trace}; +use std::time::SystemTime; use tokio::io::{AsyncReadExt, BufReader}; use tokio::net::{ tcp::{OwnedReadHalf, OwnedWriteHalf}, @@ -58,6 +59,9 @@ pub struct Server { /// Application name using the server at the moment. application_name: String, + + // Last time that a successful server send or response happened + latest_successful_server_interaction_timestamp: SystemTime, } impl Server { @@ -316,6 +320,7 @@ impl Server { connected_at: chrono::offset::Utc::now().naive_utc(), stats: stats, application_name: String::new(), + latest_successful_server_interaction_timestamp: SystemTime::now(), }; server.set_name("pgcat").await?; @@ -366,7 +371,10 @@ impl Server { .data_sent(messages.len(), self.process_id, self.address.id); match write_all_half(&mut self.write, messages).await { - Ok(_) => Ok(()), + Ok(_) => { + self.latest_successful_server_interaction_timestamp = SystemTime::now(); + Ok(()) + }, Err(err) => { error!("Terminating server because of: {:?}", err); self.bad = true; @@ -413,7 +421,7 @@ impl Server { self.in_transaction = false; } - // Some error occured, the transaction was rolled back. + // Some error occurred, the transaction was rolled back. 
'E' => { self.in_transaction = true; } @@ -474,6 +482,8 @@ impl Server { // Clear the buffer for next query. self.buffer.clear(); + self.latest_successful_server_interaction_timestamp = SystemTime::now(); + // Pass the data back to the client. Ok(bytes) } @@ -564,6 +574,11 @@ impl Server { pub fn process_id(&self) -> i32 { self.process_id } + + // Get server's latest response timestamp + pub fn latest_successful_server_interaction_timestamp(&self) -> SystemTime { + self.latest_successful_server_interaction_timestamp + } } impl Drop for Server { From d999c933c2c0a5da42618c1f5d214e1b6c121b68 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 10 Aug 2022 01:23:14 -0400 Subject: [PATCH 02/10] fmt --- src/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.rs b/src/server.rs index 1df1dd82..23f9a8b2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -374,7 +374,7 @@ impl Server { Ok(_) => { self.latest_successful_server_interaction_timestamp = SystemTime::now(); Ok(()) - }, + } Err(err) => { error!("Terminating server because of: {:?}", err); self.bad = true; From d3c9d6204acfb0aa3a715b4add5438dcc14592da Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 10 Aug 2022 01:36:49 -0400 Subject: [PATCH 03/10] spelling --- src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/client.rs b/src/client.rs index 019f18c2..1dd1bccb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -246,7 +246,7 @@ where } } -/// Handle TLS connection negotation. +/// Handle TLS connection negotiation. pub async fn startup_tls( stream: TcpStream, client_server_map: ClientServerMap, @@ -259,14 +259,14 @@ pub async fn startup_tls( let mut stream = match tls.acceptor.accept(stream).await { Ok(stream) => stream, - // TLS negotitation failed. + // TLS negotiation failed. Err(err) => { error!("TLS negotiation failed: {:?}", err); return Err(Error::TlsError); } }; - // TLS negotitation successful. + // TLS negotiation successful. 
// Continue with regular startup using encrypted connection. match get_startup::>(&mut stream).await { // Got good startup message, proceeding like normal except we From 4641a573cdbd568da9771992fa748061ab819ce7 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 10 Aug 2022 12:55:39 -0400 Subject: [PATCH 04/10] Update name to last_healthcheck and some comments --- src/pool.rs | 12 ++++-------- src/server.rs | 14 ++++++++------ 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index d4528a41..17a86e68 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -327,14 +327,10 @@ impl ConnectionPool { self.stats.server_tested(server.process_id(), address.id); // Will return error if timestamp is greater than current system time, which it should never be set to - let require_health_check = server - .latest_successful_server_interaction_timestamp() - .elapsed() - .unwrap() - .as_millis() - > 10000; // TODO: Make this configurable - - if !require_health_check { + let require_healthcheck = + server.last_healthcheck().elapsed().unwrap().as_millis() > 10000; + + if !require_healthcheck { self.stats .checkout_time(now.elapsed().as_micros(), process_id, address.id); self.stats.server_idle(conn.process_id(), address.id); diff --git a/src/server.rs b/src/server.rs index 23f9a8b2..5daf9cfd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -61,7 +61,7 @@ pub struct Server { application_name: String, // Last time that a successful server send or response happened - latest_successful_server_interaction_timestamp: SystemTime, + last_healthcheck: SystemTime, } impl Server { @@ -320,7 +320,7 @@ impl Server { connected_at: chrono::offset::Utc::now().naive_utc(), stats: stats, application_name: String::new(), - latest_successful_server_interaction_timestamp: SystemTime::now(), + last_healthcheck: SystemTime::now(), }; server.set_name("pgcat").await?; @@ -372,7 +372,8 @@ impl Server { match write_all_half(&mut self.write, messages).await { Ok(_) => { - 
self.latest_successful_server_interaction_timestamp = SystemTime::now(); + // Successfully sent to server + self.last_healthcheck = SystemTime::now(); Ok(()) } Err(err) => { @@ -482,7 +483,8 @@ impl Server { // Clear the buffer for next query. self.buffer.clear(); - self.latest_successful_server_interaction_timestamp = SystemTime::now(); + // Successfully received data from server + self.last_healthcheck = SystemTime::now(); // Pass the data back to the client. Ok(bytes) @@ -576,8 +578,8 @@ impl Server { } // Get server's latest response timestamp - pub fn latest_successful_server_interaction_timestamp(&self) -> SystemTime { - self.latest_successful_server_interaction_timestamp + pub fn last_healthcheck(&self) -> SystemTime { + self.last_healthcheck } } From 49883fbe6138cb01500cc0b6b835c61692b81167 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 10 Aug 2022 13:03:43 -0400 Subject: [PATCH 05/10] Moved server tested stat to after require_healthcheck check --- src/pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/pool.rs b/src/pool.rs index 17a86e68..217d7c62 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -324,8 +324,6 @@ impl ConnectionPool { let server = &mut *conn; let healthcheck_timeout = get_config().general.healthcheck_timeout; - self.stats.server_tested(server.process_id(), address.id); - // Will return error if timestamp is greater than current system time, which it should never be set to let require_healthcheck = server.last_healthcheck().elapsed().unwrap().as_millis() > 10000; @@ -337,6 +335,8 @@ impl ConnectionPool { return Ok((conn, address.clone())); } + self.stats.server_tested(server.process_id(), address.id); + match tokio::time::timeout( tokio::time::Duration::from_millis(healthcheck_timeout), server.query(";"), From bc67a799ab3e9c27c4dda3b83b196907b174f2f2 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Wed, 10 Aug 2022 14:05:42 -0400 Subject: [PATCH 06/10] Make health check delay configurable --- 
.circleci/pgcat.toml | 3 +++ README.md | 2 ++ examples/docker/pgcat.toml | 3 +++ pgcat.toml | 3 +++ src/config.rs | 7 +++++++ src/pool.rs | 6 ++++-- 6 files changed, 22 insertions(+), 2 deletions(-) diff --git a/.circleci/pgcat.toml b/.circleci/pgcat.toml index bc37a291..24dfe97d 100644 --- a/.circleci/pgcat.toml +++ b/.circleci/pgcat.toml @@ -20,6 +20,9 @@ connect_timeout = 100 # How much time to give the health check query to return with a result (ms). healthcheck_timeout = 100 +# How long after its last successful activity a connection can be re-used without running a health check query on it (ms). +healthcheck_delay = 30000 + # How much time to give clients during shutdown before forcibly killing client connections (ms). shutdown_timeout = 5000 diff --git a/README.md b/README.md index c03982d2..1fac2b60 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1' | `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` | | `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` | | `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` | +| `healthcheck_delay` | How long after its last successful activity a connection can be re-used without running a health check query on it (milliseconds). | `30000` | | `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. 
| `60` | | | | | | **`user`** | | | @@ -252,6 +253,7 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process or by qu | `connect_timeout` | yes | | `healthcheck_timeout` | no | | `shutdown_timeout` | no | +| `healthcheck_delay` | no | | `ban_time` | no | | `user` | yes | | `shards` | yes | diff --git a/examples/docker/pgcat.toml b/examples/docker/pgcat.toml index cbfb1d9b..b2ce0b64 100644 --- a/examples/docker/pgcat.toml +++ b/examples/docker/pgcat.toml @@ -20,6 +20,9 @@ connect_timeout = 5000 # How much time to give `SELECT 1` health check query to return with a result (ms). healthcheck_timeout = 1000 +# How long after its last successful activity a connection can be re-used without running a health check query on it (ms). +healthcheck_delay = 30000 + # How much time to give clients during shutdown before forcibly killing client connections (ms). shutdown_timeout = 60000 diff --git a/pgcat.toml b/pgcat.toml index d826994a..5b046785 100644 --- a/pgcat.toml +++ b/pgcat.toml @@ -20,6 +20,9 @@ connect_timeout = 5000 # How much time to give the health check query to return with a result (ms). healthcheck_timeout = 1000 +# How long after its last successful activity a connection can be re-used without running a health check query on it (ms). +healthcheck_delay = 30000 + # How much time to give clients during shutdown before forcibly killing client connections (ms). 
shutdown_timeout = 60000 diff --git a/src/config.rs b/src/config.rs index 17aad85c..435cd9e3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -121,6 +121,7 @@ pub struct General { pub connect_timeout: u64, pub healthcheck_timeout: u64, pub shutdown_timeout: u64, + pub healthcheck_delay: u64, pub ban_time: i64, pub autoreload: bool, pub tls_certificate: Option, @@ -138,6 +139,7 @@ impl Default for General { connect_timeout: 5000, healthcheck_timeout: 1000, shutdown_timeout: 60000, + healthcheck_delay: 30000, ban_time: 60, autoreload: false, tls_certificate: None, @@ -281,6 +283,10 @@ impl From<&Config> for std::collections::HashMap { "shutdown_timeout".to_string(), config.general.shutdown_timeout.to_string(), ), + ( + "healthcheck_delay".to_string(), + config.general.healthcheck_delay.to_string(), + ), ("ban_time".to_string(), config.general.ban_time.to_string()), ]; @@ -299,6 +305,7 @@ impl Config { ); info!("Connection timeout: {}ms", self.general.connect_timeout); info!("Shutdown timeout: {}ms", self.general.shutdown_timeout); + info!("Healthcheck delay: {}ms", self.general.healthcheck_delay); match self.general.tls_certificate.clone() { Some(tls_certificate) => { info!("TLS certificate: {}", tls_certificate); diff --git a/src/pool.rs b/src/pool.rs index 217d7c62..f0c9b79a 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -283,6 +283,9 @@ impl ConnectionPool { return Err(Error::BadConfig); } + let healthcheck_timeout = get_config().general.healthcheck_timeout; + let healthcheck_delay = get_config().general.healthcheck_delay as u128; + while allowed_attempts > 0 { // Round-robin replicas. round_robin += 1; @@ -322,11 +325,10 @@ impl ConnectionPool { // // Check if this server is alive with a health check. 
let server = &mut *conn; - let healthcheck_timeout = get_config().general.healthcheck_timeout; // Will return error if timestamp is greater than current system time, which it should never be set to let require_healthcheck = - server.last_healthcheck().elapsed().unwrap().as_millis() > 10000; + server.last_healthcheck().elapsed().unwrap().as_millis() > healthcheck_delay; if !require_healthcheck { self.stats From 5381e59afee02bdaf567c4b79d9892efe8474912 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 11 Aug 2022 17:19:46 -0400 Subject: [PATCH 07/10] Rename to last_activity --- src/server.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/server.rs b/src/server.rs index 5daf9cfd..ddf95ce6 100644 --- a/src/server.rs +++ b/src/server.rs @@ -61,7 +61,7 @@ pub struct Server { application_name: String, // Last time that a successful server send or response happened - last_healthcheck: SystemTime, + last_activity: SystemTime, } impl Server { @@ -320,7 +320,7 @@ impl Server { connected_at: chrono::offset::Utc::now().naive_utc(), stats: stats, application_name: String::new(), - last_healthcheck: SystemTime::now(), + last_activity: SystemTime::now(), }; server.set_name("pgcat").await?; @@ -373,7 +373,7 @@ impl Server { match write_all_half(&mut self.write, messages).await { Ok(_) => { // Successfully sent to server - self.last_healthcheck = SystemTime::now(); + self.last_activity = SystemTime::now(); Ok(()) } Err(err) => { @@ -484,7 +484,7 @@ impl Server { self.buffer.clear(); // Successfully received data from server - self.last_healthcheck = SystemTime::now(); + self.last_activity = SystemTime::now(); // Pass the data back to the client. 
Ok(bytes) @@ -578,8 +578,8 @@ impl Server { } // Get server's latest response timestamp - pub fn last_healthcheck(&self) -> SystemTime { - self.last_healthcheck + pub fn last_activity(&self) -> SystemTime { + self.last_activity } } From db6fa6c1a9a201ffc9a179319c54f1337ae74816 Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 11 Aug 2022 17:23:53 -0400 Subject: [PATCH 08/10] Fix typo --- src/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pool.rs b/src/pool.rs index f0c9b79a..c528d0cc 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -328,7 +328,7 @@ impl ConnectionPool { // Will return error if timestamp is greater than current system time, which it should never be set to let require_healthcheck = - server.last_healthcheck().elapsed().unwrap().as_millis() > healthcheck_delay; + server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay; if !require_healthcheck { self.stats From d1d5465bbd581ec09f231912c018f620441e9c4b Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 11 Aug 2022 17:27:59 -0400 Subject: [PATCH 09/10] Add debug log for healthcheck --- src/pool.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/pool.rs b/src/pool.rs index c528d0cc..ff1b38bf 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -337,6 +337,7 @@ impl ConnectionPool { return Ok((conn, address.clone())); } + debug!("Running health check for replica {}", index); self.stats.server_tested(server.process_id(), address.id); match tokio::time::timeout( From f56a6367df0aaca70b00c2a30b6982771931cb4d Mon Sep 17 00:00:00 2001 From: Zain Kabani Date: Thu, 11 Aug 2022 17:36:42 -0400 Subject: [PATCH 10/10] Add address to debug log --- src/pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pool.rs b/src/pool.rs index ff1b38bf..fcb6f855 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -337,7 +337,7 @@ impl ConnectionPool { return Ok((conn, address.clone())); } - debug!("Running health check for replica {}", index); + debug!("Running 
health check for replica {}, {:?}", index, address); self.stats.server_tested(server.process_id(), address.id); match tokio::time::timeout(