Skip to content

Commit 398ad6a

Browse files
committed
wip
1 parent 8153cfa commit 398ad6a

File tree

4 files changed

+60
-18
lines changed

4 files changed

+60
-18
lines changed

src/client/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ where T: AsyncRead + AsyncWrite + 'static,
265265
type Response = proto::ResponseHead;
266266
type ResponseBody = proto::Chunk;
267267
type Error = ::Error;
268-
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>;
268+
type Transport = proto::Conn<T, B::Item, proto::ClientTransaction, proto::ProtoOnion, Pooled<TokioClient<B>>>;
269269
type BindTransport = BindingClient<T, B>;
270270

271271
fn bind_transport(&self, io: T) -> Self::BindTransport {
@@ -286,13 +286,13 @@ where T: AsyncRead + AsyncWrite + 'static,
286286
B: Stream<Error=::Error>,
287287
B::Item: AsRef<[u8]>,
288288
{
289-
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, Pooled<TokioClient<B>>>;
289+
type Item = proto::Conn<T, B::Item, proto::ClientTransaction, proto::ProtoOnion, Pooled<TokioClient<B>>>;
290290
type Error = io::Error;
291291

292292
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
293293
match self.rx.poll() {
294294
Ok(Async::Ready(client)) => Ok(Async::Ready(
295-
proto::Conn::new(self.io.take().expect("binding client io lost"), client)
295+
proto::Conn::new(self.io.take().expect("binding client io lost"), client, proto::ProtoOnion)
296296
)),
297297
Ok(Async::NotReady) => Ok(Async::NotReady),
298298
Err(_canceled) => unreachable!(),

src/proto/conn.rs

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::fmt;
22
use std::io::{self, Write};
33
use std::marker::PhantomData;
44

5-
use futures::{Poll, Async, AsyncSink, Stream, Sink, StartSend};
5+
use futures::{Poll, Async, AsyncSink, Future, Stream, Sink, StartSend};
66
use futures::task::Task;
77
use tokio_io::{AsyncRead, AsyncWrite};
88
use tokio_proto::streaming::pipeline::{Frame, Transport};
@@ -21,21 +21,23 @@ use version::HttpVersion;
2121
/// The connection will determine when a message begins and ends as well as
2222
/// determine if this connection can be kept alive after the message,
2323
/// or if it is complete.
24-
pub struct Conn<I, B, T, K = KA> {
24+
pub struct Conn<I, B, T, O, K = KA> {
2525
io: Buffered<I>,
26+
onion: O,
2627
state: State<B, K>,
2728
_marker: PhantomData<T>
2829
}
2930

30-
impl<I, B, T, K> Conn<I, B, T, K>
31+
impl<I, B, T, O, K> Conn<I, B, T, O, K>
3132
where I: AsyncRead + AsyncWrite,
3233
B: AsRef<[u8]>,
3334
T: Http1Transaction,
3435
K: KeepAlive
3536
{
36-
pub fn new(io: I, keep_alive: K) -> Conn<I, B, T, K> {
37+
pub fn new(io: I, keep_alive: K, onion: O) -> Conn<I, B, T, O, K> {
3738
Conn {
3839
io: Buffered::new(io),
40+
onion: onion,
3941
state: State {
4042
keep_alive: keep_alive,
4143
method: None,
@@ -51,8 +53,8 @@ where I: AsyncRead + AsyncWrite,
5153
self.io.set_flush_pipeline(enabled);
5254
}
5355

54-
fn poll2(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> {
55-
trace!("Conn::poll()");
56+
fn poll_incoming(&mut self) -> Poll<Option<Frame<super::MessageHead<T::Incoming>, super::Chunk, ::Error>>, io::Error> {
57+
trace!("Conn::poll_incoming()");
5658

5759
loop {
5860
if self.is_read_closed() {
@@ -412,7 +414,30 @@ where I: AsyncRead + AsyncWrite,
412414
}
413415
}
414416

415-
impl<I, B, T, K> Stream for Conn<I, B, T, K>
417+
// ==== future impl ====
418+
419+
pub struct NoOnion;
420+
421+
impl<I, B, T, K> Future for Conn<I, B, T, NoOnion, K>
422+
where I: AsyncRead + AsyncWrite,
423+
B: AsRef<[u8]>,
424+
T: Http1Transaction,
425+
K: KeepAlive,
426+
T::Outgoing: fmt::Debug {
427+
type Item = I;
428+
type Error = ::Error;
429+
430+
#[inline]
431+
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
432+
unimplemented!("future impl")
433+
}
434+
}
435+
436+
// ==== tokio_proto impl ====
437+
438+
pub struct ProtoOnion;
439+
440+
impl<I, B, T, K> Stream for Conn<I, B, T, ProtoOnion, K>
416441
where I: AsyncRead + AsyncWrite,
417442
B: AsRef<[u8]>,
418443
T: Http1Transaction,
@@ -423,14 +448,14 @@ where I: AsyncRead + AsyncWrite,
423448

424449
#[inline]
425450
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
426-
self.poll2().map_err(|err| {
451+
self.poll_incoming().map_err(|err| {
427452
debug!("poll error: {}", err);
428453
err
429454
})
430455
}
431456
}
432457

433-
impl<I, B, T, K> Sink for Conn<I, B, T, K>
458+
impl<I, B, T, K> Sink for Conn<I, B, T, ProtoOnion, K>
434459
where I: AsyncRead + AsyncWrite,
435460
B: AsRef<[u8]>,
436461
T: Http1Transaction,
@@ -501,14 +526,14 @@ where I: AsyncRead + AsyncWrite,
501526
}
502527
}
503528

504-
impl<I, B, T, K> Transport for Conn<I, B, T, K>
529+
impl<I, B, T, K> Transport for Conn<I, B, T, ProtoOnion, K>
505530
where I: AsyncRead + AsyncWrite + 'static,
506531
B: AsRef<[u8]> + 'static,
507532
T: Http1Transaction + 'static,
508533
K: KeepAlive + 'static,
509534
T::Outgoing: fmt::Debug {}
510535

511-
impl<I, B: AsRef<[u8]>, T, K: fmt::Debug> fmt::Debug for Conn<I, B, T, K> {
536+
impl<I, B: AsRef<[u8]>, T, O, K: fmt::Debug> fmt::Debug for Conn<I, B, T, O, K> {
512537
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
513538
f.debug_struct("Conn")
514539
.field("state", &self.state)

src/proto/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use uri::Uri;
1212
use version::HttpVersion;
1313
use version::HttpVersion::{Http10, Http11};
1414

15-
pub use self::conn::{Conn, KeepAlive, KA};
15+
pub use self::conn::{Conn, KeepAlive, KA, NoOnion, ProtoOnion};
1616
pub use self::body::{Body, TokioBody};
1717
pub use self::chunk::Chunk;
1818

src/server/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,18 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
165165
})
166166
}
167167

168+
pub fn no_onion<S, I, Bd>(&self, io: I, _service: S) -> Connection<I, B, S>
169+
where S: Service<Request = Request, Response = Response<Bd>, Error = ::Error> + 'static,
170+
Bd: Stream<Item=B, Error=::Error> + 'static,
171+
I: AsyncRead + AsyncWrite + 'static,
172+
173+
{
174+
Connection {
175+
conn: proto::Conn::new(io, proto::KA::Busy, proto::NoOnion),
176+
_marker: PhantomData,
177+
}
178+
}
179+
168180
/// Bind a `Service` using types from the `http` crate.
169181
///
170182
/// See `Http::bind_connection`.
@@ -185,6 +197,11 @@ impl<B: AsRef<[u8]> + 'static> Http<B> {
185197
}
186198
}
187199

200+
pub struct Connection<I, B, S> {
201+
conn: proto::Conn<I, B, proto::ServerTransaction, proto::NoOnion>,
202+
_marker: PhantomData<S>,
203+
}
204+
188205
impl<B> Clone for Http<B> {
189206
fn clone(&self) -> Http<B> {
190207
Http {
@@ -210,11 +227,11 @@ pub struct __ProtoRequest(proto::RequestHead);
210227
pub struct __ProtoResponse(ResponseHead);
211228
#[doc(hidden)]
212229
#[allow(missing_debug_implementations)]
213-
pub struct __ProtoTransport<T, B>(proto::Conn<T, B, proto::ServerTransaction>);
230+
pub struct __ProtoTransport<T, B>(proto::Conn<T, B, proto::ServerTransaction, proto::ProtoOnion>);
214231
#[doc(hidden)]
215232
#[allow(missing_debug_implementations)]
216233
pub struct __ProtoBindTransport<T, B> {
217-
inner: future::FutureResult<proto::Conn<T, B, proto::ServerTransaction>, io::Error>,
234+
inner: future::FutureResult<proto::Conn<T, B, proto::ServerTransaction, proto::ProtoOnion>, io::Error>,
218235
}
219236

220237
impl<T, B> ServerProto<T> for Http<B>
@@ -236,7 +253,7 @@ impl<T, B> ServerProto<T> for Http<B>
236253
} else {
237254
proto::KA::Disabled
238255
};
239-
let mut conn = proto::Conn::new(io, ka);
256+
let mut conn = proto::Conn::new(io, ka, proto::ProtoOnion);
240257
conn.set_flush_pipeline(self.pipeline);
241258
__ProtoBindTransport {
242259
inner: future::ok(conn),

0 commit comments

Comments
 (0)