diff --git a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift index a5ce528a..ab30dd7b 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaContext.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaContext.swift @@ -122,3 +122,27 @@ extension Lambda { } } } + +// MARK: - ShutdownContext + +extension Lambda { + /// Lambda runtime shutdown context. + /// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument. + public final class ShutdownContext { + /// `Logger` to log with + /// + /// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable. + public let logger: Logger + + /// The `EventLoop` the Lambda is executed on. Use this to schedule work with. + /// + /// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care. + /// Most importantly the `EventLoop` must never be blocked. + public let eventLoop: EventLoop + + internal init(logger: Logger, eventLoop: EventLoop) { + self.eventLoop = eventLoop + self.logger = logger + } + } +} diff --git a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift index f14fb787..f7559218 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaHandler.swift @@ -164,6 +164,19 @@ public protocol ByteBufferLambdaHandler { /// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine. /// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error` func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture + + /// The method to clean up your resources. + /// Concrete Lambda handlers implement this method to shutdown their `HTTPClient`s and database connections. + /// + /// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method + /// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`. + func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture +} + +public extension ByteBufferLambdaHandler { + func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { + context.eventLoop.makeSucceededFuture(Void()) + } } private enum CodecError: Error { diff --git a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift index 0c96a05f..ec609901 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift @@ -29,7 +29,7 @@ extension Lambda { private var state = State.idle { willSet { - assert(self.eventLoop.inEventLoop, "State may only be changed on the `Lifecycle`'s `eventLoop`") + self.eventLoop.assertInEventLoop() precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)") } } @@ -71,22 +71,43 @@ extension Lambda { /// /// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with. public func start() -> EventLoopFuture { - assert(self.eventLoop.inEventLoop, "Start must be called on the `EventLoop` the `Lifecycle` has been initialized with.") + self.eventLoop.assertInEventLoop() logger.info("lambda lifecycle starting with \(self.configuration)") self.state = .initializing - // triggered when the Lambda has finished its last run - let finishedPromise = self.eventLoop.makePromise(of: Int.self) - finishedPromise.futureResult.always { _ in - self.markShutdown() - }.cascade(to: self.shutdownPromise) + var logger = self.logger logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id) let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration) - return runner.initialize(logger: logger, factory: self.factory).map { handler in + + let startupFuture = runner.initialize(logger: logger, factory: self.factory) + startupFuture.flatMap { handler -> EventLoopFuture<(ByteBufferLambdaHandler, Result)> in + // after the startup future has succeeded, we have a handler that we can use + // to `run` the lambda. + let finishedPromise = self.eventLoop.makePromise(of: Int.self) self.state = .active(runner, handler) self.run(promise: finishedPromise) + return finishedPromise.futureResult.mapResult { (handler, $0) } } + .flatMap { (handler, runnerResult) -> EventLoopFuture in + // after the lambda finishPromise has succeeded or failed we need to + // shutdown the handler + let shutdownContext = ShutdownContext(logger: logger, eventLoop: self.eventLoop) + return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in + // if, we had an error shuting down the lambda, we want to concatenate it with + // the runner result + logger.error("Error shutting down handler: \(error)") + throw RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult) + }.flatMapResult { (_) -> Result in + // we had no error shutting down the lambda. let's return the runner's result + runnerResult + } + }.always { _ in + // triggered when the Lambda has finished its last run or has a startup failure. + self.markShutdown() + }.cascade(to: self.shutdownPromise) + + return startupFuture.map { _ in } } // MARK: - Private diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift index 08aacd75..8fc22de3 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRunner.swift @@ -115,7 +115,7 @@ private extension Lambda.Context { } // TODO: move to nio? -private extension EventLoopFuture { +extension EventLoopFuture { // callback does not have side effects, failing with original result func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture { self.flatMapError { error in diff --git a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift index fe43ac0a..5e9e6aea 100644 --- a/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift +++ b/Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift @@ -133,6 +133,7 @@ internal extension Lambda { case invocationMissingHeader(String) case noBody case json(Error) + case shutdownError(shutdownError: Error, runnerResult: Result) } } diff --git a/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift new file mode 100644 index 00000000..a485530d --- /dev/null +++ b/Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift @@ -0,0 +1,142 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@testable import AWSLambdaRuntimeCore +import Logging +import NIO +import NIOHTTP1 +import XCTest + +class LambdaLifecycleTest: XCTestCase { + func testShutdownFutureIsFulfilledWithStartUpError() { + let server = MockLambdaServer(behavior: FailedBootstrapBehavior()) + XCTAssertNoThrow(try server.start().wait()) + defer { XCTAssertNoThrow(try server.stop().wait()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + let eventLoop = eventLoopGroup.next() + let logger = Logger(label: "TestLogger") + let testError = TestError("kaboom") + let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { + $0.eventLoop.makeFailedFuture(testError) + }) + + // eventLoop.submit in this case returns an EventLoopFuture> + // which is why we need `wait().wait()` + XCTAssertThrowsError(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) { error in + XCTAssertEqual(testError, error as? TestError) + } + + XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in + XCTAssertEqual(testError, error as? TestError) + } + } + + struct CallbackLambdaHandler: ByteBufferLambdaHandler { + let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture) + let shutdown: (Lambda.ShutdownContext) -> EventLoopFuture + + init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture), shutdown: @escaping (Lambda.ShutdownContext) -> EventLoopFuture) { + self.handler = handler + self.shutdown = shutdown + } + + func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture { + self.handler(context, event) + } + + func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture { + self.shutdown(context) + } + } + + func testShutdownIsCalledWhenLambdaShutsdown() { + let server = MockLambdaServer(behavior: BadBehavior()) + XCTAssertNoThrow(try server.start().wait()) + defer { XCTAssertNoThrow(try server.stop().wait()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + var count = 0 + let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in + count += 1 + return context.eventLoop.makeSucceededFuture(Void()) + } + + let eventLoop = eventLoopGroup.next() + let logger = Logger(label: "TestLogger") + let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { + $0.eventLoop.makeSucceededFuture(handler) + }) + + XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) + XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in + XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), error as? Lambda.RuntimeError) + } + XCTAssertEqual(count, 1) + } + + func testLambdaResultIfShutsdownIsUnclean() { + let server = MockLambdaServer(behavior: BadBehavior()) + XCTAssertNoThrow(try server.start().wait()) + defer { XCTAssertNoThrow(try server.stop().wait()) } + let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) + defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } + + var count = 0 + let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in + count += 1 + return context.eventLoop.makeFailedFuture(TestError("kaboom")) + } + + let eventLoop = eventLoopGroup.next() + let logger = Logger(label: "TestLogger") + let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: { + $0.eventLoop.makeSucceededFuture(handler) + }) + + XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) + XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in + guard case Lambda.RuntimeError.shutdownError(let shutdownError, .failure(let runtimeError)) = error else { + XCTFail("Unexpected error"); return + } + + XCTAssertEqual(shutdownError as? TestError, TestError("kaboom")) + XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError)) + } + XCTAssertEqual(count, 1) + } +} + +struct BadBehavior: LambdaServerBehavior { + func getInvocation() -> GetInvocationResult { + .failure(.internalServerError) + } + + func processResponse(requestId: String, response: String?) -> Result { + XCTFail("should not report a response") + return .failure(.internalServerError) + } + + func processError(requestId: String, error: ErrorResponse) -> Result { + XCTFail("should not report an error") + return .failure(.internalServerError) + } + + func processInitError(error: ErrorResponse) -> Result { + XCTFail("should not report an error") + return .failure(.internalServerError) + } +}