Skip to content
This repository was archived by the owner on Oct 23, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ fn main() {
let api_link_file = home.join("api");
let (addr, server) = serve(&ipfs);

// shutdown future will handle signalling the exit
drop(ipfs);

let api_multiaddr = format!("/ip4/{}/tcp/{}", addr.ip(), addr.port());

// this file is looked for when js-ipfsd-ctl checks optimistically if the IPFS_PATH has a
Expand All @@ -167,8 +170,6 @@ fn main() {
.await
.map_err(|e| eprintln!("Failed to truncate {:?}: {}", api_link_file, e));
}

ipfs.exit_daemon().await;
});
}

Expand All @@ -182,7 +183,10 @@ fn serve<Types: IpfsTypes>(
let routes = v0::routes(ipfs, shutdown_tx);
let routes = routes.with(warp::log("rust-ipfs-http-v0"));

let ipfs = ipfs.clone();

warp::serve(routes).bind_with_graceful_shutdown(([127, 0, 0, 1], 0), async move {
shutdown_rx.next().await;
ipfs.exit_daemon().await;
})
}
6 changes: 5 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ impl<Types: IpfsTypes> Ipfs<Types> {
.send(IpfsEvent::Connect(addr, tx))
.await?;
let subscription = rx.await?;
subscription.await.map_err(|e| format_err!("{}", e))
subscription.await?.map_err(|e| format_err!("{}", e))
}

pub async fn addrs(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>, Error> {
Expand Down Expand Up @@ -491,6 +491,10 @@ impl<Types: IpfsTypes> Ipfs<Types> {

/// Exit daemon.
pub async fn exit_daemon(mut self) {
// FIXME: this is a stopgap measure needed while repo is part of the struct Ipfs instead of
// the background task or stream. After that this could be handled by dropping.
self.repo.shutdown().await;

// ignoring the error because it'd mean that the background task would had already been
// dropped
let _ = self.to_task.send(IpfsEvent::Exit).await;
Expand Down
8 changes: 7 additions & 1 deletion src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
)
}

/// Shutdowns the repo, cancelling any pending subscriptions; Likely going away after some
/// refactoring, see notes on [`Ipfs::exit_daemon`].
pub async fn shutdown(&self) {
self.subscriptions.lock().await.shutdown();
}

pub async fn init(&self) -> Result<(), Error> {
let f1 = self.block_store.init();
let f2 = self.data_store.init();
Expand Down Expand Up @@ -163,7 +169,7 @@ impl<TRepoTypes: RepoTypes> Repo<TRepoTypes> {
.send(RepoEvent::WantBlock(cid.clone()))
.await
.ok();
Ok(subscription.await)
Ok(subscription.await?)
}
}

Expand Down
171 changes: 164 additions & 7 deletions src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,37 @@ use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};

#[derive(Debug)]
pub struct SubscriptionRegistry<TReq: Debug + Eq + Hash, TRes: Debug> {
subscriptions: HashMap<TReq, Arc<Mutex<Subscription<TRes>>>>,
cancelled: bool,
}

impl<TReq: Debug + Eq + Hash, TRes: Debug> fmt::Debug for SubscriptionRegistry<TReq, TRes> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"{}<{}, {}>(subscriptions: {:?})",
std::any::type_name::<Self>(),
std::any::type_name::<TReq>(),
std::any::type_name::<TRes>(),
self.subscriptions
)
}
}

impl<TReq: Debug + Eq + Hash, TRes: Debug> SubscriptionRegistry<TReq, TRes> {
pub fn new() -> Self {
Self {
subscriptions: Default::default(),
cancelled: false,
}
}

pub fn create_subscription(&mut self, req: TReq) -> SubscriptionFuture<TRes> {
let subscription = self.subscriptions.entry(req).or_default().clone();
if self.cancelled {
subscription.lock().unwrap().cancel();
}
SubscriptionFuture { subscription }
}

Expand All @@ -29,6 +46,50 @@ impl<TReq: Debug + Eq + Hash, TRes: Debug> SubscriptionRegistry<TReq, TRes> {
subscription.lock().unwrap().wake(res);
}
}

/// After shutdown all SubscriptionFutures will return Err(Cancelled)
pub fn shutdown(&mut self) {
if self.cancelled {
return;
}
self.cancelled = true;

log::debug!("Shutting down {:?}", self);

let mut cancelled = 0;
let mut pending = Vec::new();

for (_, sub) in self.subscriptions.drain() {
{
if let Ok(mut sub) = sub.try_lock() {
sub.cancel();
cancelled += 1;
continue;
}
}
pending.push(sub);
}

log::trace!(
"Cancelled {} subscriptions and {} are pending (not immediatedly locked)",
cancelled,
pending.len()
);

let remaining = pending.len();

for sub in pending {
if let Ok(mut sub) = sub.lock() {
sub.cancel();
}
}

log::debug!(
"Remaining {} pending subscriptions cancelled (total of {})",
remaining,
cancelled + remaining
);
}
}

