Skip to content

Commit 892a787

Browse files
weissiLukasa
authored andcommitted
allow completely single-threaded NIO programs (apple#1499)
Motivation: Usually, you spawn a new MultiThreadedEventLoopGroup to run NIO programs. But if you're a very simple command line utility, that may not be necessary, so why not just re-use the main thread? Modifications: Allow taking over threads and making them EventLoops. Result: Fewer threads if you want. Co-authored-by: Cory Benfield <[email protected]>
1 parent d986cef commit 892a787

File tree

4 files changed

+177
-26
lines changed

4 files changed

+177
-26
lines changed

Sources/NIO/EventLoop.swift

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,31 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
748748
private let shutdownLock: Lock = Lock()
749749
private var runState: RunState = .running
750750

751+
private static func runTheLoop(thread: NIOThread,
752+
canEventLoopBeShutdownIndividually: Bool,
753+
selectorFactory: @escaping () throws -> NIO.Selector<NIORegistration>,
754+
initializer: @escaping ThreadInitializer,
755+
_ callback: @escaping (SelectableEventLoop) -> Void) {
756+
assert(NIOThread.current == thread)
757+
initializer(thread)
758+
759+
do {
760+
let loop = SelectableEventLoop(thread: thread,
761+
selector: try selectorFactory(),
762+
canBeShutdownIndividually: canEventLoopBeShutdownIndividually)
763+
threadSpecificEventLoop.currentValue = loop
764+
defer {
765+
threadSpecificEventLoop.currentValue = nil
766+
}
767+
callback(loop)
768+
try loop.run()
769+
} catch {
770+
// We fatalError here because the only reasons this can be hit is if the underlying kqueue/epoll give us
771+
// errors that we cannot handle which is an unrecoverable error for us.
772+
fatalError("Unexpected error while running SelectableEventLoop: \(error).")
773+
}
774+
}
775+
751776
private static func setupThreadAndEventLoop(name: String,
752777
selectorFactory: @escaping () throws -> NIO.Selector<NIORegistration>,
753778
initializer: @escaping ThreadInitializer) -> SelectableEventLoop {
@@ -760,22 +785,14 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
760785

761786
loopUpAndRunningGroup.enter()
762787
NIOThread.spawnAndRun(name: name, detachThread: false) { t in
763-
initializer(t)
764-
765-
do {
766-
/* we try! this as this must work (just setting up kqueue/epoll) or else there's not much we can do here */
767-
let l = SelectableEventLoop(thread: t, selector: try! selectorFactory())
768-
threadSpecificEventLoop.currentValue = l
769-
defer {
770-
threadSpecificEventLoop.currentValue = nil
771-
}
788+
MultiThreadedEventLoopGroup.runTheLoop(thread: t,
789+
canEventLoopBeShutdownIndividually: false, // part of MTELG
790+
selectorFactory: selectorFactory,
791+
initializer: initializer) { l in
772792
lock.withLock {
773793
_loop = l
774794
}
775795
loopUpAndRunningGroup.leave()
776-
try l.run()
777-
} catch let err {
778-
fatalError("unexpected error while executing EventLoop \(err)")
779796
}
780797
}
781798
loopUpAndRunningGroup.wait()
@@ -907,7 +924,7 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
907924

908925
g.notify(queue: q) {
909926
for loop in self.eventLoops {
910-
loop.syncFinaliseClose()
927+
loop.syncFinaliseClose(joinThread: true)
911928
}
912929
var overallError: Error?
913930
var queueCallbackPairs: [(DispatchQueue, (Error?) -> Void)]? = nil
@@ -937,6 +954,25 @@ public final class MultiThreadedEventLoopGroup: EventLoopGroup {
937954
}
938955
}
939956
}
957+
958+
/// Convert the calling thread into an `EventLoop`.
959+
///
960+
/// This function will not return until the `EventLoop` has stopped. You can initiate stopping the `EventLoop` by
961+
/// calling `eventLoop.shutdownGracefully` which will eventually make this function return.
962+
///
963+
/// - parameters:
964+
/// - callback: Called _on_ the `EventLoop` that the calling thread was converted to, providing you the
965+
/// `EventLoop` reference. Just like usually on the `EventLoop`, do not block in `callback`.
966+
public static func withCurrentThreadAsEventLoop(_ callback: @escaping (EventLoop) -> Void) {
967+
let callingThread = NIOThread.current
968+
MultiThreadedEventLoopGroup.runTheLoop(thread: callingThread,
969+
canEventLoopBeShutdownIndividually: true,
970+
selectorFactory: NIO.Selector<NIORegistration>.init,
971+
initializer: { _ in }) { loop in
972+
loop.assertInEventLoop()
973+
callback(loop)
974+
}
975+
}
940976
}
941977

942978
extension MultiThreadedEventLoopGroup: CustomStringConvertible {

Sources/NIO/SelectableEventLoop.swift

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ internal final class SelectableEventLoop: EventLoop {
6060
internal var _scheduledTasks = PriorityQueue<ScheduledTask>(ascending: true)
6161
private var tasksCopy = ContiguousArray<() -> Void>()
6262

63+
private let canBeShutdownIndividually: Bool
6364
@usableFromInline
6465
internal let _tasksLock = Lock()
6566
private let _externalStateLock = Lock()
@@ -131,7 +132,7 @@ internal final class SelectableEventLoop: EventLoop {
131132
}
132133
}
133134

134-
internal init(thread: NIOThread, selector: NIO.Selector<NIORegistration>) {
135+
internal init(thread: NIOThread, selector: NIO.Selector<NIORegistration>, canBeShutdownIndividually: Bool) {
135136
self._selector = selector
136137
self.thread = thread
137138
self._iovecs = UnsafeMutablePointer.allocate(capacity: Socket.writevLimitIOVectors)
@@ -144,6 +145,7 @@ internal final class SelectableEventLoop: EventLoop {
144145
self.addresses = UnsafeMutableBufferPointer(start: _addresses, count: Socket.writevLimitIOVectors)
145146
// We will process 4096 tasks per while loop.
146147
self.tasksCopy.reserveCapacity(4096)
148+
self.canBeShutdownIndividually = canBeShutdownIndividually
147149
}
148150

149151
deinit {
@@ -369,11 +371,21 @@ internal final class SelectableEventLoop: EventLoop {
369371
}
370372
}
371373
var nextReadyTask: ScheduledTask? = nil
374+
self._tasksLock.withLock {
375+
if let firstTask = self._scheduledTasks.peek() {
376+
// The reason this is necessary is a very interesting race:
377+
// In theory (and with `makeEventLoopFromCallingThread` even in practise), we could publish an
378+
// `EventLoop` reference _before_ the EL thread has entered the `run` function.
379+
// If that is the case, we need to schedule the first wakeup at the ready time for this task that was
380+
// enqueued really early on, so let's do that :).
381+
nextReadyTask = firstTask
382+
}
383+
}
372384
while self.internalState != .noLongerRunning && self.internalState != .exitingThread {
373385
// Block until there are events to handle or the selector was woken up
374386
/* for macOS: in case any calls we make to Foundation put objects into an autoreleasepool */
375387
try withAutoReleasePool {
376-
try _selector.whenReady(strategy: currentSelectorStrategy(nextReadyTask: nextReadyTask)) { ev in
388+
try self._selector.whenReady(strategy: currentSelectorStrategy(nextReadyTask: nextReadyTask)) { ev in
377389
switch ev.registration {
378390
case .serverSocketChannel(let chan, _):
379391
self.handleEvent(ev.io, channel: chan)
@@ -397,7 +409,7 @@ internal final class SelectableEventLoop: EventLoop {
397409
// We need to ensure we process all tasks, even if a task added another task again
398410
while true {
399411
// TODO: Better locking
400-
_tasksLock.withLockVoid {
412+
self._tasksLock.withLockVoid {
401413
if !self._scheduledTasks.isEmpty {
402414
// We only fetch the time one time as this may be expensive and is generally good enough as if we miss anything we will just do a non-blocking select again anyway.
403415
let now: NIODeadline = .now()
@@ -406,7 +418,7 @@ internal final class SelectableEventLoop: EventLoop {
406418
while tasksCopy.count < tasksCopy.capacity, let task = self._scheduledTasks.peek() {
407419
if task.readyIn(now) <= .nanoseconds(0) {
408420
self._scheduledTasks.pop()
409-
tasksCopy.append(task.task)
421+
self.tasksCopy.append(task.task)
410422
} else {
411423
nextReadyTask = task
412424
break
@@ -419,19 +431,19 @@ internal final class SelectableEventLoop: EventLoop {
419431
}
420432

421433
// all pending tasks are set to occur in the future, so we can stop looping.
422-
if tasksCopy.isEmpty {
434+
if self.tasksCopy.isEmpty {
423435
break
424436
}
425437

426438
// Execute all the tasks that were summited
427-
for task in tasksCopy {
439+
for task in self.tasksCopy {
428440
/* for macOS: in case any calls we make to Foundation put objects into an autoreleasepool */
429441
withAutoReleasePool {
430442
task()
431443
}
432444
}
433445
// Drop everything (but keep the capacity) so we can fill it again on the next iteration.
434-
tasksCopy.removeAll(keepingCapacity: true)
446+
self.tasksCopy.removeAll(keepingCapacity: true)
435447
}
436448
}
437449

@@ -490,7 +502,9 @@ internal final class SelectableEventLoop: EventLoop {
490502
}
491503
}
492504

493-
internal func syncFinaliseClose() {
505+
internal func syncFinaliseClose(joinThread: Bool) {
506+
// This may not be true in the future but today we need to join all ELs that can't be shut down individually.
507+
assert(joinThread != self.canBeShutdownIndividually)
494508
let goAhead = self.externalStateLock.withLock { () -> Bool in
495509
switch self.externalState {
496510
case .closed:
@@ -505,7 +519,9 @@ internal final class SelectableEventLoop: EventLoop {
505519
guard goAhead else {
506520
return
507521
}
508-
self.thread.join()
522+
if joinThread {
523+
self.thread.join()
524+
}
509525
self.externalStateLock.withLock {
510526
precondition(self.externalState == .reclaimingResources)
511527
self.externalState = .resourcesReclaimed
@@ -514,10 +530,22 @@ internal final class SelectableEventLoop: EventLoop {
514530

515531
@usableFromInline
516532
func shutdownGracefully(queue: DispatchQueue, _ callback: @escaping (Error?) -> Void) {
517-
// This function is never called legally because the only possibly owner of an `SelectableEventLoop` is
518-
// `MultiThreadedEventLoopGroup` which calls `closeGently`.
519-
queue.async {
520-
callback(EventLoopError.unsupportedOperation)
533+
if self.canBeShutdownIndividually {
534+
self.initiateClose(queue: queue) { result in
535+
self.syncFinaliseClose(joinThread: false) // This thread was taken over by somebody else
536+
switch result {
537+
case .success:
538+
callback(nil)
539+
case .failure(let error):
540+
callback(error)
541+
}
542+
}
543+
} else {
544+
// This function is never called legally because the only possibly owner of an `SelectableEventLoop` is
545+
// `MultiThreadedEventLoopGroup` which calls `initiateClose` followed by `syncFinaliseClose`.
546+
queue.async {
547+
callback(EventLoopError.unsupportedOperation)
548+
}
521549
}
522550
}
523551
}

Tests/NIOTests/EventLoopTest+XCTest.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ extension EventLoopTest {
6969
("testMultiThreadedEventLoopGroupDescription", testMultiThreadedEventLoopGroupDescription),
7070
("testSafeToExecuteTrue", testSafeToExecuteTrue),
7171
("testSafeToExecuteFalse", testSafeToExecuteFalse),
72+
("testTakeOverThreadAndAlsoTakeItBack", testTakeOverThreadAndAlsoTakeItBack),
73+
("testWeCanDoTrulySingleThreadedNetworking", testWeCanDoTrulySingleThreadedNetworking),
7274
]
7375
}
7476
}

Tests/NIOTests/EventLoopTest.swift

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,4 +1085,89 @@ public final class EventLoopTest : XCTestCase {
10851085
XCTAssertFalse(loop.testsOnly_validExternalStateToScheduleTasks)
10861086
XCTAssertFalse(loop.testsOnly_validExternalStateToScheduleTasks)
10871087
}
1088+
1089+
func testTakeOverThreadAndAlsoTakeItBack() {
1090+
let currentNIOThread = NIOThread.current
1091+
let currentNSThread = Thread.current
1092+
let lock = Lock()
1093+
var hasBeenShutdown = false
1094+
let allDoneGroup = DispatchGroup()
1095+
allDoneGroup.enter()
1096+
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { loop in
1097+
XCTAssertEqual(currentNIOThread, NIOThread.current)
1098+
XCTAssertEqual(currentNSThread, Thread.current)
1099+
XCTAssert(loop === MultiThreadedEventLoopGroup.currentEventLoop)
1100+
loop.shutdownGracefully(queue: DispatchQueue.global()) { error in
1101+
XCTAssertNil(error)
1102+
lock.withLock {
1103+
hasBeenShutdown = error == nil
1104+
}
1105+
allDoneGroup.leave()
1106+
}
1107+
}
1108+
allDoneGroup.wait()
1109+
XCTAssertTrue(lock.withLock { hasBeenShutdown })
1110+
}
1111+
1112+
func testWeCanDoTrulySingleThreadedNetworking() {
1113+
final class SaveReceivedByte: ChannelInboundHandler {
1114+
typealias InboundIn = ByteBuffer
1115+
1116+
// For once, we don't need thread-safety as we're taking the calling thread :)
1117+
var received: UInt8? = nil
1118+
var readCalls: Int = 0
1119+
var allDonePromise: EventLoopPromise<Void>? = nil
1120+
1121+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
1122+
self.readCalls += 1
1123+
XCTAssertEqual(1, self.readCalls)
1124+
1125+
var data = self.unwrapInboundIn(data)
1126+
XCTAssertEqual(1, data.readableBytes)
1127+
1128+
XCTAssertNil(self.received)
1129+
self.received = data.readInteger()
1130+
1131+
self.allDonePromise?.succeed(())
1132+
1133+
context.close(promise: nil)
1134+
}
1135+
}
1136+
1137+
let receiveHandler = SaveReceivedByte() // There'll be just one connection, we can share.
1138+
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { loop in
1139+
ServerBootstrap(group: loop)
1140+
.serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1)
1141+
.childChannelInitializer { accepted in
1142+
accepted.pipeline.addHandler(receiveHandler)
1143+
}
1144+
.bind(host: "127.0.0.1", port: 0)
1145+
.flatMap { serverChannel in
1146+
ClientBootstrap(group: loop).connect(to: serverChannel.localAddress!).flatMap { clientChannel in
1147+
var buffer = clientChannel.allocator.buffer(capacity: 1)
1148+
buffer.writeString("J")
1149+
return clientChannel.writeAndFlush(buffer)
1150+
}.flatMap {
1151+
XCTAssertNil(receiveHandler.allDonePromise)
1152+
receiveHandler.allDonePromise = loop.makePromise()
1153+
return receiveHandler.allDonePromise!.futureResult
1154+
}.flatMap {
1155+
serverChannel.close()
1156+
}
1157+
}.whenComplete { (result: Result<Void, Error>) -> Void in
1158+
func workaroundSR9815withAUselessFunction() {
1159+
XCTAssertNoThrow(try result.get())
1160+
}
1161+
workaroundSR9815withAUselessFunction()
1162+
1163+
// All done, let's return back into the calling thread.
1164+
loop.shutdownGracefully { error in
1165+
XCTAssertNil(error)
1166+
}
1167+
}
1168+
}
1169+
1170+
// All done, the EventLoop is terminated so we should be able to check the results.
1171+
XCTAssertEqual(UInt8(ascii: "J"), receiveHandler.received)
1172+
}
10881173
}

0 commit comments

Comments
 (0)