Skip to content

Added shutdown() -> EventLoopFuture<Void> to the ByteBufferLambdaHandler #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,27 @@ extension Lambda {
}
}
}

// MARK: - ShutdownContext

extension Lambda {
/// Lambda runtime shutdown context.
/// The Lambda runtime generates and passes the `ShutdownContext` to the Lambda handler as an argument.
public final class ShutdownContext {
/// `Logger` to log with
///
/// - note: The `LogLevel` can be configured using the `LOG_LEVEL` environment variable.
public let logger: Logger

/// The `EventLoop` the Lambda is executed on. Use this to schedule work with.
///
/// - note: The `EventLoop` is shared with the Lambda runtime engine and should be handled with extra care.
/// Most importantly the `EventLoop` must never be blocked.
public let eventLoop: EventLoop

internal init(logger: Logger, eventLoop: EventLoop) {
self.eventLoop = eventLoop
self.logger = logger
}
}
}
13 changes: 13 additions & 0 deletions Sources/AWSLambdaRuntimeCore/LambdaHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,19 @@ public protocol ByteBufferLambdaHandler {
/// - Returns: An `EventLoopFuture` to report the result of the Lambda back to the runtime engine.
/// The `EventLoopFuture` should be completed with either a response encoded as `ByteBuffer` or an `Error`
func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture<ByteBuffer?>

/// The method to clean up your resources.
/// Concrete Lambda handlers implement this method to shutdown their `HTTPClient`s and database connections.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding a mention about if your create/init future failed then this would not be invoked so take special case about any resources that need to be torn down there (as we discussed that case on chat)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ktoso fixed below

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good

///
/// - Note: In case your Lambda fails while creating your LambdaHandler in the `HandlerFactory`, this method
/// **is not invoked**. In this case you must cleanup the created resources immediately in the `HandlerFactory`.
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this invoked once per lambda invocation?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no. only in debug mode for shutdown. or if the lambda occurs an recoverable error. and the lambda is therefore closed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fabianfett ah right. In that case, couldn't we run it synchronously after MTELG.withCallingThreadAsEventLoop returns?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah no, doesn't make sense. Shutdown probably needs the EL :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore my comments. This looks good.

}

public extension ByteBufferLambdaHandler {
func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
context.eventLoop.makeSucceededFuture(Void())
}
}

