19
19
#endif
20
20
import Logging
21
21
import NIOCore
22
+ import NIOHTTP1
22
23
import NIOPosix
23
24
import NIOSSL
24
25
import XCTest
25
26
26
27
class HTTP2ClientTests : XCTestCase {
27
- func makeDefaultHTTPClient( ) -> HTTPClient {
28
+ func makeDefaultHTTPClient(
29
+ eventLoopGroupProvider: HTTPClient . EventLoopGroupProvider = . createNew
30
+ ) -> HTTPClient {
28
31
var tlsConfig = TLSConfiguration . makeClientConfiguration ( )
29
32
tlsConfig. certificateVerification = . none
30
33
return HTTPClient (
31
- eventLoopGroupProvider: . createNew ,
34
+ eventLoopGroupProvider: eventLoopGroupProvider ,
32
35
configuration: HTTPClient . Configuration (
33
36
tlsConfiguration: tlsConfig,
34
37
httpVersion: . automatic
@@ -37,6 +40,18 @@ class HTTP2ClientTests: XCTestCase {
37
40
)
38
41
}
39
42
43
+ func makeClientWithActiveHTTP2Connection< RequestHandler> (
44
+ to bin: HTTPBin < RequestHandler > ,
45
+ eventLoopGroupProvider: HTTPClient . EventLoopGroupProvider = . createNew
46
+ ) -> HTTPClient {
47
+ let client = self . makeDefaultHTTPClient ( eventLoopGroupProvider: eventLoopGroupProvider)
48
+ var response : HTTPClient . Response ?
49
+ XCTAssertNoThrow ( response = try client. get ( url: " https://localhost: \( bin. port) /get " ) . wait ( ) )
50
+ XCTAssertEqual ( . ok, response? . status)
51
+ XCTAssertEqual ( response? . version, . http2)
52
+ return client
53
+ }
54
+
40
55
func testSimpleGet( ) {
41
56
let bin = HTTPBin ( . http2( compress: false ) )
42
57
defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
@@ -92,7 +107,7 @@ class HTTP2ClientTests: XCTestCase {
92
107
93
108
for _ in 0 ..< numberOfRequestsPerWorkers {
94
109
var response : HTTPClient . Response ?
95
- XCTAssertNoThrow ( response = try client. get ( url: url ) . wait ( ) )
110
+ XCTAssertNoThrow ( response = try client. get ( url: " https://localhost: \( bin . port ) /get " ) . wait ( ) )
96
111
97
112
XCTAssertEqual ( . ok, response? . status)
98
113
XCTAssertEqual ( response? . version, . http2)
@@ -187,4 +202,193 @@ class HTTP2ClientTests: XCTestCase {
187
202
// all workers should be running, let's wait for them to finish
188
203
allDone. wait ( )
189
204
}
205
+
206
+ func testUncleanShutdownCancelsExecutingAndQueuedTasks( ) {
207
+ let bin = HTTPBin ( . http2( compress: false ) )
208
+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
209
+ let clientGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
210
+ defer { XCTAssertNoThrow ( try clientGroup. syncShutdownGracefully ( ) ) }
211
+ // we need an active connection to guarantee that requests are executed immediately
212
+ // without waiting for connection establishment
213
+ let client = self . makeClientWithActiveHTTP2Connection ( to: bin, eventLoopGroupProvider: . shared( clientGroup) )
214
+
215
+ // start 20 requests which are guaranteed to never get any response
216
+ // 10 of them will executed and the other 10 will be queued
217
+ // because HTTPBin has a default `maxConcurrentStreams` limit of 10
218
+ let responses = ( 0 ..< 20 ) . map { _ in
219
+ client. get ( url: " https://localhost: \( bin. port) /wait " )
220
+ }
221
+
222
+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
223
+
224
+ var results : [ Result < HTTPClient . Response , Error > ] = [ ]
225
+ XCTAssertNoThrow ( results = try EventLoopFuture
226
+ . whenAllComplete ( responses, on: clientGroup. next ( ) )
227
+ . timeout ( after: . seconds( 2 ) )
228
+ . wait ( ) )
229
+
230
+ for result in results {
231
+ switch result {
232
+ case . success:
233
+ XCTFail ( " Shouldn't succeed " )
234
+ case . failure( let error) :
235
+ XCTAssertEqual ( error as? HTTPClientError , . cancelled)
236
+ }
237
+ }
238
+ }
239
+
240
+ func testCancelingRunningRequest( ) {
241
+ let bin = HTTPBin ( . http2( compress: false ) ) { _ in SendHeaderAndWaitChannelHandler ( ) }
242
+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
243
+ let client = self . makeDefaultHTTPClient ( )
244
+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
245
+
246
+ var maybeRequest : HTTPClient . Request ?
247
+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request ( url: " https://localhost: \( bin. port) " ) )
248
+ guard let request = maybeRequest else { return }
249
+
250
+ var task : HTTPClient . Task < Void > !
251
+ let delegate = HeadReceivedCallback { _ in
252
+ // request is definitely running because we just received a head from the server
253
+ task. cancel ( )
254
+ }
255
+ task = client. execute (
256
+ request: request,
257
+ delegate: delegate
258
+ )
259
+
260
+ XCTAssertThrowsError ( try task. futureResult. timeout ( after: . seconds( 2 ) ) . wait ( ) ) {
261
+ XCTAssertEqual ( $0 as? HTTPClientError , . cancelled)
262
+ }
263
+ }
264
+
265
+ func testStressCancelingRunningRequestFromDifferentThreads( ) {
266
+ let bin = HTTPBin ( . http2( compress: false ) ) { _ in SendHeaderAndWaitChannelHandler ( ) }
267
+ defer { XCTAssertNoThrow ( try bin. shutdown ( ) ) }
268
+ let client = self . makeDefaultHTTPClient ( )
269
+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
270
+ let cancelPool = MultiThreadedEventLoopGroup ( numberOfThreads: 10 )
271
+ defer { XCTAssertNoThrow ( try cancelPool. syncShutdownGracefully ( ) ) }
272
+
273
+ var maybeRequest : HTTPClient . Request ?
274
+ XCTAssertNoThrow ( maybeRequest = try HTTPClient . Request ( url: " https://localhost: \( bin. port) " ) )
275
+ guard let request = maybeRequest else { return }
276
+
277
+ let tasks = ( 0 ..< 100 ) . map { _ -> HTTPClient . Task < TestHTTPDelegate . Response > in
278
+ var task : HTTPClient . Task < Void > !
279
+ let delegate = HeadReceivedCallback { _ in
280
+ // request is definitely running because we just received a head from the server
281
+ cancelPool. next ( ) . execute {
282
+ // canceling from a different thread
283
+ task. cancel ( )
284
+ }
285
+ }
286
+ task = client. execute (
287
+ request: request,
288
+ delegate: delegate
289
+ )
290
+ return task
291
+ }
292
+
293
+ for task in tasks {
294
+ XCTAssertThrowsError ( try task. futureResult. timeout ( after: . seconds( 2 ) ) . wait ( ) ) {
295
+ XCTAssertEqual ( $0 as? HTTPClientError , . cancelled)
296
+ }
297
+ }
298
+ }
299
+
300
+ func testPlatformConnectErrorIsForwardedOnTimeout( ) {
301
+ let bin = HTTPBin ( . http2( compress: false ) )
302
+ let clientGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 2 )
303
+ let el1 = clientGroup. next ( )
304
+ let el2 = clientGroup. next ( )
305
+ defer { XCTAssertNoThrow ( try clientGroup. syncShutdownGracefully ( ) ) }
306
+ var tlsConfig = TLSConfiguration . makeClientConfiguration ( )
307
+ tlsConfig. certificateVerification = . none
308
+ let client = HTTPClient (
309
+ eventLoopGroupProvider: . shared( clientGroup) ,
310
+ configuration: HTTPClient . Configuration (
311
+ tlsConfiguration: tlsConfig,
312
+ timeout: . init( connect: . milliseconds( 1000 ) ) ,
313
+ httpVersion: . automatic
314
+ ) ,
315
+ backgroundActivityLogger: Logger ( label: " HTTPClient " , factory: StreamLogHandler . standardOutput ( label: ) )
316
+ )
317
+ defer { XCTAssertNoThrow ( try client. syncShutdown ( ) ) }
318
+
319
+ var maybeRequest1 : HTTPClient . Request ?
320
+ XCTAssertNoThrow ( maybeRequest1 = try HTTPClient . Request ( url: " https://localhost: \( bin. port) /get " ) )
321
+ guard let request1 = maybeRequest1 else { return }
322
+
323
+ let task1 = client. execute ( request: request1, delegate: ResponseAccumulator ( request: request1) , eventLoop: . delegateAndChannel( on: el1) )
324
+ var response1 : ResponseAccumulator . Response ?
325
+ XCTAssertNoThrow ( response1 = try task1. wait ( ) )
326
+
327
+ XCTAssertEqual ( . ok, response1? . status)
328
+ XCTAssertEqual ( response1? . version, . http2)
329
+ let serverPort = bin. port
330
+ XCTAssertNoThrow ( try bin. shutdown ( ) )
331
+ // client is now in HTTP/2 state and the HTTPBin is closed
332
+ // start a new server on the old port which closes all connections immediately
333
+ let serverGroup = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
334
+ defer { XCTAssertNoThrow ( try serverGroup. syncShutdownGracefully ( ) ) }
335
+ var maybeServer : Channel ?
336
+ XCTAssertNoThrow ( maybeServer = try ServerBootstrap ( group: serverGroup)
337
+ . serverChannelOption ( ChannelOptions . socketOption ( . so_reuseaddr) , value: 1 )
338
+ . childChannelInitializer { channel in
339
+ channel. close ( )
340
+ }
341
+ . childChannelOption ( ChannelOptions . socketOption ( . so_reuseaddr) , value: 1 )
342
+ . bind ( host: " 0.0.0.0 " , port: serverPort)
343
+ . wait ( ) )
344
+ guard let server = maybeServer else { return }
345
+ defer { XCTAssertNoThrow ( try server. close ( ) . wait ( ) ) }
346
+
347
+ var maybeRequest2 : HTTPClient . Request ?
348
+ XCTAssertNoThrow ( maybeRequest2 = try HTTPClient . Request ( url: " https://localhost: \( serverPort) / " ) )
349
+ guard let request2 = maybeRequest2 else { return }
350
+
351
+ let task2 = client. execute ( request: request2, delegate: ResponseAccumulator ( request: request2) , eventLoop: . delegateAndChannel( on: el2) )
352
+ XCTAssertThrowsError ( try task2. wait ( ) ) { error in
353
+ XCTAssertNil (
354
+ error as? HTTPClientError ,
355
+ " error should be some platform specific error that the connection is closed/reset by the other side "
356
+ )
357
+ }
358
+ }
359
+ }
360
+
361
+ private final class HeadReceivedCallback : HTTPClientResponseDelegate {
362
+ typealias Response = Void
363
+ private let didReceiveHeadCallback : ( HTTPResponseHead ) -> Void
364
+ init ( didReceiveHead: @escaping ( HTTPResponseHead ) -> Void ) {
365
+ self . didReceiveHeadCallback = didReceiveHead
366
+ }
367
+
368
+ func didReceiveHead( task: HTTPClient . Task < Void > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
369
+ self . didReceiveHeadCallback ( head)
370
+ return task. eventLoop. makeSucceededVoidFuture ( )
371
+ }
372
+
373
+ func didFinishRequest( task: HTTPClient . Task < Void > ) throws { }
374
+ }
375
+
376
+ /// sends some headers and waits indefinitely afterwards
377
+ private final class SendHeaderAndWaitChannelHandler : ChannelInboundHandler {
378
+ typealias InboundIn = HTTPServerRequestPart
379
+ typealias OutboundOut = HTTPServerResponsePart
380
+
381
+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
382
+ let requestPart = self . unwrapInboundIn ( data)
383
+ switch requestPart {
384
+ case . head:
385
+ context. writeAndFlush ( self . wrapOutboundOut ( . head( HTTPResponseHead (
386
+ version: HTTPVersion ( major: 1 , minor: 1 ) ,
387
+ status: . ok
388
+ ) )
389
+ ) , promise: nil )
390
+ case . body, . end:
391
+ return
392
+ }
393
+ }
190
394
}
0 commit comments