Skip to content

Commit b434e15

Browse files
committed
feat(server): Handle 100-continue
cc #838
1 parent 7369fe6 commit b434e15

File tree

3 files changed

+76
-20
lines changed

3 files changed

+76
-20
lines changed

src/http/conn.rs

Lines changed: 56 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ where I: AsyncRead + AsyncWrite,
6363
}
6464
}
6565

66+
fn can_write_continue(&self) -> bool {
67+
match self.state.writing {
68+
Writing::Continue(..) => true,
69+
_ => false,
70+
}
71+
}
72+
6673
fn can_read_body(&self) -> bool {
6774
match self.state.reading {
6875
Reading::Body(..) => true,
@@ -106,6 +113,10 @@ where I: AsyncRead + AsyncWrite,
106113
}
107114
};
108115
self.state.busy();
116+
if head.expecting_continue() {
117+
let msg = b"HTTP/1.1 100 Continue\r\n\r\n";
118+
self.state.writing = Writing::Continue(Cursor::new(msg));
119+
}
109120
let wants_keep_alive = head.should_keep_alive();
110121
self.state.keep_alive &= wants_keep_alive;
111122
let (body, reading) = if decoder.is_eof() {
@@ -173,6 +184,7 @@ where I: AsyncRead + AsyncWrite,
173184
}
174185

