Skip to content

Commit f963b12

Browse files
authored
Health check delay (#118)
* initial commit of server check delay implementation * fmt * spelling * Update name to last_healthcheck and some comments * Moved server tested stat to after require_healthcheck check * Make health check delay configurable * Rename to last_activity * Fix typo * Add debug log for healthcheck * Add address to debug log
1 parent a262337 commit f963b12

File tree

8 files changed

+165
-41
lines changed

8 files changed

+165
-41
lines changed

.circleci/pgcat.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ connect_timeout = 100
2020
# How much time to give the health check query to return with a result (ms).
2121
healthcheck_timeout = 100
2222

23+
# How long to keep a connection available for immediate re-use, without running a healthcheck query on it (ms).
24+
healthcheck_delay = 30000
25+
2326
# How much time to give clients during shutdown before forcibly killing client connections (ms).
2427
shutdown_timeout = 5000
2528

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ psql -h 127.0.0.1 -p 6432 -c 'SELECT 1'
4848
| `connect_timeout` | Maximum time to establish a connection to a server (milliseconds). If reached, the server is banned and the next target is attempted. | `5000` |
4949
| `healthcheck_timeout` | Maximum time to pass a health check (`SELECT 1`, milliseconds). If reached, the server is banned and the next target is attempted. | `1000` |
5050
| `shutdown_timeout` | Maximum time to give clients during shutdown before forcibly killing client connections (ms). | `60000` |
51+
| `healthcheck_delay` | How long to keep a connection available for immediate re-use, without running a healthcheck query on it (milliseconds). | `30000` |
5152
| `ban_time` | Ban time for a server (seconds). It won't be allowed to serve transactions until the ban expires; failover targets will be used instead. | `60` |
5253
| | | |
5354
| **`user`** | | |
@@ -252,6 +253,7 @@ The config can be reloaded by sending a `kill -s SIGHUP` to the process or by qu
252253
| `connect_timeout` | yes |
253254
| `healthcheck_timeout` | no |
254255
| `shutdown_timeout` | no |
256+
| `healthcheck_delay` | no |
255257
| `ban_time` | no |
256258
| `user` | yes |
257259
| `shards` | yes |

examples/docker/pgcat.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ connect_timeout = 5000
2020
# How much time to give `SELECT 1` health check query to return with a result (ms).
2121
healthcheck_timeout = 1000
2222

23+
# How long to keep a connection available for immediate re-use, without running a healthcheck query on it (ms).
24+
healthcheck_delay = 30000
25+
2326
# How much time to give clients during shutdown before forcibly killing client connections (ms).
2427
shutdown_timeout = 60000
2528

pgcat.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ connect_timeout = 5000
2020
# How much time to give the health check query to return with a result (ms).
2121
healthcheck_timeout = 1000
2222

23+
# How long to keep a connection available for immediate re-use, without running a healthcheck query on it (ms).
24+
healthcheck_delay = 30000
25+
2326
# How much time to give clients during shutdown before forcibly killing client connections (ms).
2427
shutdown_timeout = 60000
2528

src/client.rs

Lines changed: 104 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use tokio::net::TcpStream;
77
use tokio::sync::broadcast::Receiver;
88

99
use crate::admin::{generate_server_info_for_admin, handle_admin};
10-
use crate::config::get_config;
10+
use crate::config::{get_config, Address};
1111
use crate::constants::*;
1212
use crate::errors::Error;
1313
use crate::messages::*;
14-
use crate::pool::{get_pool, ClientServerMap};
14+
use crate::pool::{get_pool, ClientServerMap, ConnectionPool};
1515
use crate::query_router::{Command, QueryRouter};
1616
use crate::server::Server;
1717
use crate::stats::{get_reporter, Reporter};
@@ -246,7 +246,7 @@ where
246246
}
247247
}
248248

