Skip to content

Use NIO in Single Threaded Mode #68

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ let package = Package(
.library(name: "AWSLambdaTesting", targets: ["AWSLambdaTesting"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.8.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/swift-server/swift-backtrace.git", from: "1.1.0"),
.package(url: "https://github.com/apple/swift-nio.git", .upToNextMajor(from: "2.17.0")),
.package(url: "https://github.com/apple/swift-log.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/swift-server/swift-backtrace.git", .upToNextMajor(from: "1.1.0")),
],
targets: [
.target(name: "AWSLambdaRuntime", dependencies: [
Expand Down
54 changes: 31 additions & 23 deletions Sources/AWSLambdaRuntimeCore/Lambda.swift
Original file line number Diff line number Diff line change
Expand Up @@ -80,41 +80,49 @@ public enum Lambda {
@discardableResult
internal static func run(configuration: Configuration = .init(), factory: @escaping (EventLoop) throws -> Handler) -> Result<Int, Error> {
self.run(configuration: configuration, factory: { eventloop -> EventLoopFuture<Handler> in
do {
let handler = try factory(eventloop)
return eventloop.makeSucceededFuture(handler)
} catch {
return eventloop.makeFailedFuture(error)
let promise = eventloop.makePromise(of: Handler.self)
// if we have a callback based handler factory, we offload the creation of the handler
// onto the default offload queue, to ensure that the eventloop is never blocked.
Lambda.defaultOffloadQueue.async {
do {
promise.succeed(try factory(eventloop))
} catch {
promise.fail(error)
}
}
return promise.futureResult
Comment on lines +83 to +93
Copy link
Member Author

@fabianfett fabianfett May 12, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👀 @tomerd @weissi I'm not sure if this is the best solution to the problem?!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm :)

})
}

// for testing and internal use
@discardableResult
internal static func run(configuration: Configuration = .init(), factory: @escaping HandlerFactory) -> Result<Int, Error> {
do {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) // only need one thread, will improve performance
defer { try! eventLoopGroup.syncShutdownGracefully() }
let result = try self.runAsync(eventLoopGroup: eventLoopGroup, configuration: configuration, factory: factory).wait()
return .success(result)
} catch {
return .failure(error)
}
}

internal static func runAsync(eventLoopGroup: EventLoopGroup, configuration: Configuration, factory: @escaping HandlerFactory) -> EventLoopFuture<Int> {
Backtrace.install()
var logger = Logger(label: "Lambda")
logger.logLevel = configuration.general.logLevel
let lifecycle = Lifecycle(eventLoop: eventLoopGroup.next(), logger: logger, configuration: configuration, factory: factory)
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
logger.info("intercepted signal: \(signal)")
lifecycle.shutdown()
}
return lifecycle.start().flatMap {
return lifecycle.shutdownFuture.always { _ in

var result: Result<Int, Error>!
MultiThreadedEventLoopGroup.withCurrentThreadAsEventLoop { eventLoop in
let lifecycle = Lifecycle(eventLoop: eventLoop, logger: logger, configuration: configuration, factory: factory)
let signalSource = trap(signal: configuration.lifecycle.stopSignal) { signal in
logger.info("intercepted signal: \(signal)")
lifecycle.shutdown()
}

lifecycle.start().flatMap {
lifecycle.shutdownFuture
}.whenComplete { lifecycleResult in
signalSource.cancel()
eventLoop.shutdownGracefully { error in
if let error = error {
preconditionFailure("Failed to shutdown eventloop: \(error)")
}
}
result = lifecycleResult
}
}

logger.info("shutdown completed")
return result
}
}
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntimeCore/LambdaHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public protocol LambdaHandler: EventLoopLambdaHandler {
func handle(context: Lambda.Context, payload: In, callback: @escaping (Result<Out, Error>) -> Void)
}

private extension Lambda {
internal extension Lambda {
static let defaultOffloadQueue = DispatchQueue(label: "LambdaHandler.offload")
}

Expand Down
18 changes: 10 additions & 8 deletions Tests/AWSLambdaRuntimeCoreTests/LambdaTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -128,27 +128,29 @@ class LambdaTest: XCTestCase {
assertLambdaLifecycleResult(result, shouldFailWithError: TestError("kaboom"))
}

func testStartStop() {
func testStartStopInDebugMode() {
let server = MockLambdaServer(behavior: Behavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }

let signal = Signal.ALRM
let maxTimes = 1000
let configuration = Lambda.Configuration(lifecycle: .init(maxTimes: maxTimes, stopSignal: signal))
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let future = Lambda.runAsync(eventLoopGroup: eventLoopGroup, configuration: configuration, factory: { $0.makeSucceededFuture(EchoHandler()) })
DispatchQueue(label: "test").async {
// we need to schedule the signal before we start the long running `Lambda.run`, since
// `Lambda.run` will block the main thread.
usleep(100_000)
kill(getpid(), signal.rawValue)
}
future.whenSuccess { result in
XCTAssertGreaterThan(result, 0, "should have stopped before any request made")
XCTAssertLessThan(result, maxTimes, "should have stopped before \(maxTimes)")
let result = Lambda.run(configuration: configuration, factory: { $0.makeSucceededFuture(EchoHandler()) })

guard case .success(let invocationCount) = result else {
return XCTFail("expected to have not failed")
}
XCTAssertNoThrow(try future.wait())

XCTAssertGreaterThan(invocationCount, 0, "should have stopped before any request made")
XCTAssertLessThan(invocationCount, maxTimes, "should have stopped before \(maxTimes)")
}

func testTimeout() {
Expand Down