diff --git a/src/server.rs b/src/server.rs index 8f0ece99..dceab49d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -103,6 +103,48 @@ impl StreamInner { } } +#[derive(Copy, Clone)] +struct CleanupState { + /// If server connection requires DISCARD ALL before checkin because of set statement + needs_cleanup_set: bool, + + /// If server connection requires DISCARD ALL before checkin because of prepare statement + needs_cleanup_prepare: bool, +} + +impl CleanupState { + fn new() -> Self { + CleanupState { + needs_cleanup_set: false, + needs_cleanup_prepare: false, + } + } + + fn needs_cleanup(&self) -> bool { + self.needs_cleanup_set || self.needs_cleanup_prepare + } + + fn set_true(&mut self) { + self.needs_cleanup_set = true; + self.needs_cleanup_prepare = true; + } + + fn reset(&mut self) { + self.needs_cleanup_set = false; + self.needs_cleanup_prepare = false; + } +} + +impl std::fmt::Display for CleanupState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "SET: {}, PREPARE: {}", + self.needs_cleanup_set, self.needs_cleanup_prepare + ) + } +} + /// Server state. pub struct Server { /// Server host, e.g. localhost, @@ -131,8 +173,8 @@ pub struct Server { /// Is the server broken? We'll remote it from the pool if so. bad: bool, - /// If server connection requires a DISCARD ALL before checkin - needs_cleanup: bool, + /// If server connection requires DISCARD ALL before checkin + cleanup_state: CleanupState, /// Mapping of clients and servers used for query cancellation. client_server_map: ClientServerMap, @@ -630,7 +672,7 @@ impl Server { in_transaction: false, data_available: false, bad: false, - needs_cleanup: false, + cleanup_state: CleanupState::new(), client_server_map, addr_set, connected_at: chrono::offset::Utc::now().naive_utc(), @@ -793,12 +835,12 @@ impl Server { // This will reduce amount of discard statements sent if !self.in_transaction { debug!("Server connection marked for clean up"); - self.needs_cleanup = true; + self.cleanup_state.needs_cleanup_set = true; } } "PREPARE\0" => { debug!("Server connection marked for clean up"); - self.needs_cleanup = true; + self.cleanup_state.needs_cleanup_prepare = true; } _ => (), } @@ -960,11 +1002,11 @@ impl Server { // to avoid leaking state between clients. For performance reasons we only // send `DISCARD ALL` if we think the session is altered instead of just sending // it before each checkin. - if self.needs_cleanup { - warn!("Server returned with session state altered, discarding state"); + if self.cleanup_state.needs_cleanup() { + warn!("Server returned with session state altered, discarding state ({}) for application {}", self.cleanup_state, self.application_name); self.query("DISCARD ALL").await?; self.query("RESET ROLE").await?; - self.needs_cleanup = false; + self.cleanup_state.reset(); } Ok(()) @@ -976,12 +1018,12 @@ impl Server { self.application_name = name.to_string(); // We don't want `SET application_name` to mark the server connection // as needing cleanup - let needs_cleanup_before = self.needs_cleanup; + let needs_cleanup_before = self.cleanup_state; let result = Ok(self .query(&format!("SET application_name = '{}'", name)) .await?); - self.needs_cleanup = needs_cleanup_before; + self.cleanup_state = needs_cleanup_before; result } else { Ok(()) @@ -1006,7 +1048,7 @@ impl Server { // Marks a connection as needing DISCARD ALL at checkin pub fn mark_dirty(&mut self) { - self.needs_cleanup = true; + self.cleanup_state.set_true(); } pub fn mirror_send(&mut self, bytes: &BytesMut) {