249-
/// Handle TLS connection negotation.
249+
/// Handle TLS connection negotiation.
250250
pub async fn startup_tls(
251251
stream: TcpStream,
252252
client_server_map: ClientServerMap,
@@ -259,14 +259,14 @@ pub async fn startup_tls(
259259
let mut stream = match tls.acceptor.accept(stream).await {
260260
Ok(stream) => stream,
261261

262-
// TLS negotitation failed.
262+
// TLS negotiation failed.
263263
Err(err) => {
264264
error!("TLS negotiation failed: {:?}", err);
265265
return Err(Error::TlsError);
266266
}
267267
};
268268

269-
// TLS negotitation successful.
269+
// TLS negotiation successful.
270270
// Continue with regular startup using encrypted connection.
271271
match get_startup::<TlsStream<TcpStream>>(&mut stream).await {
272272
// Got good startup message, proceeding like normal except we
@@ -540,21 +540,21 @@ where
540540
// Get a pool instance referenced by the most up-to-date
541541
// pointer. This ensures we always read the latest config
542542
// when starting a query.
543-
let mut pool =
544-
match get_pool(self.target_pool_name.clone(), self.target_user_name.clone()) {
545-
Some(pool) => pool,
546-
None => {
547-
error_response(
548-
&mut self.write,
549-
&format!(
550-
"No pool configured for database: {:?}, user: {:?}",
551-
self.target_pool_name, self.target_user_name
552-
),
553-
)
554-
.await?;
555-
return Err(Error::ClientError);
556-
}
557-
};
543+
let pool = match get_pool(self.target_pool_name.clone(), self.target_user_name.clone())
544+
{
545+
Some(pool) => pool,
546+
None => {
547+
error_response(
548+
&mut self.write,
549+
&format!(
550+
"No pool configured for database: {:?}, user: {:?}",
551+
self.target_pool_name, self.target_user_name
552+
),
553+
)
554+
.await?;
555+
return Err(Error::ClientError);
556+
}
557+
};
558558
query_router.update_pool_settings(pool.settings.clone());
559559
let current_shard = query_router.shard();
560560

@@ -731,12 +731,26 @@ where
731731
'Q' => {
732732
debug!("Sending query to server");
733733

734-
server.send(original).await?;
734+
self.send_server_message(
735+
server,
736+
original,
737+
&address,
738+
query_router.shard(),
739+
&pool,
740+
)
741+
.await?;
735742

736743
// Read all data the server has to offer, which can be multiple messages
737744
// buffered in 8196 bytes chunks.
738745
loop {
739-
let response = server.recv().await?;
746+
let response = self
747+
.receive_server_message(
748+
server,
749+
&address,
750+
query_router.shard(),
751+
&pool,
752+
)
753+
.await?;
740754

741755
// Send server reply to the client.
742756
match write_all_half(&mut self.write, response).await {
@@ -816,14 +830,28 @@ where
816830

817831
self.buffer.put(&original[..]);
818832

819-
server.send(self.buffer.clone()).await?;
833+
self.send_server_message(
834+
server,
835+
self.buffer.clone(),
836+
&address,
837+
query_router.shard(),
838+
&pool,
839+
)
840+
.await?;
820841

821842
self.buffer.clear();
822843

823844
// Read all data the server has to offer, which can be multiple messages
824845
// buffered in 8196 bytes chunks.
825846
loop {
826-
let response = server.recv().await?;
847+
let response = self
848+
.receive_server_message(
849+
server,
850+
&address,
851+
query_router.shard(),
852+
&pool,
853+
)
854+
.await?;
827855

828856
match write_all_half(&mut self.write, response).await {
829857
Ok(_) => (),
@@ -857,15 +885,31 @@ where
857885
'd' => {
858886
// Forward the data to the server,
859887
// don't buffer it since it can be rather large.
860-
server.send(original).await?;
888+
self.send_server_message(
889+
server,
890+
original,
891+
&address,
892+
query_router.shard(),
893+
&pool,
894+
)
895+
.await?;
861896
}
862897

863898
// CopyDone or CopyFail
864899
// Copy is done, successfully or not.
865900
'c' | 'f' => {
866-
server.send(original).await?;
901+
self.send_server_message(
902+
server,
903+
original,
904+
&address,
905+
query_router.shard(),
906+
&pool,
907+
)
908+
.await?;
867909

868-
let response = server.recv().await?;
910+
let response = self
911+
.receive_server_message(server, &address, query_router.shard(), &pool)
912+
.await?;
869913

870914
match write_all_half(&mut self.write, response).await {
871915
Ok(_) => (),
@@ -907,6 +951,39 @@ where
907951
let mut guard = self.client_server_map.lock();
908952
guard.remove(&(self.process_id, self.secret_key));
909953
}
954+
955+
async fn send_server_message(
956+
&self,
957+
server: &mut Server,
958+
message: BytesMut,
959+
address: &Address,
960+
shard: usize,
961+
pool: &ConnectionPool,
962+
) -> Result<(), Error> {
963+
match server.send(message).await {
964+
Ok(_) => Ok(()),
965+
Err(err) => {
966+
pool.ban(address, shard, self.process_id);
967+
Err(err)
968+
}
969+
}
970+
}
971+
972+
async fn receive_server_message(
973+
&self,
974+
server: &mut Server,
975+
address: &Address,
976+
shard: usize,
977+
pool: &ConnectionPool,
978+
) -> Result<BytesMut, Error> {
979+
match server.recv().await {
980+
Ok(message) => Ok(message),
981+
Err(err) => {
982+
pool.ban(address, shard, self.process_id);
983+
Err(err)
984+
}
985+
}
986+
}
910987
}
911988

912989
impl<S, T> Drop for Client<S, T> {

src/config.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ pub struct General {
121121
pub connect_timeout: u64,
122122
pub healthcheck_timeout: u64,
123123
pub shutdown_timeout: u64,
124+
pub healthcheck_delay: u64,
124125
pub ban_time: i64,
125126
pub autoreload: bool,
126127
pub tls_certificate: Option<String>,
@@ -138,6 +139,7 @@ impl Default for General {
138139
connect_timeout: 5000,
139140
healthcheck_timeout: 1000,
140141
shutdown_timeout: 60000,
142+
healthcheck_delay: 30000,
141143
ban_time: 60,
142144
autoreload: false,
143145
tls_certificate: None,
@@ -281,6 +283,10 @@ impl From<&Config> for std::collections::HashMap<String, String> {
281283
"shutdown_timeout".to_string(),
282284
config.general.shutdown_timeout.to_string(),
283285
),
286+
(
287+
"healthcheck_delay".to_string(),
288+
config.general.healthcheck_delay.to_string(),
289+
),
284290
("ban_time".to_string(), config.general.ban_time.to_string()),
285291
];
286292

@@ -299,6 +305,7 @@ impl Config {
299305
);
300306
info!("Connection timeout: {}ms", self.general.connect_timeout);
301307
info!("Shutdown timeout: {}ms", self.general.shutdown_timeout);
308+
info!("Healthcheck delay: {}ms", self.general.healthcheck_delay);
302309
match self.general.tls_certificate.clone() {
303310
Some(tls_certificate) => {
304311
info!("TLS certificate: {}", tls_certificate);

src/pool.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ impl ConnectionPool {
251251

252252
/// Get a connection from the pool.
253253
pub async fn get(
254-
&mut self,
254+
&self,
255255
shard: usize, // shard number
256256
role: Option<Role>, // primary or replica
257257
process_id: i32, // client id
@@ -283,6 +283,9 @@ impl ConnectionPool {
283283
return Err(Error::BadConfig);
284284
}
285285

286+
let healthcheck_timeout = get_config().general.healthcheck_timeout;
287+
let healthcheck_delay = get_config().general.healthcheck_delay as u128;
288+
286289
while allowed_attempts > 0 {
287290
// Round-robin replicas.
288291
round_robin += 1;
@@ -312,7 +315,7 @@ impl ConnectionPool {
312315
Ok(conn) => conn,
313316
Err(err) => {
314317
error!("Banning replica {}, error: {:?}", index, err);
315-
self.ban(address, shard);
318+
self.ban(address, shard, process_id);
316319
self.stats.client_disconnecting(process_id, address.id);
317320
self.stats
318321
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
@@ -322,8 +325,19 @@ impl ConnectionPool {
322325

323326
// Check if this server is alive with a health check.
324327
let server = &mut *conn;
325-
let healthcheck_timeout = get_config().general.healthcheck_timeout;
326328

329+
// Will return error if timestamp is greater than current system time, which it should never be set to
330+
let require_healthcheck =
331+
server.last_activity().elapsed().unwrap().as_millis() > healthcheck_delay;
332+
333+
if !require_healthcheck {
334+
self.stats
335+
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
336+
self.stats.server_idle(conn.process_id(), address.id);
337+
return Ok((conn, address.clone()));
338+
}
339+
340+
debug!("Running health check for replica {}, {:?}", index, address);
327341
self.stats.server_tested(server.process_id(), address.id);
328342

329343
match tokio::time::timeout(
@@ -348,10 +362,7 @@ impl ConnectionPool {
348362
// Don't leave a bad connection in the pool.
349363
server.mark_bad();
350364

351-
self.ban(address, shard);
352-
self.stats.client_disconnecting(process_id, address.id);
353-
self.stats
354-
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
365+
self.ban(address, shard, process_id);
355366
continue;
356367
}
357368
},
@@ -362,10 +373,7 @@ impl ConnectionPool {
362373
// Don't leave a bad connection in the pool.
363374
server.mark_bad();
364375

365-
self.ban(address, shard);
366-
self.stats.client_disconnecting(process_id, address.id);
367-
self.stats
368-
.checkout_time(now.elapsed().as_micros(), process_id, address.id);
376+
self.ban(address, shard, process_id);
369377
continue;
370378
}
371379
}
@@ -377,7 +385,11 @@ impl ConnectionPool {
377385
/// Ban an address (i.e. replica). It no longer will serve
378386
/// traffic for any new transactions. Existing transactions on that replica
379387
/// will finish successfully or error out to the clients.
380-
pub fn ban(&self, address: &Address, shard: usize) {
388+
pub fn ban(&self, address: &Address, shard: usize, process_id: i32) {
389+
self.stats.client_disconnecting(process_id, address.id);
390+
self.stats
391+
.checkout_time(Instant::now().elapsed().as_micros(), process_id, address.id);
392+
381393
error!("Banning {:?}", address);
382394
let now = chrono::offset::Utc::now().naive_utc();
383395
let mut guard = self.banlist.write();

0 commit comments

Comments
 (0)