175186
match self.state.writing {
187+
Writing::Continue(..) |
176188
Writing::Body(..) |
177189
Writing::Ending(..) => return,
178190
Writing::Init |
@@ -192,14 +204,15 @@ where I: AsyncRead + AsyncWrite,
192204

193205
fn can_write_head(&self) -> bool {
194206
match self.state.writing {
195-
Writing::Init => true,
207+
Writing::Continue(..) | Writing::Init => true,
196208
_ => false
197209
}
198210
}
199211

200212
fn can_write_body(&self) -> bool {
201213
match self.state.writing {
202214
Writing::Body(..) => true,
215+
Writing::Continue(..) |
203216
Writing::Init |
204217
Writing::Ending(..) |
205218
Writing::KeepAlive |
@@ -229,6 +242,13 @@ where I: AsyncRead + AsyncWrite,
229242
let wants_keep_alive = head.should_keep_alive();
230243
self.state.keep_alive &= wants_keep_alive;
231244
let mut buf = Vec::new();
245+
// if a 100-continue has started but not finished sending, tack the
246+
// remainder on to the start of the buffer.
247+
if let Writing::Continue(ref pending) = self.state.writing {
248+
if pending.has_started() {
249+
buf.extend_from_slice(pending.buf());
250+
}
251+
}
232252
let encoder = T::encode(head, &mut buf);
233253
//TODO: handle when there isn't enough room to buffer the head
234254
assert!(self.io.buffer(buf) > 0);
@@ -296,6 +316,15 @@ where I: AsyncRead + AsyncWrite,
296316
fn write_queued(&mut self) -> Poll<(), io::Error> {
297317
trace!("Conn::write_queued()");
298318
let state = match self.state.writing {
319+
Writing::Continue(ref mut queued) => {
320+
let n = self.io.buffer(queued.buf());
321+
queued.consume(n);
322+
if queued.is_written() {
323+
Writing::Init
324+
} else {
325+
return Ok(Async::NotReady);
326+
}
327+
}
299328
Writing::Body(ref mut encoder, ref mut queued) => {
300329
let complete = if let Some(chunk) = queued.as_mut() {
301330
let n = try_nb!(encoder.encode(&mut self.io, chunk.buf()));
@@ -355,24 +384,28 @@ where I: AsyncRead + AsyncWrite,
355384
trace!("Conn::poll()");
356385
self.state.read_task.take();
357386

358-
if self.is_read_closed() {
359-
trace!("Conn::poll when closed");
360-
Ok(Async::Ready(None))
361-
} else if self.can_read_head() {
362-
self.read_head()
363-
} else if self.can_read_body() {
364-
self.read_body()
365-
.map(|async| async.map(|chunk| Some(Frame::Body {
366-
chunk: chunk
367-
})))
368-
.or_else(|err| {
369-
self.state.close_read();
370-
Ok(Async::Ready(Some(Frame::Error { error: err.into() })))
371-
})
372-
} else {
373-
trace!("poll when on keep-alive");
374-
self.maybe_park_read();
375-
Ok(Async::NotReady)
387+
loop {
388+
if self.is_read_closed() {
389+
trace!("Conn::poll when closed");
390+
return Ok(Async::Ready(None));
391+
} else if self.can_read_head() {
392+
return self.read_head();
393+
} else if self.can_write_continue() {
394+
try_nb!(self.flush());
395+
} else if self.can_read_body() {
396+
return self.read_body()
397+
.map(|async| async.map(|chunk| Some(Frame::Body {
398+
chunk: chunk
399+
})))
400+
.or_else(|err| {
401+
self.state.close_read();
402+
Ok(Async::Ready(Some(Frame::Error { error: err.into() })))
403+
});
404+
} else {
405+
trace!("poll when on keep-alive");
406+
self.maybe_park_read();
407+
return Ok(Async::NotReady);
408+
}
376409
}
377410
}
378411
}
@@ -482,6 +515,7 @@ enum Reading {
482515
}
483516

484517
enum Writing<B> {
518+
Continue(Cursor<&'static [u8]>),
485519
Init,
486520
Body(Encoder, Option<Cursor<B>>),
487521
Ending(Cursor<&'static [u8]>),
@@ -503,6 +537,9 @@ impl<B: AsRef<[u8]>, K: fmt::Debug> fmt::Debug for State<B, K> {
503537
impl<B: AsRef<[u8]>> fmt::Debug for Writing<B> {
504538
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
505539
match *self {
540+
Writing::Continue(ref buf) => f.debug_tuple("Continue")
541+
.field(buf)
542+
.finish(),
506543
Writing::Init => f.write_str("Init"),
507544
Writing::Body(ref enc, ref queued) => f.debug_tuple("Body")
508545
.field(enc)

src/http/io.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,10 @@ impl<T: AsRef<[u8]>> Cursor<T> {
175175
}
176176
}
177177

178+
pub fn has_started(&self) -> bool {
179+
self.pos != 0
180+
}
181+
178182
pub fn is_written(&self) -> bool {
179183
trace!("Cursor::is_written pos = {}, len = {}", self.pos, self.bytes.as_ref().len());
180184
self.pos >= self.bytes.as_ref().len()

src/http/mod.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::fmt;
44

55
use bytes::BytesMut;
66

7-
use header::{Connection, ConnectionOption};
7+
use header::{Connection, ConnectionOption, Expect};
88
use header::Headers;
99
use method::Method;
1010
use status::StatusCode;
@@ -56,6 +56,10 @@ impl<S> MessageHead<S> {
5656
pub fn should_keep_alive(&self) -> bool {
5757
should_keep_alive(self.version, &self.headers)
5858
}
59+
60+
pub fn expecting_continue(&self) -> bool {
61+
expecting_continue(self.version, &self.headers)
62+
}
5963
}
6064

6165
impl ResponseHead {
@@ -119,6 +123,17 @@ pub fn should_keep_alive(version: HttpVersion, headers: &Headers) -> bool {
119123
ret
120124
}
121125

126+
/// Checks if a connection is expecting a `100 Continue` before sending its body.
127+
#[inline]
128+
pub fn expecting_continue(version: HttpVersion, headers: &Headers) -> bool {
129+
let ret = match (version, headers.get::<Expect>()) {
130+
(Http11, Some(&Expect::Continue)) => true,
131+
_ => false
132+
};
133+
trace!("expecting_continue(version={:?}, header={:?} = {:?}", version, headers.get::<Expect>(), ret);
134+
ret
135+
}
136+
122137
#[derive(Debug)]
123138
pub enum ServerTransaction {}
124139

0 commit comments

Comments
 (0)