From caf94bf7e53906778ad9a3df7c2ae08b891960e0 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Fri, 26 Jan 2024 12:07:27 +0100 Subject: [PATCH 01/12] Simplify make_pooled() use and signature --- bb8/src/inner.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index ed13264..e57571d 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -83,8 +83,7 @@ where } pub(crate) async fn get(&self) -> Result, RunError> { - self.make_pooled(|this, conn| PooledConnection::new(this, conn)) - .await + self.make_pooled(PooledConnection::new).await } pub(crate) async fn get_owned( @@ -99,13 +98,10 @@ where .await } - pub(crate) async fn make_pooled<'a, 'b, F>( + pub(crate) async fn make_pooled<'a, 'b>( &'a self, - make_pooled_conn: F, - ) -> Result, RunError> - where - F: Fn(&'a Self, Conn) -> PooledConnection<'b, M>, - { + make_pooled_conn: impl Fn(&'a Self, Conn) -> PooledConnection<'b, M>, + ) -> Result, RunError> { loop { let mut conn = { let mut locked = self.inner.internals.lock(); From 4f920637413e8a760e907f59886aa72e9586482b Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Fri, 26 Jan 2024 12:11:51 +0100 Subject: [PATCH 02/12] Hoist get() timeouts one level up --- bb8/src/inner.rs | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index e57571d..759be1a 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -83,19 +83,27 @@ where } pub(crate) async fn get(&self) -> Result, RunError> { - self.make_pooled(PooledConnection::new).await + let future = self.make_pooled(PooledConnection::new); + match timeout(self.inner.statics.connection_timeout, future).await { + Ok(result) => result, + _ => Err(RunError::TimedOut), + } } pub(crate) async fn get_owned( &self, ) -> Result, RunError> { - self.make_pooled(|this, conn| { + let future = self.make_pooled(|this, conn| { let pool = PoolInner { inner: Arc::clone(&this.inner), }; PooledConnection::new_owned(pool, conn) - }) - .await + }); + + match timeout(self.inner.statics.connection_timeout, future).await { + Ok(result) => result, + _ => Err(RunError::TimedOut), + } } pub(crate) async fn make_pooled<'a, 'b>( @@ -135,10 +143,10 @@ where self.spawn_replenishing_approvals(approvals); }; - match timeout(self.inner.statics.connection_timeout, rx).await { - Ok(Ok(Ok(mut guard))) => Ok(make_pooled_conn(self, guard.extract())), - Ok(Ok(Err(e))) => Err(RunError::User(e)), - _ => Err(RunError::TimedOut), + match rx.await { + Ok(Ok(mut guard)) => Ok(make_pooled_conn(self, guard.extract())), + Ok(Err(e)) => Err(RunError::User(e)), + Err(_) => Err(RunError::TimedOut), } } From 96e09c5504db5341e1bf2fe334352ee9c90500e4 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Fri, 26 Jan 2024 12:45:44 +0100 Subject: [PATCH 03/12] Move pop() up into SharedPool --- bb8/src/inner.rs | 13 +++++-------- bb8/src/internals.rs | 15 ++++++--------- 2 files changed, 11 insertions(+), 17 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 759be1a..ad93cf4 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -111,15 +111,12 @@ where make_pooled_conn: impl Fn(&'a Self, Conn) -> PooledConnection<'b, M>, ) -> Result, RunError> { loop { - let mut conn = { - let mut locked = self.inner.internals.lock(); - match locked.pop(&self.inner.statics) { - Some((conn, approvals)) => { - self.spawn_replenishing_approvals(approvals); - make_pooled_conn(self, conn) - } - None => break, + let mut conn = match self.inner.pop() { + Some((conn, approvals)) => { + self.spawn_replenishing_approvals(approvals); + make_pooled_conn(self, conn) } + None => break, }; if !self.inner.statics.test_on_check_out { diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 0ed52ac..918ab65 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -31,6 +31,12 @@ where } } + pub(crate) fn pop(&self) -> Option<(Conn, ApprovalIter)> { + let mut locked = self.internals.lock(); + let idle = locked.conns.pop_front()?; + Some((idle.conn, locked.wanted(&self.statics))) + } + pub(crate) fn forward_error(&self, mut err: M::Error) { let mut locked = self.internals.lock(); while let Some(waiter) = locked.waiters.pop_front() { @@ -60,15 +66,6 @@ impl PoolInternals where M: ManageConnection, { - pub(crate) fn pop( - &mut self, - config: &Builder, - ) -> Option<(Conn, ApprovalIter)> { - self.conns - .pop_front() - .map(|idle| (idle.conn, self.wanted(config))) - } - pub(crate) fn put( &mut self, conn: Conn, From bcb652b144552d57386ce5cb9c8c95acffce1f28 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Fri, 26 Jan 2024 12:42:27 +0100 Subject: [PATCH 04/12] Use Notify to coordinate waiters --- bb8/Cargo.toml | 2 +- bb8/src/inner.rs | 26 ++++---------- bb8/src/internals.rs | 83 +++++++++----------------------------------- bb8/tests/test.rs | 2 +- 4 files changed, 25 insertions(+), 88 deletions(-) diff --git a/bb8/Cargo.toml b/bb8/Cargo.toml index 34b1964..2ad049b 100644 --- a/bb8/Cargo.toml +++ b/bb8/Cargo.toml @@ -14,7 +14,7 @@ async-trait = "0.1" futures-channel = "0.3.2" futures-util = { version = "0.3.2", default-features = false, features = ["channel"] } parking_lot = { version = "0.12", optional = true } -tokio = { version = "1.0", features = ["rt", "time"] } +tokio = { version = "1.0", features = ["rt", "sync", "time"] } [dev-dependencies] tokio = { version = "1.0", features = ["macros"] } diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index ad93cf4..5842324 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -4,7 +4,6 @@ use std::future::Future; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; -use futures_channel::oneshot; use futures_util::stream::{FuturesUnordered, StreamExt}; use futures_util::TryFutureExt; use tokio::spawn; @@ -111,12 +110,14 @@ where make_pooled_conn: impl Fn(&'a Self, Conn) -> PooledConnection<'b, M>, ) -> Result, RunError> { loop { - let mut conn = match self.inner.pop() { - Some((conn, approvals)) => { - self.spawn_replenishing_approvals(approvals); - make_pooled_conn(self, conn) + let (conn, approvals) = self.inner.pop(); + self.spawn_replenishing_approvals(approvals); + let mut conn = match conn { + Some(conn) => make_pooled_conn(self, conn), + None => { + self.inner.notify.notified().await; + continue; } - None => break, }; if !self.inner.statics.test_on_check_out { @@ -132,19 +133,6 @@ where } } } - - let (tx, rx) = oneshot::channel(); - { - let mut locked = self.inner.internals.lock(); - let approvals = locked.push_waiter(tx, &self.inner.statics); - self.spawn_replenishing_approvals(approvals); - }; - - match rx.await { - Ok(Ok(mut guard)) => Ok(make_pooled_conn(self, guard.extract())), - Ok(Err(e)) => Err(RunError::User(e)), - Err(_) => Err(RunError::TimedOut), - } } pub(crate) async fn connect(&self) -> Result { diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 918ab65..8d6585c 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::time::Instant; use crate::{api::QueueStrategy, lock::Mutex}; -use futures_channel::oneshot; +use tokio::sync::Notify; use crate::api::{Builder, ManageConnection}; use std::collections::VecDeque; @@ -17,6 +17,7 @@ where pub(crate) statics: Builder, pub(crate) manager: M, pub(crate) internals: Mutex>, + pub(crate) notify: Arc, } impl SharedPool @@ -28,24 +29,22 @@ where statics, manager, internals: Mutex::new(PoolInternals::default()), + notify: Arc::new(Notify::new()), } } - pub(crate) fn pop(&self) -> Option<(Conn, ApprovalIter)> { + pub(crate) fn pop(&self) -> (Option>, ApprovalIter) { let mut locked = self.internals.lock(); - let idle = locked.conns.pop_front()?; - Some((idle.conn, locked.wanted(&self.statics))) + let conn = locked.conns.pop_front().map(|idle| idle.conn); + let approvals = match &conn { + Some(_) => locked.wanted(&self.statics), + None => locked.approvals(&self.statics, 1), + }; + + (conn, approvals) } - pub(crate) fn forward_error(&self, mut err: M::Error) { - let mut locked = self.internals.lock(); - while let Some(waiter) = locked.waiters.pop_front() { - match waiter.send(Err(err)) { - Ok(_) => return, - Err(Err(e)) => err = e, - Err(Ok(_)) => unreachable!(), - } - } + pub(crate) fn forward_error(&self, err: M::Error) { self.statics.error_sink.sink(err); } } @@ -56,7 +55,6 @@ pub(crate) struct PoolInternals where M: ManageConnection, { - waiters: VecDeque, M::Error>>>, conns: VecDeque>, num_conns: u32, pending_conns: u32, @@ -77,26 +75,14 @@ where self.num_conns += 1; } - let queue_strategy = pool.statics.queue_strategy; - - let mut guard = InternalsGuard::new(conn, pool); - while let Some(waiter) = self.waiters.pop_front() { - // This connection is no longer idle, send it back out - match waiter.send(Ok(guard)) { - Ok(()) => return, - Err(Ok(g)) => { - guard = g; - } - Err(Err(_)) => unreachable!(), - } - } - // Queue it in the idle queue - let conn = IdleConn::from(guard.conn.take().unwrap()); - match queue_strategy { + let conn = IdleConn::from(conn); + match pool.statics.queue_strategy { QueueStrategy::Fifo => self.conns.push_back(conn), QueueStrategy::Lifo => self.conns.push_front(conn), } + + pool.notify.notify_one(); } pub(crate) fn connect_failed(&mut self, _: Approval) { @@ -120,15 +106,6 @@ where self.approvals(config, wanted) } - pub(crate) fn push_waiter( - &mut self, - waiter: oneshot::Sender, M::Error>>, - config: &Builder, - ) -> ApprovalIter { - self.waiters.push_back(waiter); - self.approvals(config, 1) - } - fn approvals(&mut self, config: &Builder, num: u32) -> ApprovalIter { let current = self.num_conns + self.pending_conns; let allowed = if current < config.max_size { @@ -174,7 +151,6 @@ where { fn default() -> Self { Self { - waiters: VecDeque::new(), conns: VecDeque::new(), num_conns: 0, pending_conns: 0, @@ -182,33 +158,6 @@ where } } -pub(crate) struct InternalsGuard { - conn: Option>, - pool: Arc>, -} - -impl InternalsGuard { - fn new(conn: Conn, pool: Arc>) -> Self { - Self { - conn: Some(conn), - pool, - } - } - - pub(crate) fn extract(&mut self) -> Conn { - self.conn.take().unwrap() // safe: can only be `None` after `Drop` - } -} - -impl Drop for InternalsGuard { - fn drop(&mut self) { - if let Some(conn) = self.conn.take() { - let mut locked = self.pool.internals.lock(); - locked.put(conn, None, self.pool.clone()); - } - } -} - #[must_use] pub(crate) struct ApprovalIter { num: usize, diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 70710a2..69853ed 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -282,7 +282,7 @@ async fn test_lazy_initialization_failure_no_retry() { .build_unchecked(manager); let res = pool.get().await; - assert_eq!(res.unwrap_err(), RunError::User(Error)); + assert_eq!(res.unwrap_err(), RunError::TimedOut); } #[tokio::test] From 26ae04bc6585482709171d44590c4902317a6024 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Fri, 26 Jan 2024 12:50:41 +0100 Subject: [PATCH 05/12] Add tests for deadlock --- bb8/tests/test.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/bb8/tests/test.rs b/bb8/tests/test.rs index 69853ed..43e233d 100644 --- a/bb8/tests/test.rs +++ b/bb8/tests/test.rs @@ -317,6 +317,55 @@ async fn test_get_timeout() { ready(r).await.unwrap(); } +#[tokio::test] +async fn test_lots_of_waiters() { + let pool = Pool::builder() + .max_size(3) + .connection_timeout(Duration::from_millis(5_000)) + .build(OkManager::::new()) + .await + .unwrap(); + + let mut waiters: Vec> = Vec::new(); + + for _ in 0..25000 { + let pool = pool.clone(); + let (tx, rx) = oneshot::channel(); + waiters.push(rx); + tokio::spawn(async move { + let _conn = pool.get().await.unwrap(); + tx.send(()).unwrap(); + }); + } + + let results = futures_util::future::join_all(&mut waiters).await; + + for result in results { + assert!(result.is_ok()); + } +} + +#[tokio::test] +async fn test_timeout_caller() { + let pool = Pool::builder() + .max_size(1) + .connection_timeout(Duration::from_millis(5_000)) + .build(OkManager::::new()) + .await + .unwrap(); + + let one = pool.get().await; + assert!(one.is_ok()); + + let res = tokio::time::timeout(Duration::from_millis(100), pool.get()).await; + assert!(res.is_err()); + + drop(one); + + let two = pool.get().await; + assert!(two.is_ok()); +} + #[tokio::test] async fn test_now_invalid() { static INVALID: AtomicBool = AtomicBool::new(false); From a3ae57c2eec04977823eb2c36d9d4bb0de0c0cb2 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 28 Jan 2024 10:05:45 +0100 Subject: [PATCH 06/12] Remove PoolInner::get_owned() --- bb8/src/api.rs | 17 ++++++----------- bb8/src/inner.rs | 16 ---------------- 2 files changed, 6 insertions(+), 27 deletions(-) diff --git a/bb8/src/api.rs b/bb8/src/api.rs index 548fd04..9ecad68 100644 --- a/bb8/src/api.rs +++ b/bb8/src/api.rs @@ -60,7 +60,10 @@ impl Pool { /// Using an owning `PooledConnection` makes it easier to leak the connection pool. Therefore, [`Pool::get`] /// (which stores a lifetime-bound reference to the pool) should be preferred whenever possible. pub async fn get_owned(&self) -> Result, RunError> { - self.inner.get_owned().await + Ok(PooledConnection { + conn: self.get().await?.take(), + pool: Cow::Owned(self.inner.clone()), + }) } /// Get a new dedicated connection that will not be managed by the pool. @@ -385,17 +388,9 @@ where pub(crate) fn drop_invalid(mut self) { let _ = self.conn.take(); } -} -impl PooledConnection<'static, M> -where - M: ManageConnection, -{ - pub(crate) fn new_owned(pool: PoolInner, conn: Conn) -> Self { - Self { - pool: Cow::Owned(pool), - conn: Some(conn), - } + pub(crate) fn take(&mut self) -> Option> { + self.conn.take() } } diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 5842324..0c3bbb6 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -89,22 +89,6 @@ where } } - pub(crate) async fn get_owned( - &self, - ) -> Result, RunError> { - let future = self.make_pooled(|this, conn| { - let pool = PoolInner { - inner: Arc::clone(&this.inner), - }; - PooledConnection::new_owned(pool, conn) - }); - - match timeout(self.inner.statics.connection_timeout, future).await { - Ok(result) => result, - _ => Err(RunError::TimedOut), - } - } - pub(crate) async fn make_pooled<'a, 'b>( &'a self, make_pooled_conn: impl Fn(&'a Self, Conn) -> PooledConnection<'b, M>, From ba97724e1204c507e5f9555e81523945bc56c6a2 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 28 Jan 2024 10:17:08 +0100 Subject: [PATCH 07/12] Inline make_pooled() into get() --- bb8/src/inner.rs | 54 ++++++++++++++++++++++-------------------------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 0c3bbb6..5922073 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -82,40 +82,36 @@ where } pub(crate) async fn get(&self) -> Result, RunError> { - let future = self.make_pooled(PooledConnection::new); - match timeout(self.inner.statics.connection_timeout, future).await { - Ok(result) => result, - _ => Err(RunError::TimedOut), - } - } + let future = async { + loop { + let (conn, approvals) = self.inner.pop(); + self.spawn_replenishing_approvals(approvals); + let mut conn = match conn { + Some(conn) => PooledConnection::new(self, conn), + None => { + self.inner.notify.notified().await; + continue; + } + }; - pub(crate) async fn make_pooled<'a, 'b>( - &'a self, - make_pooled_conn: impl Fn(&'a Self, Conn) -> PooledConnection<'b, M>, - ) -> Result, RunError> { - loop { - let (conn, approvals) = self.inner.pop(); - self.spawn_replenishing_approvals(approvals); - let mut conn = match conn { - Some(conn) => make_pooled_conn(self, conn), - None => { - self.inner.notify.notified().await; - continue; + if !self.inner.statics.test_on_check_out { + return Ok(conn); } - }; - if !self.inner.statics.test_on_check_out { - return Ok(conn); - } - - match self.inner.manager.is_valid(&mut conn).await { - Ok(()) => return Ok(conn), - Err(e) => { - self.inner.forward_error(e); - conn.drop_invalid(); - continue; + match self.inner.manager.is_valid(&mut conn).await { + Ok(()) => return Ok(conn), + Err(e) => { + self.inner.forward_error(e); + conn.drop_invalid(); + continue; + } } } + }; + + match timeout(self.inner.statics.connection_timeout, future).await { + Ok(result) => result, + _ => Err(RunError::TimedOut), } } From 87d66b162382ed4b80aa5842dce94023ee7d17ce Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 28 Jan 2024 10:40:35 +0100 Subject: [PATCH 08/12] Simplify reaper setup --- bb8/src/inner.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 5922073..8167ba9 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -27,12 +27,9 @@ where let inner = Arc::new(SharedPool::new(builder, manager)); if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() { - let s = Arc::downgrade(&inner); - if let Some(shared) = s.upgrade() { - let start = Instant::now() + shared.statics.reaper_rate; - let interval = interval_at(start.into(), shared.statics.reaper_rate); - schedule_reaping(interval, s); - } + let start = Instant::now() + inner.statics.reaper_rate; + let interval = interval_at(start.into(), inner.statics.reaper_rate); + schedule_reaping(interval, Arc::downgrade(&inner)); } Self { inner } From c388c25882146ecb28e3e091c44ebddadb5126dd Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 28 Jan 2024 10:43:53 +0100 Subject: [PATCH 09/12] Rewrite Reaper as a struct type --- bb8/src/inner.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 8167ba9..5dc2a23 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -29,7 +29,13 @@ where if inner.statics.max_lifetime.is_some() || inner.statics.idle_timeout.is_some() { let start = Instant::now() + inner.statics.reaper_rate; let interval = interval_at(start.into(), inner.statics.reaper_rate); - schedule_reaping(interval, Arc::downgrade(&inner)); + tokio::spawn( + Reaper { + interval, + pool: Arc::downgrade(&inner), + } + .run(), + ); } Self { inner } @@ -223,18 +229,20 @@ where } } -fn schedule_reaping(mut interval: Interval, weak_shared: Weak>) -where - M: ManageConnection, -{ - spawn(async move { +struct Reaper { + interval: Interval, + pool: Weak>, +} + +impl Reaper { + async fn run(mut self) { loop { - let _ = interval.tick().await; - if let Some(inner) = weak_shared.upgrade() { + let _ = self.interval.tick().await; + if let Some(inner) = self.pool.upgrade() { PoolInner { inner }.reap(); } else { break; } } - }); + } } From 4dad65bfecfa0c1e282f19f5d04564a37fdfa489 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 28 Jan 2024 10:47:49 +0100 Subject: [PATCH 10/12] Inline PoolInner::reap() --- bb8/src/inner.rs | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index 5dc2a23..c0478ab 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -149,12 +149,6 @@ where self.inner.internals.lock().state() } - fn reap(&self) { - let mut internals = self.inner.internals.lock(); - let approvals = internals.reap(&self.inner.statics); - self.spawn_replenishing_approvals(approvals); - } - // Outside of Pool to avoid borrow splitting issues on self async fn add_connection(&self, approval: Approval) -> Result<(), M::Error> where @@ -238,11 +232,16 @@ impl Reaper { async fn run(mut self) { loop { let _ = self.interval.tick().await; - if let Some(inner) = self.pool.upgrade() { - PoolInner { inner }.reap(); - } else { - break; - } + let pool = match self.pool.upgrade() { + Some(inner) => PoolInner { inner }, + None => break, + }; + + let mut locked = pool.inner.internals.lock(); + let approvals = locked.reap(&pool.inner.statics); + drop(locked); + + pool.spawn_replenishing_approvals(approvals); } } } From 96434a76caec292ee688850983917e60fd5ffd49 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Sun, 28 Jan 2024 10:51:43 +0100 Subject: [PATCH 11/12] Move reap() logic into SharedPool --- bb8/src/inner.rs | 5 +---- bb8/src/internals.rs | 5 +++++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/bb8/src/inner.rs b/bb8/src/inner.rs index c0478ab..7f22516 100644 --- a/bb8/src/inner.rs +++ b/bb8/src/inner.rs @@ -237,10 +237,7 @@ impl Reaper { None => break, }; - let mut locked = pool.inner.internals.lock(); - let approvals = locked.reap(&pool.inner.statics); - drop(locked); - + let approvals = pool.inner.reap(); pool.spawn_replenishing_approvals(approvals); } } diff --git a/bb8/src/internals.rs b/bb8/src/internals.rs index 8d6585c..9794453 100644 --- a/bb8/src/internals.rs +++ b/bb8/src/internals.rs @@ -44,6 +44,11 @@ where (conn, approvals) } + pub(crate) fn reap(&self) -> ApprovalIter { + let mut locked = self.internals.lock(); + locked.reap(&self.statics) + } + pub(crate) fn forward_error(&self, err: M::Error) { self.statics.error_sink.sink(err); } From 7f379146691d224a989f5f10e1df99dcf23112b9 Mon Sep 17 00:00:00 2001 From: Dirkjan Ochtman Date: Fri, 26 Jan 2024 14:46:03 +0100 Subject: [PATCH 12/12] Bump version number to 0.8.2 --- bb8/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bb8/Cargo.toml b/bb8/Cargo.toml index 2ad049b..ce370ef 100644 --- a/bb8/Cargo.toml +++ b/bb8/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "bb8" -version = "0.8.1" +version = "0.8.2" edition = "2021" rust-version = "1.63" description = "Full-featured async (tokio-based) connection pool (like r2d2)"