1
1
extern crate futures;
2
- extern crate tokio_core ;
2
+ extern crate tokio ;
3
3
extern crate tokio_io;
4
4
extern crate tokio_service;
5
5
extern crate tokio_io_timeout;
@@ -8,22 +8,18 @@ extern crate hyper;
8
8
use std:: time:: Duration ;
9
9
use std:: io;
10
10
11
- use futures:: future :: { Either , Future } ;
11
+ use futures:: Future ;
12
12
13
- use tokio_core:: reactor:: { Handle , Timeout } ;
14
- use tokio_io:: { AsyncRead , AsyncWrite } ;
15
- use tokio_service:: Service ;
13
+ use tokio:: timer:: Timeout ;
16
14
use tokio_io_timeout:: TimeoutStream ;
17
15
18
- use hyper:: client:: Connect ;
16
+ use hyper:: client:: connect :: { Connect , Connected , Destination } ;
19
17
20
18
/// A connector that enforces as connection timeout
21
19
#[ derive( Debug ) ]
22
20
pub struct TimeoutConnector < T > {
23
21
/// A connector implementing the `Connect` trait
24
22
connector : T ,
25
- /// Handle to be used to set the timeout within tokio's core
26
- handle : Handle ,
27
23
/// Amount of time to wait connecting
28
24
connect_timeout : Option < Duration > ,
29
25
/// Amount of time to wait reading response
@@ -34,16 +30,56 @@ pub struct TimeoutConnector<T> {
34
30
35
31
impl < T : Connect > TimeoutConnector < T > {
36
32
/// Construct a new TimeoutConnector with a given connector implementing the `Connect` trait
37
- pub fn new ( connector : T , handle : & Handle ) -> Self {
33
+ pub fn new ( connector : T ) -> Self {
38
34
TimeoutConnector {
39
35
connector : connector,
40
- handle : handle. clone ( ) ,
41
36
connect_timeout : None ,
42
37
read_timeout : None ,
43
38
write_timeout : None ,
44
39
}
45
40
}
41
+ }
42
+
43
+ impl < T : Connect > Connect for TimeoutConnector < T >
44
+ where
45
+ T : Connect < Error = io:: Error > + ' static ,
46
+ T :: Future : ' static ,
47
+ {
48
+ type Transport = TimeoutStream < T :: Transport > ;
49
+ type Error = T :: Error ;
50
+ type Future = Box < Future < Item = ( Self :: Transport , Connected ) , Error = Self :: Error > + Send > ;
51
+
52
+ fn connect ( & self , dst : Destination ) -> Self :: Future {
53
+
54
+ let read_timeout = self . read_timeout . clone ( ) ;
55
+ let write_timeout = self . write_timeout . clone ( ) ;
56
+ let connecting = self . connector . connect ( dst) ;
57
+
58
+ if self . connect_timeout . is_none ( ) {
59
+ return Box :: new ( connecting. map ( move |( io, c) | {
60
+ let mut tm = TimeoutStream :: new ( io) ;
61
+ tm. set_read_timeout ( read_timeout) ;
62
+ tm. set_write_timeout ( write_timeout) ;
63
+ ( tm, c)
64
+ } ) ) ;
65
+ }
46
66
67
+ let connect_timeout = self . connect_timeout . expect ( "Connect timeout should be set" ) ;
68
+ let timeout = Timeout :: new ( connecting, connect_timeout) ;
69
+
70
+ Box :: new ( timeout. then ( move |res| match res {
71
+ Ok ( ( io, c) ) => {
72
+ let mut tm = TimeoutStream :: new ( io) ;
73
+ tm. set_read_timeout ( read_timeout) ;
74
+ tm. set_write_timeout ( write_timeout) ;
75
+ Ok ( ( tm, c) )
76
+ }
77
+ Err ( e) => Err ( io:: Error :: new ( io:: ErrorKind :: TimedOut , e) ) ,
78
+ } ) )
79
+ }
80
+ }
81
+
82
+ impl < T > TimeoutConnector < T > {
47
83
/// Set the timeout for connecting to a URL.
48
84
///
49
85
/// Default is no timeout.
@@ -69,76 +105,70 @@ impl<T: Connect> TimeoutConnector<T> {
69
105
}
70
106
}
71
107
72
- impl < T > Service for TimeoutConnector < T >
73
- where
74
- T : Service < Error = io:: Error > + ' static ,
75
- T :: Response : AsyncRead + AsyncWrite ,
76
- T :: Future : Future < Error = io:: Error > ,
77
- {
78
- type Request = T :: Request ;
79
- type Response = TimeoutStream < T :: Response > ;
80
- type Error = T :: Error ;
81
- type Future = Box < Future < Item = Self :: Response , Error = Self :: Error > > ;
82
-
83
- fn call ( & self , req : Self :: Request ) -> Self :: Future {
84
- let handle = self . handle . clone ( ) ;
85
- let read_timeout = self . read_timeout . clone ( ) ;
86
- let write_timeout = self . write_timeout . clone ( ) ;
87
- let connecting = self . connector . call ( req) ;
88
-
89
- if self . connect_timeout . is_none ( ) {
90
- return Box :: new ( connecting. map ( move |io| {
91
- let mut tm = TimeoutStream :: new ( io, & handle) ;
92
- tm. set_read_timeout ( read_timeout) ;
93
- tm. set_write_timeout ( write_timeout) ;
94
- tm
95
- } ) ) ;
96
- }
97
-
98
- let connect_timeout = self . connect_timeout . expect ( "Connect timeout should be set" ) ;
99
- let timeout = Timeout :: new ( connect_timeout, & self . handle ) . unwrap ( ) ;
100
-
101
- Box :: new ( connecting. select2 ( timeout) . then ( move |res| match res {
102
- Ok ( Either :: A ( ( io, _) ) ) => {
103
- let mut tm = TimeoutStream :: new ( io, & handle) ;
104
- tm. set_read_timeout ( read_timeout) ;
105
- tm. set_write_timeout ( write_timeout) ;
106
- Ok ( tm)
107
- }
108
- Ok ( Either :: B ( ( _, _) ) ) => {
109
- Err ( io:: Error :: new (
110
- io:: ErrorKind :: TimedOut ,
111
- "Client timed out while connecting" ,
112
- ) )
113
- }
114
- Err ( Either :: A ( ( e, _) ) ) => Err ( e) ,
115
- Err ( Either :: B ( ( e, _) ) ) => Err ( e) ,
116
- } ) )
117
- }
118
- }
119
-
120
108
#[ cfg( test) ]
121
109
mod tests {
110
+ use std:: error:: Error ;
122
111
use std:: io;
123
112
use std:: time:: Duration ;
124
- use tokio_core:: reactor:: Core ;
125
- use hyper:: client:: { Connect , HttpConnector } ;
113
+ use futures:: future;
114
+ use tokio:: runtime:: current_thread:: Runtime ;
115
+ use hyper:: Client ;
116
+ use hyper:: client:: HttpConnector ;
126
117
use super :: TimeoutConnector ;
127
118
128
119
#[ test]
129
120
fn test_timeout_connector ( ) {
130
- let mut core = Core :: new ( ) . unwrap ( ) ;
131
- // 10.255.255.1 is a not a routable IP address
132
- let url = "http://10.255.255.1" . parse ( ) . unwrap ( ) ;
133
- let mut connector =
134
- TimeoutConnector :: new ( HttpConnector :: new ( 1 , & core. handle ( ) ) , & core. handle ( ) ) ;
135
- connector. set_connect_timeout ( Some ( Duration :: from_millis ( 1 ) ) ) ;
136
-
137
- match core. run ( connector. connect ( url) ) {
121
+ let mut rt = Runtime :: new ( ) . unwrap ( ) ;
122
+ let res = rt. block_on ( future:: lazy ( || {
123
+ // 10.255.255.1 is a not a routable IP address
124
+ let url = "http://10.255.255.1" . parse ( ) . unwrap ( ) ;
125
+
126
+ let http = HttpConnector :: new ( 1 ) ;
127
+ let mut connector = TimeoutConnector :: new ( http) ;
128
+ connector. set_connect_timeout ( Some ( Duration :: from_millis ( 1 ) ) ) ;
129
+
130
+ let client = Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
131
+
132
+ client. get ( url)
133
+ } ) ) ;
134
+
135
+ match res {
136
+ Ok ( _) => panic ! ( "Expected a timeout" ) ,
137
+ Err ( e) => {
138
+ if let Some ( io_e) = e. source ( ) . unwrap ( ) . downcast_ref :: < io:: Error > ( ) {
139
+ assert_eq ! ( io_e. kind( ) , io:: ErrorKind :: TimedOut ) ;
140
+ } else {
141
+ panic ! ( "Expected timeout error" ) ;
142
+ }
143
+ }
144
+ }
145
+ }
146
+
147
+ #[ test]
148
+ fn test_read_timeout ( ) {
149
+ let mut rt = Runtime :: new ( ) . unwrap ( ) ;
150
+ let res = rt. block_on ( future:: lazy ( || {
151
+ let url = "http://example.com" . parse ( ) . unwrap ( ) ;
152
+
153
+ let http = HttpConnector :: new ( 1 ) ;
154
+ let mut connector = TimeoutConnector :: new ( http) ;
155
+ // A 1 ms read timeout should be so short that we trigger a timeout error
156
+ connector. set_read_timeout ( Some ( Duration :: from_millis ( 1 ) ) ) ;
157
+
158
+ let client = Client :: builder ( ) . build :: < _ , hyper:: Body > ( connector) ;
159
+
160
+ client. get ( url)
161
+ } ) ) ;
162
+
163
+ match res {
164
+ Ok ( _) => panic ! ( "Expected a timeout" ) ,
138
165
Err ( e) => {
139
- assert_eq ! ( e. kind( ) , io:: ErrorKind :: TimedOut ) ;
166
+ if let Some ( io_e) = e. source ( ) . unwrap ( ) . downcast_ref :: < io:: Error > ( ) {
167
+ assert_eq ! ( io_e. kind( ) , io:: ErrorKind :: TimedOut ) ;
168
+ } else {
169
+ panic ! ( "Expected timeout error" ) ;
170
+ }
140
171
}
141
- _ => panic ! ( "Expected timeout error" ) ,
142
172
}
143
173
}
144
174
}
0 commit comments