private enum CodecError: Error {
Expand Down
37 changes: 29 additions & 8 deletions Sources/AWSLambdaRuntimeCore/LambdaLifecycle.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ extension Lambda {

private var state = State.idle {
willSet {
assert(self.eventLoop.inEventLoop, "State may only be changed on the `Lifecycle`'s `eventLoop`")
self.eventLoop.assertInEventLoop()
precondition(newValue.order > self.state.order, "invalid state \(newValue) after \(self.state.order)")
}
}
Expand Down Expand Up @@ -71,22 +71,43 @@ extension Lambda {
///
/// - note: This method must be called on the `EventLoop` the `Lifecycle` has been initialized with.
public func start() -> EventLoopFuture<Void> {
assert(self.eventLoop.inEventLoop, "Start must be called on the `EventLoop` the `Lifecycle` has been initialized with.")
self.eventLoop.assertInEventLoop()

logger.info("lambda lifecycle starting with \(self.configuration)")
self.state = .initializing
// triggered when the Lambda has finished its last run
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
finishedPromise.futureResult.always { _ in
self.markShutdown()
}.cascade(to: self.shutdownPromise)

var logger = self.logger
logger[metadataKey: "lifecycleId"] = .string(self.configuration.lifecycle.id)
let runner = Runner(eventLoop: self.eventLoop, configuration: self.configuration)
return runner.initialize(logger: logger, factory: self.factory).map { handler in

let startupFuture = runner.initialize(logger: logger, factory: self.factory)
startupFuture.flatMap { handler -> EventLoopFuture<(ByteBufferLambdaHandler, Result<Int, Error>)> in
// after the startup future has succeeded, we have a handler that we can use
// to `run` the lambda.
let finishedPromise = self.eventLoop.makePromise(of: Int.self)
self.state = .active(runner, handler)
self.run(promise: finishedPromise)
return finishedPromise.futureResult.mapResult { (handler, $0) }
}
.flatMap { (handler, runnerResult) -> EventLoopFuture<Int> in
// after the lambda finishPromise has succeeded or failed we need to
// shutdown the handler
let shutdownContext = ShutdownContext(logger: logger, eventLoop: self.eventLoop)
return handler.shutdown(context: shutdownContext).flatMapErrorThrowing { error in
// if, we had an error shuting down the lambda, we want to concatenate it with
// the runner result
logger.error("Error shutting down handler: \(error)")
Copy link
Contributor

@tomerd tomerd Jun 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a blocker, since this is debug mode only, but ideally the shutdown error would bubble into the shutdownPromise instead of being logged

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomerd I hope I fixed this with my last push.

throw RuntimeError.shutdownError(shutdownError: error, runnerResult: runnerResult)
}.flatMapResult { (_) -> Result<Int, Error> in
// we had no error shutting down the lambda. let's return the runner's result
runnerResult
}
}.always { _ in
// triggered when the Lambda has finished its last run or has a startup failure.
self.markShutdown()
}.cascade(to: self.shutdownPromise)

return startupFuture.map { _ in }
}

// MARK: - Private
Expand Down
2 changes: 1 addition & 1 deletion Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private extension Lambda.Context {
}

// TODO: move to nio?
private extension EventLoopFuture {
extension EventLoopFuture {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change leftover from previous iteration?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no it's needed, since I use mapResult in another file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also: Please don't use any access qualifiers (especially not public) in front of extension. It makes code reviews really difficult because a func foo() {} can be public if maybe 100 lines above there's a public extension XYZ. I think all qualifiers should be banned in front of extension

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope there's some linter for this perhaps? Maybe time to look around for one hm

// callback does not have side effects, failing with original result
func peekError(_ callback: @escaping (Error) -> Void) -> EventLoopFuture<Value> {
self.flatMapError { error in
Expand Down
1 change: 1 addition & 0 deletions Sources/AWSLambdaRuntimeCore/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ internal extension Lambda {
case invocationMissingHeader(String)
case noBody
case json(Error)
case shutdownError(shutdownError: Error, runnerResult: Result<Int, Error>)
}
}

Expand Down
142 changes: 142 additions & 0 deletions Tests/AWSLambdaRuntimeCoreTests/LambdaLifecycleTest.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2017-2018 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@testable import AWSLambdaRuntimeCore
import Logging
import NIO
import NIOHTTP1
import XCTest

class LambdaLifecycleTest: XCTestCase {
func testShutdownFutureIsFulfilledWithStartUpError() {
let server = MockLambdaServer(behavior: FailedBootstrapBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let testError = TestError("kaboom")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.eventLoop.makeFailedFuture(testError)
})

// eventLoop.submit in this case returns an EventLoopFuture<EventLoopFuture<ByteBufferHandler>>
// which is why we need `wait().wait()`
XCTAssertThrowsError(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait()) { error in
XCTAssertEqual(testError, error as? TestError)
}

XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
XCTAssertEqual(testError, error as? TestError)
}
}

struct CallbackLambdaHandler: ByteBufferLambdaHandler {
let handler: (Lambda.Context, ByteBuffer) -> (EventLoopFuture<ByteBuffer?>)
let shutdown: (Lambda.ShutdownContext) -> EventLoopFuture<Void>

init(_ handler: @escaping (Lambda.Context, ByteBuffer) -> (EventLoopFuture<ByteBuffer?>), shutdown: @escaping (Lambda.ShutdownContext) -> EventLoopFuture<Void>) {
self.handler = handler
self.shutdown = shutdown
}

func handle(context: Lambda.Context, event: ByteBuffer) -> EventLoopFuture<ByteBuffer?> {
self.handler(context, event)
}

func shutdown(context: Lambda.ShutdownContext) -> EventLoopFuture<Void> {
self.shutdown(context)
}
}

func testShutdownIsCalledWhenLambdaShutsdown() {
let server = MockLambdaServer(behavior: BadBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

var count = 0
let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in
count += 1
return context.eventLoop.makeSucceededFuture(Void())
}

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.eventLoop.makeSucceededFuture(handler)
})

XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait())
XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
XCTAssertEqual(.badStatusCode(HTTPResponseStatus.internalServerError), error as? Lambda.RuntimeError)
}
XCTAssertEqual(count, 1)
}

func testLambdaResultIfShutsdownIsUnclean() {
let server = MockLambdaServer(behavior: BadBehavior())
XCTAssertNoThrow(try server.start().wait())
defer { XCTAssertNoThrow(try server.stop().wait()) }
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }

var count = 0
let handler = CallbackLambdaHandler({ XCTFail("Should not be reached"); return $0.eventLoop.makeSucceededFuture($1) }) { context in
count += 1
return context.eventLoop.makeFailedFuture(TestError("kaboom"))
}

let eventLoop = eventLoopGroup.next()
let logger = Logger(label: "TestLogger")
let lifecycle = Lambda.Lifecycle(eventLoop: eventLoop, logger: logger, factory: {
$0.eventLoop.makeSucceededFuture(handler)
})

XCTAssertNoThrow(_ = try eventLoop.flatSubmit { lifecycle.start() }.wait())
XCTAssertThrowsError(_ = try lifecycle.shutdownFuture.wait()) { error in
guard case Lambda.RuntimeError.shutdownError(let shutdownError, .failure(let runtimeError)) = error else {
XCTFail("Unexpected error"); return
}

XCTAssertEqual(shutdownError as? TestError, TestError("kaboom"))
XCTAssertEqual(runtimeError as? Lambda.RuntimeError, .badStatusCode(.internalServerError))
}
XCTAssertEqual(count, 1)
}
}

struct BadBehavior: LambdaServerBehavior {
func getInvocation() -> GetInvocationResult {
.failure(.internalServerError)
}

func processResponse(requestId: String, response: String?) -> Result<Void, ProcessResponseError> {
XCTFail("should not report a response")
return .failure(.internalServerError)
}

func processError(requestId: String, error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}

func processInitError(error: ErrorResponse) -> Result<Void, ProcessErrorError> {
XCTFail("should not report an error")
return .failure(.internalServerError)
}
}