Skip to content

Fix race condition on wait() #501

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 28 additions & 39 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use parking_lot::Mutex;
use crate::jsonrpc::futures::sync::oneshot;
use crate::jsonrpc::futures::{self, Future, Stream};
use crate::jsonrpc::MetaIoHandler;
use crate::server_utils::reactor::{Executor, UninitializedExecutor};
use crate::server_utils::reactor::{Executor, ExecutorCloseHandle, UninitializedExecutor};
use hyper::{server, Body};
use jsonrpc_core as jsonrpc;

Expand Down Expand Up @@ -386,12 +386,11 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {

let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = self.executor.init_with_name("http.worker0")?;
let req_max_size = self.max_request_body_size;
// The first threads `Executor` is initialised differently from the others
serve(
(shutdown_signal, local_addr_tx, done_tx),
(shutdown_signal, local_addr_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
Expand All @@ -410,10 +409,9 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
.map(|i| {
let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
serve(
(shutdown_signal, local_addr_tx, done_tx),
(shutdown_signal, local_addr_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
Expand All @@ -428,34 +426,34 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
reuse_port,
req_max_size,
);
Ok((eloop, close, local_addr_rx, done_rx))
Ok((eloop, close, local_addr_rx))
})
.collect::<io::Result<Vec<_>>>()?;

// Wait for server initialization
let local_addr = recv_address(local_addr_rx);
// Wait for other threads as well.
let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
let mut handles: Vec<(Executor, oneshot::Sender<()>)> = handles
.into_iter()
.map(|(eloop, close, local_addr_rx, done_rx)| {
.map(|(eloop, close, local_addr_rx)| {
let _ = recv_address(local_addr_rx)?;
Ok((eloop, close, done_rx))
Ok((eloop, close))
})
.collect::<io::Result<(Vec<_>)>>()?;
handles.push((eloop, close, done_rx));
handles.push((eloop, close));

let (executors, done_rxs) = handles
let (inner_handles, executors) = handles
.into_iter()
.fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
acc.0.push((eloop, closer));
acc.1.push(done_rx);
.fold((vec![], vec![]), |mut acc, (eloop, closer)| {
acc.0.push((eloop.close_handle(), closer));
acc.1.push(eloop);
acc
});

Ok(Server {
address: local_addr?,
executors: Arc::new(Mutex::new(Some(executors))),
done: Some(done_rxs),
executors,
close_handle: CloseHandle(Arc::new(Mutex::new(inner_handles))),
})
}
}
Expand All @@ -467,11 +465,7 @@ fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Re
}

fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
signals: (
oneshot::Receiver<()>,
mpsc::Sender<io::Result<SocketAddr>>,
oneshot::Sender<()>,
),
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
executor: tokio::runtime::TaskExecutor,
addr: SocketAddr,
cors_domains: CorsDomains,
Expand All @@ -486,7 +480,7 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
reuse_port: bool,
max_request_body_size: usize,
) {
let (shutdown_signal, local_addr_tx, done_tx) = signals;
let (shutdown_signal, local_addr_tx) = signals;
executor.spawn({
let handle = tokio::reactor::Handle::default();

Expand Down Expand Up @@ -565,7 +559,7 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
}))
.map_err(|_| ())
})
.and_then(|_| done_tx.send(()))
.and_then(|_| futures::future::ok(()))
});
}

Expand All @@ -587,27 +581,24 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {

/// Handle used to close the server. Can be cloned and passed around to different threads and be used
/// to close a server that is `wait()`ing.

#[derive(Clone)]
pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);
#[derive(Debug, Clone)]
pub struct CloseHandle(Arc<Mutex<Vec<(ExecutorCloseHandle, oneshot::Sender<()>)>>>);

