Skip to content

Commit 0388020

Browse files
committed
feat(server): allow creating Server with shared Handle
1. impl Future for Server [WIP] 2. add method bind_handle to Http 3. add an example to use shared Handle in multiple server
1 parent 528afb8 commit 0388020

File tree

2 files changed

+168
-4
lines changed

2 files changed

+168
-4
lines changed

examples/multi_server.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
#![deny(warnings)]
2+
extern crate hyper;
3+
extern crate futures;
4+
extern crate tokio_core;
5+
extern crate pretty_env_logger;
6+
7+
use futures::future::FutureResult;
8+
9+
use hyper::{Get, StatusCode};
10+
use tokio_core::reactor::Core;
11+
use hyper::header::ContentLength;
12+
use hyper::server::{Http, Service, Request, Response};
13+
14+
static INDEX1: &'static [u8] = b"The 1st service!";
15+
static INDEX2: &'static [u8] = b"The 2nd service!";
16+
17+
struct Service1;
18+
struct Service2;
19+
20+
impl Service for Service1 {
21+
type Request = Request;
22+
type Response = Response;
23+
type Error = hyper::Error;
24+
type Future = FutureResult<Response, hyper::Error>;
25+
26+
fn call(&self, req: Request) -> Self::Future {
27+
futures::future::ok(match (req.method(), req.path()) {
28+
(&Get, "/") => {
29+
Response::new()
30+
.with_header(ContentLength(INDEX1.len() as u64))
31+
.with_body(INDEX1)
32+
},
33+
_ => {
34+
Response::new()
35+
.with_status(StatusCode::NotFound)
36+
}
37+
})
38+
}
39+
40+
}
41+
42+
impl Service for Service2 {
43+
type Request = Request;
44+
type Response = Response;
45+
type Error = hyper::Error;
46+
type Future = FutureResult<Response, hyper::Error>;
47+
48+
fn call(&self, req: Request) -> Self::Future {
49+
futures::future::ok(match (req.method(), req.path()) {
50+
(&Get, "/") => {
51+
Response::new()
52+
.with_header(ContentLength(INDEX2.len() as u64))
53+
.with_body(INDEX2)
54+
},
55+
_ => {
56+
Response::new()
57+
.with_status(StatusCode::NotFound)
58+
}
59+
})
60+
}
61+
62+
}
63+
64+
65+
fn main() {
66+
pretty_env_logger::init().unwrap();
67+
let addr1 = "127.0.0.1:1337".parse().unwrap();
68+
let addr2 = "127.0.0.1:1338".parse().unwrap();
69+
70+
let mut core = Core::new().unwrap();
71+
let handle = core.handle();
72+
73+
let srv1 = Http::new().bind_handle(&addr1,|| Ok(Service1), &handle).unwrap();
74+
let srv2 = Http::new().bind_handle(&addr2,|| Ok(Service2), &handle).unwrap();
75+
76+
println!("Listening on http://{}", srv1.local_addr().unwrap());
77+
println!("Listening on http://{}", srv2.local_addr().unwrap());
78+
79+
handle.spawn(srv1);
80+
handle.spawn(srv2);
81+
core.run(futures::future::empty::<(), ()>()).unwrap();
82+
}

src/server/mod.rs

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,26 @@ use proto::Body;
4141
pub use proto::response::Response;
4242
pub use proto::request::Request;
4343

44+
// The `Server` can be created use its own `Core`, or an shared `Handle`.
45+
enum Reactor {
46+
// Own its `Core`
47+
Core(Core),
48+
// Share `Handle` with others
49+
Handle(Handle),
50+
}
51+
52+
impl Reactor {
53+
/// Returns a handle to the underlying event loop that this server will be
54+
/// running on.
55+
#[inline]
56+
pub fn handle(&self) -> Handle {
57+
match *self {
58+
Reactor::Core(ref core) => core.handle(),
59+
Reactor::Handle(ref handle) => handle.clone(),
60+
}
61+
}
62+
}
63+
4464
/// An instance of the HTTP protocol, and implementation of tokio-proto's
4565
/// `ServerProto` trait.
4666
///
@@ -63,7 +83,7 @@ where B: Stream<Error=::Error>,
6383
{
6484
protocol: Http<B::Item>,
6585
new_service: S,
66-
core: Core,
86+
reactor: Reactor,
6787
listener: TcpListener,
6888
shutdown_timeout: Duration,
6989
}
@@ -117,10 +137,34 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
117137

