Skip to content

Commit 0ec79dc

Browse files
committed
feat(server): intro struct ServerFuture and impl Future for it
1. add method `Server::shutdown_signal(F)` which produce ServerFuture 2. impl `Future` for `ServerFuture` 3. update example `multi_server` Note: This commit is a draft, the error handling inside Future::poll is an unsolved problem, which should be discussed.
1 parent 0388020 commit 0ec79dc

File tree

2 files changed

+96
-5
lines changed

2 files changed

+96
-5
lines changed

examples/multi_server.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ fn main() {
7676
println!("Listening on http://{}", srv1.local_addr().unwrap());
7777
println!("Listening on http://{}", srv2.local_addr().unwrap());
7878

79-
handle.spawn(srv1);
80-
handle.spawn(srv2);
79+
handle.spawn(srv1.shutdown_signal(futures::future::empty::<(), ()>()));
80+
handle.spawn(srv2.shutdown_signal(futures::future::empty::<(), ()>()));
8181
core.run(futures::future::empty::<(), ()>()).unwrap();
8282
}

src/server/mod.rs

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,9 @@ use std::net::SocketAddr;
1616
use std::rc::{Rc, Weak};
1717
use std::time::Duration;
1818

19-
use futures::future;
2019
use futures::task::{self, Task};
20+
use futures::future::{self, Select, Map};
2121
use futures::{Future, Stream, Poll, Async, Sink, StartSend, AsyncSink};
22-
use futures::future::Map;
2322

2423
#[cfg(feature = "compat")]
2524
use http;
@@ -88,6 +87,17 @@ where B: Stream<Error=::Error>,
8887
shutdown_timeout: Duration,
8988
}
9089

90+
/// The Future of an Server.
91+
pub struct ServerFuture<F, S, B>
92+
where B: Stream<Error=::Error>,
93+
B::Item: AsRef<[u8]>,
94+
{
95+
server: Server<S, B>,
96+
info: Rc<RefCell<Info>>,
97+
shutdown_signal: F,
98+
shutdown: Option<Select<WaitUntilZero, Timeout>>,
99+
}
100+
91101
impl<B: AsRef<[u8]> + 'static> Http<B> {
92102
/// Creates a new instance of the HTTP protocol, ready to spawn a server or
93103
/// start accepting connections.
@@ -464,6 +474,21 @@ impl<S, B> Server<S, B>
464474
self
465475
}
466476

477+
/// Configure the `shutdown_signal`.
478+
pub fn shutdown_signal<F>(self, signal: F) -> ServerFuture<F, S, B>
479+
where F: Future<Item = (), Error = ()>
480+
{
481+
ServerFuture {
482+
server: self,
483+
info: Rc::new(RefCell::new(Info {
484+
active: 0,
485+
blocker: None,
486+
})),
487+
shutdown_signal: signal,
488+
shutdown: None,
489+
}
490+
}
491+
467492
/// Execute this server infinitely.
468493
///
469494
/// This method does not currently return, but it will return an error if
@@ -578,19 +603,85 @@ impl<S, B> Future for Server<S, B>
578603
}
579604
}
580605

606+
impl<F, S, B> Future for ServerFuture<F, S, B>
607+
where F: Future<Item = (), Error = ()>,
608+
S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
609+
B: Stream<Error=::Error> + 'static,
610+
B::Item: AsRef<[u8]>,
611+
{
612+
type Item = ();
613+
type Error = ();
614+
615+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
616+
loop {
617+
if let Some(ref mut shutdown) = self.shutdown {
618+
match shutdown.poll() {
619+
Ok(Async::Ready(_)) => return Ok(Async::Ready(())),
620+
Ok(Async::NotReady) => return Ok(Async::NotReady),
621+
Err((e, _)) => debug!("internal error: {:?}", e),
622+
}
623+
} else if let Ok(Async::Ready(())) = self.shutdown_signal.poll() {
624+
match Timeout::new(self.server.shutdown_timeout, &self.server.handle()) {
625+
Ok(timeout) => {
626+
let wait = WaitUntilZero { info: self.info.clone() };
627+
self.shutdown = Some(wait.select(timeout))
628+
},
629+
Err(e) => debug!("internal error: {:?}", e),
630+
}
631+
} else {
632+
match self.server.listener.accept() {
633+
Ok((socket, addr)) => {
634+
match self.server.new_service.new_service() {
635+
Ok(inner_srv) => {
636+
let srv = NotifyService {
637+
inner: inner_srv,
638+
info: Rc::downgrade(&self.info),
639+
};
640+
self.info.borrow_mut().active += 1;
641+
self.server.protocol.bind_connection(&self.server.handle(),
642+
socket,
643+
addr,
644+
srv)
645+
},
646+
Err(e) => debug!("internal error: {:?}", e),
647+
}
648+
},
649+
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
650+
Err(e) => debug!("internal error: {:?}", e),
651+
}
652+
}
653+
}
654+
}
655+
}
656+
657+
581658
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
582659
where B::Item: AsRef<[u8]>
583660
{
584661
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
585662
f.debug_struct("Server")
586-
.field("core", &"...")
663+
.field("reactor", &"...")
587664
.field("listener", &self.listener)
588665
.field("new_service", &self.new_service)
589666
.field("protocol", &self.protocol)
590667
.finish()
591668
}
592669
}
593670

671+
impl <F, S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for ServerFuture<F, S, B>
672+
where B::Item: AsRef<[u8]>,
673+
F: Future<Item = (), Error = ()>
674+
{
675+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
676+
f.debug_struct("ServerFuture")
677+
.field("server", &self.server)
678+
.field("info", &"...")
679+
.field("shutdown_signal", &"...")
680+
.field("shutdown", &"...")
681+
.finish()
682+
}
683+
}
684+
594685
struct NotifyService<S> {
595686
inner: S,
596687
info: Weak<RefCell<Info>>,

0 commit comments

Comments
 (0)