@@ -23,6 +23,11 @@ pub struct FramedWrite<T, B> {
23
23
/// Upstream `AsyncWrite`
24
24
inner : T ,
25
25
26
+ encoder : Encoder < B > ,
27
+ }
28
+
29
+ #[ derive( Debug ) ]
30
+ struct Encoder < B > {
26
31
/// HPACK encoder
27
32
hpack : hpack:: Encoder ,
28
33
@@ -74,12 +79,14 @@ where
74
79
let is_write_vectored = inner. is_write_vectored ( ) ;
75
80
FramedWrite {
76
81
inner,
77
- hpack : hpack:: Encoder :: default ( ) ,
78
- buf : Cursor :: new ( BytesMut :: with_capacity ( DEFAULT_BUFFER_CAPACITY ) ) ,
79
- next : None ,
80
- last_data_frame : None ,
81
- max_frame_size : frame:: DEFAULT_MAX_FRAME_SIZE ,
82
- is_write_vectored,
82
+ encoder : Encoder {
83
+ hpack : hpack:: Encoder :: default ( ) ,
84
+ buf : Cursor :: new ( BytesMut :: with_capacity ( DEFAULT_BUFFER_CAPACITY ) ) ,
85
+ next : None ,
86
+ last_data_frame : None ,
87
+ max_frame_size : frame:: DEFAULT_MAX_FRAME_SIZE ,
88
+ is_write_vectored,
89
+ } ,
83
90
}
84
91
}
85
92
@@ -88,11 +95,11 @@ where
88
95
/// Calling this function may result in the current contents of the buffer
89
96
/// to be flushed to `T`.
90
97
pub fn poll_ready ( & mut self , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
91
- if !self . has_capacity ( ) {
98
+ if !self . encoder . has_capacity ( ) {
92
99
// Try flushing
93
100
ready ! ( self . flush( cx) ) ?;
94
101
95
- if !self . has_capacity ( ) {
102
+ if !self . encoder . has_capacity ( ) {
96
103
return Poll :: Pending ;
97
104
}
98
105
}
@@ -105,6 +112,128 @@ where
105
112
/// `poll_ready` must be called first to ensure that a frame may be
106
113
/// accepted.
107
114
pub fn buffer ( & mut self , item : Frame < B > ) -> Result < ( ) , UserError > {
115
+ self . encoder . buffer ( item)
116
+ }
117
+
118
+ /// Flush buffered data to the wire
119
+ pub fn flush ( & mut self , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
120
+ let span = tracing:: trace_span!( "FramedWrite::flush" ) ;
121
+ let _e = span. enter ( ) ;
122
+
123
+ loop {
124
+ while !self . encoder . is_empty ( ) {
125
+ match self . encoder . next {
126
+ Some ( Next :: Data ( ref mut frame) ) => {
127
+ tracing:: trace!( queued_data_frame = true ) ;
128
+ let mut buf = ( & mut self . encoder . buf ) . chain ( frame. payload_mut ( ) ) ;
129
+ ready ! ( write(
130
+ & mut self . inner,
131
+ self . encoder. is_write_vectored,
132
+ & mut buf,
133
+ cx,
134
+ ) ) ?
135
+ }
136
+ _ => {
137
+ tracing:: trace!( queued_data_frame = false ) ;
138
+ ready ! ( write(
139
+ & mut self . inner,
140
+ self . encoder. is_write_vectored,
141
+ & mut self . encoder. buf,
142
+ cx,
143
+ ) ) ?
144
+ }
145
+ }
146
+ }
147
+
148
+ match self . encoder . unset_frame ( ) {
149
+ ControlFlow :: Continue => ( ) ,
150
+ ControlFlow :: Break => break ,
151
+ }
152
+ }
153
+
154
+ tracing:: trace!( "flushing buffer" ) ;
155
+ // Flush the upstream
156
+ ready ! ( Pin :: new( & mut self . inner) . poll_flush( cx) ) ?;
157
+
158
+ Poll :: Ready ( Ok ( ( ) ) )
159
+ }
160
+
161
+ /// Close the codec
162
+ pub fn shutdown ( & mut self , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
163
+ ready ! ( self . flush( cx) ) ?;
164
+ Pin :: new ( & mut self . inner ) . poll_shutdown ( cx)
165
+ }
166
+ }
167
+
168
+ fn write < T , B > (
169
+ writer : & mut T ,
170
+ is_write_vectored : bool ,
171
+ buf : & mut B ,
172
+ cx : & mut Context < ' _ > ,
173
+ ) -> Poll < io:: Result < ( ) > >
174
+ where
175
+ T : AsyncWrite + Unpin ,
176
+ B : Buf ,
177
+ {
178
+ // TODO(eliza): when tokio-util 0.5.1 is released, this
179
+ // could just use `poll_write_buf`...
180
+ const MAX_IOVS : usize = 64 ;
181
+ let n = if is_write_vectored {
182
+ let mut bufs = [ IoSlice :: new ( & [ ] ) ; MAX_IOVS ] ;
183
+ let cnt = buf. chunks_vectored ( & mut bufs) ;
184
+ ready ! ( Pin :: new( writer) . poll_write_vectored( cx, & bufs[ ..cnt] ) ) ?
185
+ } else {
186
+ ready ! ( Pin :: new( writer) . poll_write( cx, buf. chunk( ) ) ) ?
187
+ } ;
188
+ buf. advance ( n) ;
189
+ Ok ( ( ) ) . into ( )
190
+ }
191
+
192
+ #[ must_use]
193
+ enum ControlFlow {
194
+ Continue ,
195
+ Break ,
196
+ }
197
+
198
+ impl < B > Encoder < B >
199
+ where
200
+ B : Buf ,
201
+ {
202
+ fn unset_frame ( & mut self ) -> ControlFlow {
203
+ // Clear internal buffer
204
+ self . buf . set_position ( 0 ) ;
205
+ self . buf . get_mut ( ) . clear ( ) ;
206
+
207
+ // The data frame has been written, so unset it
208
+ match self . next . take ( ) {
209
+ Some ( Next :: Data ( frame) ) => {
210
+ self . last_data_frame = Some ( frame) ;
211
+ debug_assert ! ( self . is_empty( ) ) ;
212
+ ControlFlow :: Break
213
+ }
214
+ Some ( Next :: Continuation ( frame) ) => {
215
+ // Buffer the continuation frame, then try to write again
216
+ let mut buf = limited_write_buf ! ( self ) ;
217
+ if let Some ( continuation) = frame. encode ( & mut self . hpack , & mut buf) {
218
+ // We previously had a CONTINUATION, and after encoding
219
+ // it, we got *another* one? Let's just double check
220
+ // that at least some progress is being made...
221
+ if self . buf . get_ref ( ) . len ( ) == frame:: HEADER_LEN {
222
+ // If *only* the CONTINUATION frame header was
223
+ // written, and *no* header fields, we're stuck
224
+ // in a loop...
225
+ panic ! ( "CONTINUATION frame write loop; header value too big to encode" ) ;
226
+ }
227
+
228
+ self . next = Some ( Next :: Continuation ( continuation) ) ;
229
+ }
230
+ ControlFlow :: Continue
231
+ }
232
+ None => ControlFlow :: Break ,
233
+ }
234
+ }
235
+
236
+ fn buffer ( & mut self , item : Frame < B > ) -> Result < ( ) , UserError > {
108
237
// Ensure that we have enough capacity to accept the write.
109
238
assert ! ( self . has_capacity( ) ) ;
110
239
let span = tracing:: trace_span!( "FramedWrite::buffer" , frame = ?item) ;
@@ -185,93 +314,6 @@ where
185
314
Ok ( ( ) )
186
315
}
187
316
188
- /// Flush buffered data to the wire
189
- pub fn flush ( & mut self , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
190
- const MAX_IOVS : usize = 64 ;
191
-
192
- let span = tracing:: trace_span!( "FramedWrite::flush" ) ;
193
- let _e = span. enter ( ) ;
194
-
195
- loop {
196
- while !self . is_empty ( ) {
197
- match self . next {
198
- Some ( Next :: Data ( ref mut frame) ) => {
199
- tracing:: trace!( queued_data_frame = true ) ;
200
- let mut buf = ( & mut self . buf ) . chain ( frame. payload_mut ( ) ) ;
201
- // TODO(eliza): when tokio-util 0.5.1 is released, this
202
- // could just use `poll_write_buf`...
203
- let n = if self . is_write_vectored {
204
- let mut bufs = [ IoSlice :: new ( & [ ] ) ; MAX_IOVS ] ;
205
- let cnt = buf. chunks_vectored ( & mut bufs) ;
206
- ready ! ( Pin :: new( & mut self . inner) . poll_write_vectored( cx, & bufs[ ..cnt] ) ) ?
207
- } else {
208
- ready ! ( Pin :: new( & mut self . inner) . poll_write( cx, buf. chunk( ) ) ) ?
209
- } ;
210
- buf. advance ( n) ;
211
- }
212
- _ => {
213
- tracing:: trace!( queued_data_frame = false ) ;
214
- let n = if self . is_write_vectored {
215
- let mut iovs = [ IoSlice :: new ( & [ ] ) ; MAX_IOVS ] ;
216
- let cnt = self . buf . chunks_vectored ( & mut iovs) ;
217
- ready ! (
218
- Pin :: new( & mut self . inner) . poll_write_vectored( cx, & mut iovs[ ..cnt] )
219
- ) ?
220
- } else {
221
- ready ! ( Pin :: new( & mut self . inner) . poll_write( cx, & mut self . buf. chunk( ) ) ) ?
222
- } ;
223
- self . buf . advance ( n) ;
224
- }
225
- }
226
- }
227
-
228
- // Clear internal buffer
229
- self . buf . set_position ( 0 ) ;
230
- self . buf . get_mut ( ) . clear ( ) ;
231
-
232
- // The data frame has been written, so unset it
233
- match self . next . take ( ) {
234
- Some ( Next :: Data ( frame) ) => {
235
- self . last_data_frame = Some ( frame) ;
236
- debug_assert ! ( self . is_empty( ) ) ;
237
- break ;
238
- }
239
- Some ( Next :: Continuation ( frame) ) => {
240
- // Buffer the continuation frame, then try to write again
241
- let mut buf = limited_write_buf ! ( self ) ;
242
- if let Some ( continuation) = frame. encode ( & mut self . hpack , & mut buf) {
243
- // We previously had a CONTINUATION, and after encoding
244
- // it, we got *another* one? Let's just double check
245
- // that at least some progress is being made...
246
- if self . buf . get_ref ( ) . len ( ) == frame:: HEADER_LEN {
247
- // If *only* the CONTINUATION frame header was
248
- // written, and *no* header fields, we're stuck
249
- // in a loop...
250
- panic ! ( "CONTINUATION frame write loop; header value too big to encode" ) ;
251
- }
252
-
253
- self . next = Some ( Next :: Continuation ( continuation) ) ;
254
- }
255
- }
256
- None => {
257
- break ;
258
- }
259
- }
260
- }
261
-
262
- tracing:: trace!( "flushing buffer" ) ;
263
- // Flush the upstream
264
- ready ! ( Pin :: new( & mut self . inner) . poll_flush( cx) ) ?;
265
-
266
- Poll :: Ready ( Ok ( ( ) ) )
267
- }
268
-
269
- /// Close the codec
270
- pub fn shutdown ( & mut self , cx : & mut Context ) -> Poll < io:: Result < ( ) > > {
271
- ready ! ( self . flush( cx) ) ?;
272
- Pin :: new ( & mut self . inner ) . poll_shutdown ( cx)
273
- }
274
-
275
317
fn has_capacity ( & self ) -> bool {
276
318
self . next . is_none ( ) && self . buf . get_ref ( ) . remaining_mut ( ) >= MIN_BUFFER_CAPACITY
277
319
}
@@ -284,26 +326,32 @@ where
284
326
}
285
327
}
286
328
329
+ impl < B > Encoder < B > {
330
+ fn max_frame_size ( & self ) -> usize {
331
+ self . max_frame_size as usize
332
+ }
333
+ }
334
+
287
335
impl < T , B > FramedWrite < T , B > {
288
336
/// Returns the max frame size that can be sent
289
337
pub fn max_frame_size ( & self ) -> usize {
290
- self . max_frame_size as usize
338
+ self . encoder . max_frame_size ( )
291
339
}
292
340
293
341
/// Set the peer's max frame size.
294
342
pub fn set_max_frame_size ( & mut self , val : usize ) {
295
343
assert ! ( val <= frame:: MAX_MAX_FRAME_SIZE as usize ) ;
296
- self . max_frame_size = val as FrameSize ;
344
+ self . encoder . max_frame_size = val as FrameSize ;
297
345
}
298
346
299
347
/// Set the peer's header table size.
300
348
pub fn set_header_table_size ( & mut self , val : usize ) {
301
- self . hpack . update_max_size ( val) ;
349
+ self . encoder . hpack . update_max_size ( val) ;
302
350
}
303
351
304
352
/// Retrieve the last data frame that has been sent
305
353
pub fn take_last_data_frame ( & mut self ) -> Option < frame:: Data < B > > {
306
- self . last_data_frame . take ( )
354
+ self . encoder . last_data_frame . take ( )
307
355
}
308
356
309
357
pub fn get_mut ( & mut self ) -> & mut T {
0 commit comments