impl<TReq: Debug + Eq + Hash, TRes: Debug> Default for SubscriptionRegistry<TReq, TRes> {
Expand All @@ -37,17 +98,53 @@ impl<TReq: Debug + Eq + Hash, TRes: Debug> Default for SubscriptionRegistry<TReq
}
}

#[derive(Debug)]
impl<TReq: Debug + Eq + Hash, TRes: Debug> Drop for SubscriptionRegistry<TReq, TRes> {
fn drop(&mut self) {
self.shutdown();
}
}

/// Subscription and it's linked SubscriptionFutures were cancelled before completion.
#[derive(Debug, PartialEq, Eq)]
pub struct Cancelled;

impl fmt::Display for Cancelled {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "{:?}", self)
}
}

impl std::error::Error for Cancelled {}

pub struct Subscription<TResult> {
result: Option<TResult>,
wakers: Vec<Waker>,
cancelled: bool,
}

impl<TResult> fmt::Debug for Subscription<TResult> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"Subscription<{}>(result: {}, wakers: {}, cancelled: {})",
std::any::type_name::<TResult>(),
if self.result.is_some() {
"Some(_)"
} else {
"None"
},
self.wakers.len(),
self.cancelled
)
}
}

impl<TResult> Subscription<TResult> {
pub fn new() -> Self {
Self {
result: Default::default(),
wakers: Default::default(),
cancelled: false,
}
}

Expand All @@ -61,6 +158,20 @@ impl<TResult> Subscription<TResult> {
waker.wake();
}
}

pub fn cancel(&mut self) {
if self.cancelled {
return;
}
self.cancelled = true;
for waker in self.wakers.drain(..) {
waker.wake();
}
}

pub fn is_cancelled(&self) -> bool {
self.cancelled
}
}

impl<TResult: Clone> Subscription<TResult> {
Expand All @@ -80,12 +191,16 @@ pub struct SubscriptionFuture<TResult> {
}

impl<TResult: Clone> Future for SubscriptionFuture<TResult> {
type Output = TResult;
type Output = Result<TResult, Cancelled>;

fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
let mut subscription = self.subscription.lock().unwrap();
if subscription.is_cancelled() {
return Poll::Ready(Err(Cancelled));
}

if let Some(result) = subscription.result() {
Poll::Ready(result)
Poll::Ready(Ok(result))
} else {
subscription.add_waker(context.waker().clone());
Poll::Pending
Expand All @@ -97,7 +212,7 @@ impl<TResult> fmt::Debug for SubscriptionFuture<TResult> {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(
fmt,
"SubscriptionFuture<{}>",
"SubscriptionFuture<Output = Result<{}, Cancelled>>",
std::any::type_name::<TResult>()
)
}
Expand All @@ -113,7 +228,49 @@ mod tests {
let s1 = registry.create_subscription(0);
let s2 = registry.create_subscription(0);
registry.finish_subscription(&0, 10);
assert_eq!(s1.await, 10);
assert_eq!(s2.await, 10);
assert_eq!(s1.await.unwrap(), 10);
assert_eq!(s2.await.unwrap(), 10);
}

#[async_std::test]
async fn subscription_cancelled_on_dropping_registry() {
let mut registry = SubscriptionRegistry::<u32, u32>::new();
let s1 = registry.create_subscription(0);
drop(registry);
s1.await.unwrap_err();
}

#[async_std::test]
async fn subscription_cancelled_on_shutdown() {
let mut registry = SubscriptionRegistry::<u32, u32>::new();
let s1 = registry.create_subscription(0);
registry.shutdown();
s1.await.unwrap_err();
}

#[async_std::test]
async fn new_subscriptions_cancelled_after_shutdown() {
let mut registry = SubscriptionRegistry::<u32, u32>::new();
registry.shutdown();
let s1 = registry.create_subscription(0);
s1.await.unwrap_err();
}

#[async_std::test]
async fn dropping_subscription_future_after_registering() {
use async_std::future::timeout;
use std::time::Duration;

let mut registry = SubscriptionRegistry::<u32, u32>::new();
let s1 = timeout(Duration::from_millis(1), registry.create_subscription(0));
let s2 = registry.create_subscription(0);

// make sure it timeouted but had time to register the waker
drop(s1.await.unwrap_err());

// this will cause a call to waker installed by s1, but it shouldn't be a problem.
registry.finish_subscription(&0, 0);

assert_eq!(s2.await.unwrap(), 0);
}
}