impl CloseHandle {
/// Shutdown a running server
pub fn close(self) {
if let Some(executors) = self.0.lock().take() {
for (executor, closer) in executors {
executor.close();
let _ = closer.send(());
}
for (inner_handle, closer) in self.0.lock().drain(..) {
inner_handle.close();
let _ = closer.send(());
}
}
}

/// jsonrpc http server instance
pub struct Server {
address: SocketAddr,
executors: Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>,
done: Option<Vec<oneshot::Receiver<()>>>,
executors: Vec<Executor>,
close_handle: CloseHandle,
}

impl Server {
Expand All @@ -623,22 +614,20 @@ impl Server {

/// Will block, waiting for the server to finish.
pub fn wait(mut self) {
if let Some(receivers) = self.done.take() {
for receiver in receivers {
let _ = receiver.wait();
}
for executor in self.executors.drain(..) {
executor.wait();
}
}

/// Get a handle that allows us to close the server from a different thread and/or while the
/// server is `wait()`ing.
pub fn close_handle(&self) -> CloseHandle {
CloseHandle(self.executors.clone())
self.close_handle.clone()
}
}

impl Drop for Server {
fn drop(&mut self) {
self.close_handle().close();
self.close_handle().close()
}
}
63 changes: 23 additions & 40 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
let outgoing_separator = self.outgoing_separator;
let (stop_signal, stop_receiver) = oneshot::channel();
let (start_signal, start_receiver) = oneshot::channel();
let (wait_signal, wait_receiver) = oneshot::channel();
let security_attributes = self.security_attributes;
let client_buffer_size = self.client_buffer_size;

Expand Down Expand Up @@ -240,23 +239,21 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
.buffer_unordered(1024)
.for_each(|_| Ok(()))
.select(stop)
.map(|_| {
let _ = wait_signal.send(());
})
.map_err(|_| ()),
.map_err(|_| ())
.and_then(|_| future::ok(())),
)
}));

let handle = InnerHandles {
executor: Some(executor),
stop: Some(stop_signal),
let close_handle = CloseHandle {
inner_handle: executor.close_handle(),
stop: Arc::new(Mutex::new(Some(stop_signal))),
path: path.to_owned(),
};

match start_receiver.wait().expect("Message should always be sent") {
Ok(()) => Ok(Server {
handles: Arc::new(Mutex::new(handle)),
wait_handle: Some(wait_receiver),
executor: Some(executor),
close_handle,
}),
Err(e) => Err(e),
}
Expand All @@ -266,61 +263,47 @@ impl<M: Metadata, S: Middleware<M>> ServerBuilder<M, S> {
/// IPC Server handle
#[derive(Debug)]
pub struct Server {
handles: Arc<Mutex<InnerHandles>>,
wait_handle: Option<oneshot::Receiver<()>>,
executor: Option<reactor::Executor>,
close_handle: CloseHandle,
}

impl Server {
/// Closes the server (waits for finish)
/// Closes the server
pub fn close(self) {
self.handles.lock().close();
self.close_handle().close();
}

/// Creates a close handle that can be used to stop the server remotely
pub fn close_handle(&self) -> CloseHandle {
CloseHandle {
inner: self.handles.clone(),
}
self.close_handle.clone()
}

/// Wait for the server to finish
pub fn wait(mut self) {
self.wait_handle.take().map(|wait_receiver| wait_receiver.wait());
}
}

#[derive(Debug)]
struct InnerHandles {
executor: Option<reactor::Executor>,
stop: Option<oneshot::Sender<()>>,
path: String,
}

impl InnerHandles {
pub fn close(&mut self) {
let _ = self.stop.take().map(|stop| stop.send(()));
if let Some(executor) = self.executor.take() {
executor.close()
}
let _ = ::std::fs::remove_file(&self.path); // ignore error, file could have been gone somewhere
self.executor.take().map(|wait_receiver| wait_receiver.wait());
}
}

impl Drop for InnerHandles {
impl Drop for Server {
fn drop(&mut self) {
self.close();
self.close_handle().close();
}
}

/// `CloseHandle` allows one to stop an `IpcServer` remotely.
#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct CloseHandle {
inner: Arc<Mutex<InnerHandles>>,
inner_handle: reactor::ExecutorCloseHandle,
stop: Arc<Mutex<Option<oneshot::Sender<()>>>>,
path: String,
}

impl CloseHandle {
/// `close` closes the corresponding `IpcServer` instance.
pub fn close(self) {
self.inner.lock().close();
let _ = self.stop.lock().take().map(|stop| stop.send(()));
self.inner_handle.close();
let _ = ::std::fs::remove_file(&self.path); // ignore error, file could have been gone somewhere
}
}

Expand Down
1 change: 1 addition & 0 deletions server-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ log = "0.4"
tokio = { version = "0.1.15" }
tokio-codec = { version = "0.1" }
unicase = "2.0"
parking_lot = "0.9"

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
Loading