diff --git a/src/config.rs b/src/config.rs index f4177730..2f5596c6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -487,10 +487,15 @@ pub struct Pool { #[serde(default)] // False pub primary_reads_enabled: bool, + /// Maximum time to allow for establishing a new server connection. pub connect_timeout: Option, + /// Close idle connections that have been opened for longer than this. pub idle_timeout: Option, + /// Close server connections that have been opened for longer than this. + /// Only applied to idle connections. If the connection is actively used for + /// longer than this period, the pool will not interrupt it. pub server_lifetime: Option, #[serde(default = "Pool::default_sharding_function")] @@ -507,6 +512,9 @@ pub struct Pool { pub auth_query_user: Option, pub auth_query_password: Option, + #[serde(default = "Pool::default_cleanup_server_connections")] + pub cleanup_server_connections: bool, + pub plugins: Option, pub shards: BTreeMap, pub users: BTreeMap, @@ -548,6 +556,10 @@ impl Pool { ShardingFunction::PgBigintHash } + pub fn default_cleanup_server_connections() -> bool { + true + } + pub fn validate(&mut self) -> Result<(), Error> { match self.default_role.as_ref() { "any" => (), @@ -637,6 +649,7 @@ impl Default for Pool { auth_query_password: None, server_lifetime: None, plugins: None, + cleanup_server_connections: true, } } } @@ -1066,6 +1079,10 @@ impl Config { None => "default".to_string(), } ); + info!( + "[pool: {}] Cleanup server connections: {}", + pool_name, pool_config.cleanup_server_connections + ); info!( "[pool: {}] Plugins: {}", pool_name, diff --git a/src/mirrors.rs b/src/mirrors.rs index d6d691ff..7e2c9a09 100644 --- a/src/mirrors.rs +++ b/src/mirrors.rs @@ -44,6 +44,7 @@ impl MirroredClient { Arc::new(PoolStats::new(identifier, cfg.clone())), Arc::new(RwLock::new(None)), None, + true, ); Pool::builder() diff --git a/src/pool.rs b/src/pool.rs index 6235e220..a0b0c4d1 100644 --- a/src/pool.rs +++ b/src/pool.rs @@ -364,6 +364,7 @@ impl ConnectionPool { Some(ref plugins) => Some(plugins.clone()), None => config.plugins.clone(), }, + pool_config.cleanup_server_connections, ); let connect_timeout = match pool_config.connect_timeout { @@ -914,13 +915,29 @@ impl ConnectionPool { /// Wrapper for the bb8 connection pool. pub struct ServerPool { + /// Server address. address: Address, + + /// Server Postgres user. user: User, + + /// Server database. database: String, + + /// Client/server mapping. client_server_map: ClientServerMap, + + /// Server statistics. stats: Arc, + + /// Server auth hash (for auth passthrough). auth_hash: Arc>>, + + /// Server plugins. plugins: Option, + + /// Should we clean up dirty connections before putting them into the pool? + cleanup_connections: bool, } impl ServerPool { @@ -932,6 +949,7 @@ impl ServerPool { stats: Arc, auth_hash: Arc>>, plugins: Option, + cleanup_connections: bool, ) -> ServerPool { ServerPool { address, @@ -941,6 +959,7 @@ impl ServerPool { stats, auth_hash, plugins, + cleanup_connections, } } } @@ -970,6 +989,7 @@ impl ManageConnection for ServerPool { self.client_server_map.clone(), stats.clone(), self.auth_hash.clone(), + self.cleanup_connections, ) .await { diff --git a/src/server.rs b/src/server.rs index 244c06e7..32dd91f8 100644 --- a/src/server.rs +++ b/src/server.rs @@ -188,13 +188,16 @@ pub struct Server { /// Application name using the server at the moment. application_name: String, - // Last time that a successful server send or response happened + /// Last time that a successful server send or response happened last_activity: SystemTime, mirror_manager: Option, - // Associated addresses used + /// Associated addresses used addr_set: Option, + + /// Should clean up dirty connections? + cleanup_connections: bool, } impl Server { @@ -207,6 +210,7 @@ impl Server { client_server_map: ClientServerMap, stats: Arc, auth_hash: Arc>>, + cleanup_connections: bool, ) -> Result { let cached_resolver = CACHED_RESOLVER.load(); let mut addr_set: Option = None; @@ -687,6 +691,7 @@ impl Server { address.mirrors.clone(), )), }, + cleanup_connections, }; server.set_name("pgcat").await?; @@ -1004,7 +1009,7 @@ impl Server { // to avoid leaking state between clients. For performance reasons we only // send `DISCARD ALL` if we think the session is altered instead of just sending // it before each checkin. - if self.cleanup_state.needs_cleanup() { + if self.cleanup_state.needs_cleanup() && self.cleanup_connections { warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name); self.query("DISCARD ALL").await?; self.query("RESET ROLE").await?; @@ -1084,6 +1089,7 @@ impl Server { client_server_map, Arc::new(ServerStats::default()), Arc::new(RwLock::new(None)), + true, ) .await?; debug!("Connected!, sending query."); diff --git a/tests/ruby/helpers/pgcat_helper.rb b/tests/ruby/helpers/pgcat_helper.rb index e36801eb..7a5bd71d 100644 --- a/tests/ruby/helpers/pgcat_helper.rb +++ b/tests/ruby/helpers/pgcat_helper.rb @@ -118,7 +118,7 @@ def self.single_instance_setup(pool_name, pool_size, pool_mode="transaction", lb end end - def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info") + def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mode="random", log_level="info", pool_settings={}) user = { "password" => "sharding_user", "pool_size" => pool_size, @@ -134,28 +134,32 @@ def self.single_shard_setup(pool_name, pool_size, pool_mode="transaction", lb_mo replica1 = PgInstance.new(8432, user["username"], user["password"], "shard0") replica2 = PgInstance.new(9432, user["username"], user["password"], "shard0") + pool_config = { + "default_role" => "any", + "pool_mode" => pool_mode, + "load_balancing_mode" => lb_mode, + "primary_reads_enabled" => false, + "query_parser_enabled" => false, + "sharding_function" => "pg_bigint_hash", + "shards" => { + "0" => { + "database" => "shard0", + "servers" => [ + ["localhost", primary.port.to_s, "primary"], + ["localhost", replica0.port.to_s, "replica"], + ["localhost", replica1.port.to_s, "replica"], + ["localhost", replica2.port.to_s, "replica"] + ] + }, + }, + "users" => { "0" => user } + } + + pool_config = pool_config.merge(pool_settings) + # Main proxy configs pgcat_cfg["pools"] = { - "#{pool_name}" => { - "default_role" => "any", - "pool_mode" => pool_mode, - "load_balancing_mode" => lb_mode, - "primary_reads_enabled" => false, - "query_parser_enabled" => false, - "sharding_function" => "pg_bigint_hash", - "shards" => { - "0" => { - "database" => "shard0", - "servers" => [ - ["localhost", primary.port.to_s, "primary"], - ["localhost", replica0.port.to_s, "replica"], - ["localhost", replica1.port.to_s, "replica"], - ["localhost", replica2.port.to_s, "replica"] - ] - }, - }, - "users" => { "0" => user } - } + "#{pool_name}" => pool_config, } pgcat_cfg["general"]["port"] = pgcat.port pgcat.update_config(pgcat_cfg) diff --git a/tests/ruby/misc_spec.rb b/tests/ruby/misc_spec.rb index e4d6f6fb..fe216e5b 100644 --- a/tests/ruby/misc_spec.rb +++ b/tests/ruby/misc_spec.rb @@ -320,6 +320,31 @@ expect(processes.primary.count_query("DISCARD ALL")).to eq(0) end end + + context "server cleanup disabled" do + let(:processes) { Helpers::Pgcat.single_shard_setup("sharded_db", 1, "transaction", "random", "info", { "cleanup_server_connections" => false }) } + + it "will not clean up connection state" do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + processes.primary.reset_stats + conn.async_exec("SET statement_timeout TO 1000") + conn.close + + puts processes.pgcat.logs + expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + end + + it "will not clean up prepared statements" do + conn = PG::connect(processes.pgcat.connection_string("sharded_db", "sharding_user")) + processes.primary.reset_stats + conn.async_exec("PREPARE prepared_q (int) AS SELECT $1") + + conn.close + + puts processes.pgcat.logs + expect(processes.primary.count_query("DISCARD ALL")).to eq(0) + end + end end describe "Idle client timeout" do