@@ -20,7 +20,7 @@ use tokio_proto::util::client_proxy::ClientProxy;
20
20
pub use tokio_service:: Service ;
21
21
22
22
use header:: { Headers , Host } ;
23
- use proto:: { self , TokioBody } ;
23
+ use proto:: { self , RequestHead , TokioBody } ;
24
24
use proto:: response;
25
25
use proto:: request;
26
26
use method:: Method ;
@@ -45,7 +45,7 @@ pub mod compat;
45
45
pub struct Client < C , B = proto:: Body > {
46
46
connector : C ,
47
47
handle : Handle ,
48
- pool : Pool < TokioClient < B > > ,
48
+ pool : Dispatch < B > ,
49
49
}
50
50
51
51
impl Client < HttpConnector , proto:: Body > {
@@ -93,7 +93,11 @@ impl<C, B> Client<C, B> {
93
93
Client {
94
94
connector : config. connector ,
95
95
handle : handle. clone ( ) ,
96
- pool : Pool :: new ( config. keep_alive , config. keep_alive_timeout ) ,
96
+ pool : if config. no_proto {
97
+ Dispatch :: Hyper ( Pool :: new ( config. keep_alive , config. keep_alive_timeout ) )
98
+ } else {
99
+ Dispatch :: Proto ( Pool :: new ( config. keep_alive , config. keep_alive_timeout ) )
100
+ }
97
101
}
98
102
}
99
103
}
@@ -187,48 +191,100 @@ where C: Connect,
187
191
headers. extend ( head. headers . iter ( ) ) ;
188
192
head. headers = headers;
189
193
190
- let checkout = self . pool . checkout ( domain. as_ref ( ) ) ;
191
- let connect = {
192
- let handle = self . handle . clone ( ) ;
193
- let pool = self . pool . clone ( ) ;
194
- let pool_key = Rc :: new ( domain. to_string ( ) ) ;
195
- self . connector . connect ( url)
196
- . map ( move |io| {
197
- let ( tx, rx) = oneshot:: channel ( ) ;
198
- let client = HttpClient {
199
- client_rx : RefCell :: new ( Some ( rx) ) ,
200
- } . bind_client ( & handle, io) ;
201
- let pooled = pool. pooled ( pool_key, client) ;
202
- drop ( tx. send ( pooled. clone ( ) ) ) ;
203
- pooled
204
- } )
205
- } ;
194
+ match self . pool {
195
+ Dispatch :: Proto ( ref pool) => {
196
+ trace ! ( "proto_dispatch" ) ;
197
+ let checkout = pool. checkout ( domain. as_ref ( ) ) ;
198
+ let connect = {
199
+ let handle = self . handle . clone ( ) ;
200
+ let pool = pool. clone ( ) ;
201
+ let pool_key = Rc :: new ( domain. to_string ( ) ) ;
202
+ self . connector . connect ( url)
203
+ . map ( move |io| {
204
+ let ( tx, rx) = oneshot:: channel ( ) ;
205
+ let client = HttpClient {
206
+ client_rx : RefCell :: new ( Some ( rx) ) ,
207
+ } . bind_client ( & handle, io) ;
208
+ let pooled = pool. pooled ( pool_key, client) ;
209
+ drop ( tx. send ( pooled. clone ( ) ) ) ;
210
+ pooled
211
+ } )
212
+ } ;
213
+
214
+ let race = checkout. select ( connect)
215
+ . map ( |( client, _work) | client)
216
+ . map_err ( |( e, _work) | {
217
+ // the Pool Checkout cannot error, so the only error
218
+ // is from the Connector
219
+ // XXX: should wait on the Checkout? Problem is
220
+ // that if the connector is failing, it may be that we
221
+ // never had a pooled stream at all
222
+ e. into ( )
223
+ } ) ;
224
+ let resp = race. and_then ( move |client| {
225
+ let msg = match body {
226
+ Some ( body) => {
227
+ Message :: WithBody ( head, body. into ( ) )
228
+ } ,
229
+ None => Message :: WithoutBody ( head) ,
230
+ } ;
231
+ client. call ( msg)
232
+ } ) ;
233
+ FutureResponse ( Box :: new ( resp. map ( |msg| {
234
+ match msg {
235
+ Message :: WithoutBody ( head) => response:: from_wire ( head, None ) ,
236
+ Message :: WithBody ( head, body) => response:: from_wire ( head, Some ( body. into ( ) ) ) ,
237
+ }
238
+ } ) ) )
239
+ } ,
240
+ Dispatch :: Hyper ( ref pool) => {
241
+ trace ! ( "no_proto dispatch" ) ;
242
+ use futures:: Sink ;
243
+ use futures:: sync:: { mpsc, oneshot} ;
244
+
245
+ let checkout = pool. checkout ( domain. as_ref ( ) ) ;
246
+ let connect = {
247
+ let handle = self . handle . clone ( ) ;
248
+ let pool = pool. clone ( ) ;
249
+ let pool_key = Rc :: new ( domain. to_string ( ) ) ;
250
+ self . connector . connect ( url)
251
+ . map ( move |io| {
252
+ let ( tx, rx) = mpsc:: channel ( 1 ) ;
253
+ let pooled = pool. pooled ( pool_key, RefCell :: new ( tx) ) ;
254
+ let conn = proto:: Conn :: < _ , _ , proto:: ClientTransaction , _ > :: new ( io, pooled. clone ( ) ) ;
255
+ let dispatch = proto:: dispatch:: Dispatcher :: new ( proto:: dispatch:: Client :: new ( rx) , conn) ;
256
+ handle. spawn ( dispatch. map_err ( |err| error ! ( "no_proto error: {}" , err) ) ) ;
257
+ pooled
258
+ } )
259
+ } ;
260
+
261
+ let race = checkout. select ( connect)
262
+ . map ( |( client, _work) | client)
263
+ . map_err ( |( e, _work) | {
264
+ // the Pool Checkout cannot error, so the only error
265
+ // is from the Connector
266
+ // XXX: should wait on the Checkout? Problem is
267
+ // that if the connector is failing, it may be that we
268
+ // never had a pooled stream at all
269
+ e. into ( )
270
+ } ) ;
271
+
272
+ let resp = race. and_then ( move |client| {
273
+ let ( callback, rx) = oneshot:: channel ( ) ;
274
+ client. borrow_mut ( ) . start_send ( ( head, body, callback) ) . unwrap ( ) ;
275
+ rx. then ( |res| {
276
+ match res {
277
+ Ok ( Ok ( res) ) => Ok ( res) ,
278
+ Ok ( Err ( err) ) => Err ( err) ,
279
+ Err ( _) => unimplemented ! ( "dispatch dropped" ) ,
280
+ }
281
+ } )
282
+ } ) ;
283
+
284
+ FutureResponse ( Box :: new ( resp) )
206
285
207
- let race = checkout. select ( connect)
208
- . map ( |( client, _work) | client)
209
- . map_err ( |( e, _work) | {
210
- // the Pool Checkout cannot error, so the only error
211
- // is from the Connector
212
- // XXX: should wait on the Checkout? Problem is
213
- // that if the connector is failing, it may be that we
214
- // never had a pooled stream at all
215
- e. into ( )
216
- } ) ;
217
- let resp = race. and_then ( move |client| {
218
- let msg = match body {
219
- Some ( body) => {
220
- Message :: WithBody ( head, body. into ( ) )
221
- } ,
222
- None => Message :: WithoutBody ( head) ,
223
- } ;
224
- client. call ( msg)
225
- } ) ;
226
- FutureResponse ( Box :: new ( resp. map ( |msg| {
227
- match msg {
228
- Message :: WithoutBody ( head) => response:: from_wire ( head, None ) ,
229
- Message :: WithBody ( head, body) => response:: from_wire ( head, Some ( body. into ( ) ) ) ,
230
286
}
231
- } ) ) )
287
+ }
232
288
}
233
289
234
290
}
@@ -238,7 +294,10 @@ impl<C: Clone, B> Clone for Client<C, B> {
238
294
Client {
239
295
connector : self . connector . clone ( ) ,
240
296
handle : self . handle . clone ( ) ,
241
- pool : self . pool . clone ( ) ,
297
+ pool : match self . pool {
298
+ Dispatch :: Proto ( ref pool) => Dispatch :: Proto ( pool. clone ( ) ) ,
299
+ Dispatch :: Hyper ( ref pool) => Dispatch :: Hyper ( pool. clone ( ) ) ,
300
+ }
242
301
}
243
302
}
244
303
}
@@ -249,10 +308,16 @@ impl<C, B> fmt::Debug for Client<C, B> {
249
308
}
250
309
}
251
310
252
- type TokioClient < B > = ClientProxy < Message < proto:: RequestHead , B > , Message < proto:: ResponseHead , TokioBody > , :: Error > ;
311
+ type ProtoClient < B > = ClientProxy < Message < RequestHead , B > , Message < proto:: ResponseHead , TokioBody > , :: Error > ;
312
+ type HyperClient < B > = RefCell < :: futures:: sync:: mpsc:: Sender < ( RequestHead , Option < B > , :: futures:: sync:: oneshot:: Sender < :: Result < :: Response > > ) > > ;
313
+
314
+ enum Dispatch < B > {
315
+ Proto ( Pool < ProtoClient < B > > ) ,
316
+ Hyper ( Pool < HyperClient < B > > ) ,
317
+ }
253
318
254
319
struct HttpClient < B > {
255
- client_rx : RefCell < Option < oneshot:: Receiver < Pooled < TokioClient < B > > > > > ,
320
+ client_rx : RefCell < Option < oneshot:: Receiver < Pooled < ProtoClient < B > > > > > ,
256
321
}
257
322
258
323
impl < T , B > ClientProto < T > for HttpClient < B >
@@ -265,7 +330,7 @@ where T: AsyncRead + AsyncWrite + 'static,
265
330
type Response = proto:: ResponseHead ;
266
331
type ResponseBody = proto:: Chunk ;
267
332
type Error = :: Error ;
268
- type Transport = proto:: Conn < T , B :: Item , proto:: ClientTransaction , proto :: ProtoDispatch , Pooled < TokioClient < B > > > ;
333
+ type Transport = proto:: Conn < T , B :: Item , proto:: ClientTransaction , Pooled < ProtoClient < B > > > ;
269
334
type BindTransport = BindingClient < T , B > ;
270
335
271
336
fn bind_transport ( & self , io : T ) -> Self :: BindTransport {
@@ -277,7 +342,7 @@ where T: AsyncRead + AsyncWrite + 'static,
277
342
}
278
343
279
344
struct BindingClient < T , B > {
280
- rx : oneshot:: Receiver < Pooled < TokioClient < B > > > ,
345
+ rx : oneshot:: Receiver < Pooled < ProtoClient < B > > > ,
281
346
io : Option < T > ,
282
347
}
283
348
@@ -286,13 +351,13 @@ where T: AsyncRead + AsyncWrite + 'static,
286
351
B : Stream < Error =:: Error > ,
287
352
B :: Item : AsRef < [ u8 ] > ,
288
353
{
289
- type Item = proto:: Conn < T , B :: Item , proto:: ClientTransaction , proto :: ProtoDispatch , Pooled < TokioClient < B > > > ;
354
+ type Item = proto:: Conn < T , B :: Item , proto:: ClientTransaction , Pooled < ProtoClient < B > > > ;
290
355
type Error = io:: Error ;
291
356
292
357
fn poll ( & mut self ) -> Poll < Self :: Item , Self :: Error > {
293
358
match self . rx . poll ( ) {
294
359
Ok ( Async :: Ready ( client) ) => Ok ( Async :: Ready (
295
- proto:: Conn :: new ( self . io . take ( ) . expect ( "binding client io lost" ) , client, proto :: ProtoDispatch )
360
+ proto:: Conn :: new ( self . io . take ( ) . expect ( "binding client io lost" ) , client)
296
361
) ) ,
297
362
Ok ( Async :: NotReady ) => Ok ( Async :: NotReady ) ,
298
363
Err ( _canceled) => unreachable ! ( ) ,
@@ -309,6 +374,7 @@ pub struct Config<C, B> {
309
374
keep_alive_timeout : Option < Duration > ,
310
375
//TODO: make use of max_idle config
311
376
max_idle : usize ,
377
+ no_proto : bool ,
312
378
}
313
379
314
380
/// Phantom type used to signal that `Config` should create a `HttpConnector`.
@@ -324,6 +390,7 @@ impl Default for Config<UseDefaultConnector, proto::Body> {
324
390
keep_alive : true ,
325
391
keep_alive_timeout : Some ( Duration :: from_secs ( 90 ) ) ,
326
392
max_idle : 5 ,
393
+ no_proto : false ,
327
394
}
328
395
}
329
396
}
@@ -347,6 +414,7 @@ impl<C, B> Config<C, B> {
347
414
keep_alive : self . keep_alive ,
348
415
keep_alive_timeout : self . keep_alive_timeout ,
349
416
max_idle : self . max_idle ,
417
+ no_proto : self . no_proto ,
350
418
}
351
419
}
352
420
@@ -360,6 +428,7 @@ impl<C, B> Config<C, B> {
360
428
keep_alive : self . keep_alive ,
361
429
keep_alive_timeout : self . keep_alive_timeout ,
362
430
max_idle : self . max_idle ,
431
+ no_proto : self . no_proto ,
363
432
}
364
433
}
365
434
@@ -393,6 +462,13 @@ impl<C, B> Config<C, B> {
393
462
self
394
463
}
395
464
*/
465
+
466
+ /// Disable tokio-proto internal usage.
467
+ #[ inline]
468
+ pub fn no_proto ( mut self ) -> Config < C , B > {
469
+ self . no_proto = true ;
470
+ self
471
+ }
396
472
}
397
473
398
474
impl < C , B > Config < C , B >
@@ -431,11 +507,8 @@ impl<C, B> fmt::Debug for Config<C, B> {
431
507
impl < C : Clone , B > Clone for Config < C , B > {
432
508
fn clone ( & self ) -> Config < C , B > {
433
509
Config {
434
- _body_type : PhantomData :: < B > ,
435
510
connector : self . connector . clone ( ) ,
436
- keep_alive : self . keep_alive ,
437
- keep_alive_timeout : self . keep_alive_timeout ,
438
- max_idle : self . max_idle ,
511
+ .. * self
439
512
}
440
513
}
441
514
}
0 commit comments