diff --git a/http/src/main.rs b/http/src/main.rs index 8b14a10e3..86b58da20 100644 --- a/http/src/main.rs +++ b/http/src/main.rs @@ -147,6 +147,9 @@ fn main() { let api_link_file = home.join("api"); let (addr, server) = serve(&ipfs); + // shutdown future will handle signalling the exit + drop(ipfs); + let api_multiaddr = format!("/ip4/{}/tcp/{}", addr.ip(), addr.port()); // this file is looked for when js-ipfsd-ctl checks optimistically if the IPFS_PATH has a @@ -167,8 +170,6 @@ fn main() { .await .map_err(|e| eprintln!("Failed to truncate {:?}: {}", api_link_file, e)); } - - ipfs.exit_daemon().await; }); } @@ -182,7 +183,10 @@ fn serve( let routes = v0::routes(ipfs, shutdown_tx); let routes = routes.with(warp::log("rust-ipfs-http-v0")); + let ipfs = ipfs.clone(); + warp::serve(routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move { shutdown_rx.next().await; + ipfs.exit_daemon().await; }) } diff --git a/src/lib.rs b/src/lib.rs index df2b3cebe..ed076ca4a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -379,7 +379,7 @@ impl Ipfs { .send(IpfsEvent::Connect(addr, tx)) .await?; let subscription = rx.await?; - subscription.await.map_err(|e| format_err!("{}", e)) + subscription.await?.map_err(|e| format_err!("{}", e)) } pub async fn addrs(&self) -> Result)>, Error> { @@ -491,6 +491,10 @@ impl Ipfs { /// Exit daemon. pub async fn exit_daemon(mut self) { + // FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of + // the background task or stream. After that this could be handled by dropping. + self.repo.shutdown().await; + // ignoring the error because it'd mean that the background task would had already been // dropped let _ = self.to_task.send(IpfsEvent::Exit).await; diff --git a/src/repo/mod.rs b/src/repo/mod.rs index 9441f7cf0..af1e181d9 100644 --- a/src/repo/mod.rs +++ b/src/repo/mod.rs @@ -107,6 +107,12 @@ impl Repo { ) } + /// Shutdowns the repo, cancelling any pending subscriptions; Likely going away after some + /// refactoring, see notes on [`Ipfs::exit_daemon`]. + pub async fn shutdown(&self) { + self.subscriptions.lock().await.shutdown(); + } + pub async fn init(&self) -> Result<(), Error> { let f1 = self.block_store.init(); let f2 = self.data_store.init(); @@ -163,7 +169,7 @@ impl Repo { .send(RepoEvent::WantBlock(cid.clone())) .await .ok(); - Ok(subscription.await) + Ok(subscription.await?) } } diff --git a/src/subscription.rs b/src/subscription.rs index b0de9a490..66ac218e8 100644 --- a/src/subscription.rs +++ b/src/subscription.rs @@ -7,20 +7,37 @@ use std::collections::HashMap; use std::fmt; use std::sync::{Arc, Mutex}; -#[derive(Debug)] pub struct SubscriptionRegistry { subscriptions: HashMap>>>, + cancelled: bool, +} + +impl fmt::Debug for SubscriptionRegistry { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "{}<{}, {}>(subscriptions: {:?})", + std::any::type_name::(), + std::any::type_name::(), + std::any::type_name::(), + self.subscriptions + ) + } } impl SubscriptionRegistry { pub fn new() -> Self { Self { subscriptions: Default::default(), + cancelled: false, } } pub fn create_subscription(&mut self, req: TReq) -> SubscriptionFuture { let subscription = self.subscriptions.entry(req).or_default().clone(); + if self.cancelled { + subscription.lock().unwrap().cancel(); + } SubscriptionFuture { subscription } } @@ -29,6 +46,50 @@ impl SubscriptionRegistry { subscription.lock().unwrap().wake(res); } } + + /// After shutdown all SubscriptionFutures will return Err(Cancelled) + pub fn shutdown(&mut self) { + if self.cancelled { + return; + } + self.cancelled = true; + + log::debug!("Shutting down {:?}", self); + + let mut cancelled = 0; + let mut pending = Vec::new(); + + for (_, sub) in self.subscriptions.drain() { + { + if let Ok(mut sub) = sub.try_lock() { + sub.cancel(); + cancelled += 1; + continue; + } + } + pending.push(sub); + } + + log::trace!( + "Cancelled {} subscriptions and {} are pending (not immediatedly locked)", + cancelled, + pending.len() + ); + + let remaining = pending.len(); + + for sub in pending { + if let Ok(mut sub) = sub.lock() { + sub.cancel(); + } + } + + log::debug!( + "Remaining {} pending subscriptions cancelled (total of {})", + remaining, + cancelled + remaining + ); + } } impl Default for SubscriptionRegistry { @@ -37,10 +98,45 @@ impl Default for SubscriptionRegistry Drop for SubscriptionRegistry { + fn drop(&mut self) { + self.shutdown(); + } +} + +/// Subscription and it's linked SubscriptionFutures were cancelled before completion. +#[derive(Debug, PartialEq, Eq)] +pub struct Cancelled; + +impl fmt::Display for Cancelled { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!(fmt, "{:?}", self) + } +} + +impl std::error::Error for Cancelled {} + pub struct Subscription { result: Option, wakers: Vec, + cancelled: bool, +} + +impl fmt::Debug for Subscription { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + write!( + fmt, + "Subscription<{}>(result: {}, wakers: {}, cancelled: {})", + std::any::type_name::(), + if self.result.is_some() { + "Some(_)" + } else { + "None" + }, + self.wakers.len(), + self.cancelled + ) + } } impl Subscription { @@ -48,6 +144,7 @@ impl Subscription { Self { result: Default::default(), wakers: Default::default(), + cancelled: false, } } @@ -61,6 +158,20 @@ impl Subscription { waker.wake(); } } + + pub fn cancel(&mut self) { + if self.cancelled { + return; + } + self.cancelled = true; + for waker in self.wakers.drain(..) { + waker.wake(); + } + } + + pub fn is_cancelled(&self) -> bool { + self.cancelled + } } impl Subscription { @@ -80,12 +191,16 @@ pub struct SubscriptionFuture { } impl Future for SubscriptionFuture { - type Output = TResult; + type Output = Result; fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll { let mut subscription = self.subscription.lock().unwrap(); + if subscription.is_cancelled() { + return Poll::Ready(Err(Cancelled)); + } + if let Some(result) = subscription.result() { - Poll::Ready(result) + Poll::Ready(Ok(result)) } else { subscription.add_waker(context.waker().clone()); Poll::Pending @@ -97,7 +212,7 @@ impl fmt::Debug for SubscriptionFuture { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!( fmt, - "SubscriptionFuture<{}>", + "SubscriptionFuture>", std::any::type_name::() ) } @@ -113,7 +228,49 @@ mod tests { let s1 = registry.create_subscription(0); let s2 = registry.create_subscription(0); registry.finish_subscription(&0, 10); - assert_eq!(s1.await, 10); - assert_eq!(s2.await, 10); + assert_eq!(s1.await.unwrap(), 10); + assert_eq!(s2.await.unwrap(), 10); + } + + #[async_std::test] + async fn subscription_cancelled_on_dropping_registry() { + let mut registry = SubscriptionRegistry::::new(); + let s1 = registry.create_subscription(0); + drop(registry); + s1.await.unwrap_err(); + } + + #[async_std::test] + async fn subscription_cancelled_on_shutdown() { + let mut registry = SubscriptionRegistry::::new(); + let s1 = registry.create_subscription(0); + registry.shutdown(); + s1.await.unwrap_err(); + } + + #[async_std::test] + async fn new_subscriptions_cancelled_after_shutdown() { + let mut registry = SubscriptionRegistry::::new(); + registry.shutdown(); + let s1 = registry.create_subscription(0); + s1.await.unwrap_err(); + } + + #[async_std::test] + async fn dropping_subscription_future_after_registering() { + use async_std::future::timeout; + use std::time::Duration; + + let mut registry = SubscriptionRegistry::::new(); + let s1 = timeout(Duration::from_millis(1), registry.create_subscription(0)); + let s2 = registry.create_subscription(0); + + // make sure it timeouted but had time to register the waker + drop(s1.await.unwrap_err()); + + // this will cause a call to waker installed by s1, but it shouldn't be a problem. + registry.finish_subscription(&0, 0); + + assert_eq!(s2.await.unwrap(), 0); } }