118138
Ok(Server {
119139
new_service: new_service,
120-
core: core,
140+
reactor: Reactor::Core(core),
141+
listener: listener,
142+
protocol: self.clone(),
143+
shutdown_timeout: Duration::new(1, 0),
144+
})
145+
}
146+
147+
/// This method allows the ability to share a `Core` with multiple servers.
148+
///
149+
/// Bind the provided `addr` and return a server with a shared `Core`.
150+
///
151+
/// This is method will bind the `addr` provided with a new TCP listener ready
152+
/// to accept connections. Each connection will be processed with the
153+
/// `new_service` object provided as well, creating a new service per
154+
/// connection.
155+
pub fn bind_handle<S, Bd>(&self, addr: &SocketAddr, new_service: S, handle: &Handle) -> ::Result<Server<S, Bd>>
156+
where S: NewService<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
157+
Bd: Stream<Item=B, Error=::Error>,
158+
{
159+
let listener = TcpListener::bind(addr, &handle)?;
160+
161+
Ok(Server {
162+
new_service: new_service,
163+
reactor: Reactor::Handle(handle.clone()),
121164
listener: listener,
122165
protocol: self.clone(),
123166
shutdown_timeout: Duration::new(1, 0),
167+
124168
})
125169
}
126170

@@ -404,7 +448,7 @@ impl<S, B> Server<S, B>
404448
/// Returns a handle to the underlying event loop that this server will be
405449
/// running on.
406450
pub fn handle(&self) -> Handle {
407-
self.core.handle()
451+
self.reactor.handle()
408452
}
409453

410454
/// Configure the amount of time this server will wait for a "graceful
@@ -444,7 +488,13 @@ impl<S, B> Server<S, B>
444488
pub fn run_until<F>(self, shutdown_signal: F) -> ::Result<()>
445489
where F: Future<Item = (), Error = ()>,
446490
{
447-
let Server { protocol, new_service, mut core, listener, shutdown_timeout } = self;
491+
let Server { protocol, new_service, reactor, listener, shutdown_timeout } = self;
492+
493+
let mut core = match reactor {
494+
Reactor::Core(core) => core,
495+
_ => panic!("Server does not own its core, use `Handle::spawn()` to run the service!"),
496+
};
497+
448498
let handle = core.handle();
449499

450500
// Mini future to track the number of active services
@@ -496,6 +546,38 @@ impl<S, B> Server<S, B>
496546
}
497547
}
498548

549+
impl<S, B> Future for Server<S, B>
550+
where S: NewService<Request = Request, Response = Response<B>, Error = ::Error> + 'static,
551+
B: Stream<Error=::Error> + 'static,
552+
B::Item: AsRef<[u8]>,
553+
{
554+
type Item = ();
555+
type Error = ();
556+
557+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
558+
if let Reactor::Core(_) = self.reactor {
559+
panic!("Server owns its core, use `Server::run()` to run the service!")
560+
}
561+
562+
loop {
563+
match self.listener.accept() {
564+
Ok((socket, addr)) => {
565+
// TODO: use the NotifyService
566+
match self.new_service.new_service() {
567+
Ok(srv) => self.protocol.bind_connection(&self.handle(),
568+
socket,
569+
addr,
570+
srv),
571+
Err(e) => debug!("internal error: {:?}", e),
572+
}
573+
}
574+
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Ok(Async::NotReady),
575+
Err(e) => debug!("internal error: {:?}", e),
576+
}
577+
}
578+
}
579+
}
580+
499581
impl<S: fmt::Debug, B: Stream<Error=::Error>> fmt::Debug for Server<S, B>
500582
where B::Item: AsRef<[u8]>
501583
{

0 commit comments

Comments
 (0)