Skip to content

Perf #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .swiftformat
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@
--patternlet inline
--stripunusedargs unnamed-only
--comments ignore
--ifdef no-indent

# rules
22 changes: 10 additions & 12 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,24 @@

import PackageDescription

var targets: [PackageDescription.Target] = [
.target(name: "SwiftAwsLambda", dependencies: ["Logging", "Backtrace", "NIOHTTP1"]),
.target(name: "SwiftAwsLambdaSample", dependencies: ["SwiftAwsLambda"]),
.target(name: "SwiftAwsLambdaStringSample", dependencies: ["SwiftAwsLambda"]),
.target(name: "SwiftAwsLambdaCodableSample", dependencies: ["SwiftAwsLambda"]),
.testTarget(name: "SwiftAwsLambdaTests", dependencies: ["SwiftAwsLambda"]),
]

let package = Package(
name: "swift-aws-lambda",
products: [
.library(name: "SwiftAwsLambda", targets: ["SwiftAwsLambda"]),
.executable(name: "SwiftAwsLambdaSample", targets: ["SwiftAwsLambdaSample"]),
.executable(name: "SwiftAwsLambdaStringSample", targets: ["SwiftAwsLambdaStringSample"]),
.executable(name: "SwiftAwsLambdaCodableSample", targets: ["SwiftAwsLambdaCodableSample"]),
],
dependencies: [
.package(url: "https://github.com/apple/swift-nio.git", from: "2.8.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/ianpartridge/swift-backtrace.git", from: "1.1.0"),
],
targets: targets
targets: [
.target(name: "SwiftAwsLambda", dependencies: ["Logging", "Backtrace", "NIOHTTP1"]),
.testTarget(name: "SwiftAwsLambdaTests", dependencies: ["SwiftAwsLambda"]),
// samples
.target(name: "SwiftAwsLambdaSample", dependencies: ["SwiftAwsLambda"]),
.target(name: "SwiftAwsLambdaStringSample", dependencies: ["SwiftAwsLambda"]),
.target(name: "SwiftAwsLambdaCodableSample", dependencies: ["SwiftAwsLambda"]),
// perf tests
.target(name: "MockServer", dependencies: ["Logging", "NIOHTTP1"]),
]
)
164 changes: 164 additions & 0 deletions Sources/MockServer/main.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import Foundation
import Logging
import NIO
import NIOHTTP1

internal struct MockServer {
private let logger: Logger
private let group: EventLoopGroup
private let host: String
private let port: Int
private let mode: Mode
private let keepAlive: Bool

public init() {
var logger = Logger(label: "MockServer")
logger.logLevel = env("LOG_LEVEL").flatMap(Logger.Level.init) ?? .info
self.logger = logger
self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
self.host = env("HOST") ?? "127.0.0.1"
self.port = env("PORT").flatMap(Int.init) ?? 7000
self.mode = env("MODE").flatMap(Mode.init) ?? .string
self.keepAlive = env("KEEP_ALIVE").flatMap(Bool.init) ?? true
}

func start() throws {
let bootstrap = ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.configureHTTPServerPipeline(withErrorHandling: true).flatMap { _ in
channel.pipeline.addHandler(HTTPHandler(logger: self.logger,
keepAlive: self.keepAlive,
mode: self.mode))
}
}
try bootstrap.bind(host: self.host, port: self.port).flatMap { channel -> EventLoopFuture<Void> in
guard let localAddress = channel.localAddress else {
return channel.eventLoop.makeFailedFuture(ServerError.cantBind)
}
self.logger.info("\(self) started and listening on \(localAddress)")
return channel.eventLoop.makeSucceededFuture(())
}.wait()
}
}

internal final class HTTPHandler: ChannelInboundHandler {
public typealias InboundIn = HTTPServerRequestPart
public typealias OutboundOut = HTTPServerResponsePart

private let logger: Logger
private let mode: Mode
private let keepAlive: Bool

private var requestHead: HTTPRequestHead!
private var requestBody: ByteBuffer?

public init(logger: Logger, keepAlive: Bool, mode: Mode) {
self.logger = logger
self.mode = mode
self.keepAlive = keepAlive
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
let requestPart = unwrapInboundIn(data)

switch requestPart {
case .head(let head):
self.requestHead = head
self.requestBody?.clear()
case .body(var buffer):
if self.requestBody == nil {
self.requestBody = context.channel.allocator.buffer(capacity: buffer.readableBytes)
}
self.requestBody!.writeBuffer(&buffer)
case .end:
self.processRequest(context: context)
}
}

func processRequest(context: ChannelHandlerContext) {
self.logger.debug("\(self) processing \(self.requestHead.uri)")

var responseStatus: HTTPResponseStatus
var responseBody: String?
var responseHeaders: [(String, String)]?

if self.requestHead.uri.hasSuffix("/next") {
let requestId = UUID().uuidString
responseStatus = .ok
switch self.mode {
case .string:
responseBody = requestId
case .json:
responseBody = "{ \"body\": \"\(requestId)\" }"
}
responseHeaders = [(AmazonHeaders.requestID, requestId)]
} else if self.requestHead.uri.hasSuffix("/response") {
responseStatus = .accepted
} else {
responseStatus = .notFound
}
self.writeResponse(context: context, status: responseStatus, headers: responseHeaders, body: responseBody)
}

func writeResponse(context: ChannelHandlerContext, status: HTTPResponseStatus, headers: [(String, String)]? = nil, body: String? = nil) {
var headers = HTTPHeaders(headers ?? [])
headers.add(name: "Content-Length", value: "\(body?.utf8.count ?? 0)")
headers.add(name: "Connection", value: self.keepAlive ? "keep-alive" : "close")
let head = HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: status, headers: headers)

context.write(wrapOutboundOut(.head(head))).whenFailure { error in
self.logger.error("\(self) write error \(error)")
}

if let b = body {
var buffer = context.channel.allocator.buffer(capacity: b.utf8.count)
buffer.writeString(b)
context.write(wrapOutboundOut(.body(.byteBuffer(buffer)))).whenFailure { error in
self.logger.error("\(self) write error \(error)")
}
}

context.writeAndFlush(wrapOutboundOut(.end(nil))).whenComplete { result in
if case .failure(let error) = result {
self.logger.error("\(self) write error \(error)")
}
if !self.self.keepAlive {
context.close().whenFailure { error in
self.logger.error("\(self) close error \(error)")
}
}
}
}
}

internal enum ServerError: Error {
case notReady
case cantBind
}

internal enum AmazonHeaders {
static let requestID = "Lambda-Runtime-Aws-Request-Id"
static let traceID = "Lambda-Runtime-Trace-Id"
static let clientContext = "X-Amz-Client-Context"
static let cognitoIdentity = "X-Amz-Cognito-Identity"
static let deadline = "Lambda-Runtime-Deadline-Ms"
static let invokedFunctionARN = "Lambda-Runtime-Invoked-Function-Arn"
}

internal enum Mode: String {
case string
case json
}

func env(_ name: String) -> String? {
guard let value = getenv(name) else {
return nil
}
return String(utf8String: value)
}

// main
let server = MockServer()
try! server.start()
dispatchMain()
42 changes: 19 additions & 23 deletions Sources/SwiftAwsLambda/HttpClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,61 +12,57 @@
//
//===----------------------------------------------------------------------===//

import Foundation
import NIO
import NIOConcurrencyHelpers
import NIOHTTP1

/// A barebone HTTP client to interact with AWS Runtime Engine which is an HTTP server.
internal class HTTPClient {
private let eventLoop: EventLoop
private let config: Lambda.Config.RuntimeEngine
private let configuration: Lambda.Configuration.RuntimeEngine

private var _state = State.disconnected
private var state = State.disconnected
private let lock = Lock()

init(eventLoop: EventLoop, config: Lambda.Config.RuntimeEngine) {
init(eventLoop: EventLoop, configuration: Lambda.Configuration.RuntimeEngine) {
self.eventLoop = eventLoop
self.config = config
self.configuration = configuration
}

private var state: State {
get {
return self.lock.withLock {
self._state
}
}
set {
self.lock.withLockVoid {
self._state = newValue
}
}
}

func get(url: String, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
return self.execute(Request(url: self.config.baseURL.appendingPathComponent(url), method: .GET, timeout: timeout ?? self.config.requestTimeout))
return self.execute(Request(url: self.configuration.baseURL.appendingPathComponent(url),
method: .GET,
timeout: timeout ?? self.configuration.requestTimeout))
}

func post(url: String, body: ByteBuffer, timeout: TimeAmount? = nil) -> EventLoopFuture<Response> {
return self.execute(Request(url: self.config.baseURL.appendingPathComponent(url), method: .POST, body: body, timeout: timeout ?? self.config.requestTimeout))
return self.execute(Request(url: self.configuration.baseURL.appendingPathComponent(url),
method: .POST,
body: body,
timeout: timeout ?? self.configuration.requestTimeout))
}

private func execute(_ request: Request) -> EventLoopFuture<Response> {
self.lock.lock()
switch self.state {
case .connected(let channel):
guard channel.isActive else {
// attempt to reconnect
self.state = .disconnected
self.lock.unlock()
return self.execute(request)
}
self.lock.unlock()
let promise = channel.eventLoop.makePromise(of: Response.self)
let wrapper = HTTPRequestWrapper(request: request, promise: promise)
return channel.writeAndFlush(wrapper).flatMap {
promise.futureResult
}
case .disconnected:
return self.connect().flatMap {
self.execute(request)
self.lock.unlock()
return self.execute(request)
}
default:
preconditionFailure("invalid state \(self.state)")
Expand All @@ -81,11 +77,11 @@ internal class HTTPClient {
let bootstrap = ClientBootstrap(group: eventLoop)
.channelInitializer { channel in
channel.pipeline.addHTTPClientHandlers().flatMap {
channel.pipeline.addHandlers([HTTPHandler(keepAlive: self.config.keepAlive),
UnaryHandler(keepAlive: self.config.keepAlive)])
channel.pipeline.addHandlers([HTTPHandler(keepAlive: self.configuration.keepAlive),
UnaryHandler(keepAlive: self.configuration.keepAlive)])
}
}
return bootstrap.connect(host: self.config.baseURL.host, port: self.config.baseURL.port).flatMapThrowing { channel in
return bootstrap.connect(host: self.configuration.baseURL.host, port: self.configuration.baseURL.port).flatMapThrowing { channel in
self.state = .connected(channel)
}
}
Expand Down
13 changes: 7 additions & 6 deletions Sources/SwiftAwsLambda/Lambda+Codable.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

import Foundation
import Foundation // for JSON

/// Extension to the `Lambda` companion to enable execution of Lambdas that take and return `Codable` payloads.
/// This is the most common way to use this library in AWS Lambda, since its JSON based.
Expand All @@ -32,13 +32,13 @@ extension Lambda {
}

// for testing
internal static func run<In: Decodable, Out: Encodable>(maxTimes: Int = 0, closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
internal static func run<In: Decodable, Out: Encodable>(configuration: Configuration = .init(), closure: @escaping LambdaCodableClosure<In, Out>) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
}

// for testing
internal static func run<Handler>(handler: Handler, maxTimes: Int = 0) -> LambdaLifecycleResult where Handler: LambdaCodableHandler {
return self.run(handler: handler as LambdaHandler, maxTimes: maxTimes)
internal static func run<Handler>(handler: Handler, configuration: Configuration = .init()) -> LambdaLifecycleResult where Handler: LambdaCodableHandler {
return self.run(handler: handler as LambdaHandler, configuration: configuration)
}
}

Expand Down Expand Up @@ -104,9 +104,10 @@ public extension LambdaCodableHandler {
/// LambdaCodableJsonCodec is an implementation of `LambdaCodableCodec` which does `Encodable` -> `[UInt8]` encoding and `[UInt8]` -> `Decodable' decoding
/// using JSONEncoder and JSONDecoder respectively.
// This is a class as encoder amd decoder are a class, which means its cheaper to hold a reference to both in a class then a struct.
private class LambdaCodableJsonCodec<In: Decodable, Out: Encodable>: LambdaCodableCodec<In, Out> {
private final class LambdaCodableJsonCodec<In: Decodable, Out: Encodable>: LambdaCodableCodec<In, Out> {
private let encoder = JSONEncoder()
private let decoder = JSONDecoder()

public override func encode(_ value: Out) -> Result<[UInt8], Error> {
do {
return .success(try [UInt8](self.encoder.encode(value)))
Expand Down
8 changes: 4 additions & 4 deletions Sources/SwiftAwsLambda/Lambda+String.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ extension Lambda {
}

// for testing
internal static func run(maxTimes: Int = 0, _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), maxTimes: maxTimes)
internal static func run(configuration: Configuration = .init(), _ closure: @escaping LambdaStringClosure) -> LambdaLifecycleResult {
return self.run(handler: LambdaClosureWrapper(closure), configuration: configuration)
}

// for testing
internal static func run(handler: LambdaStringHandler, maxTimes: Int = 0) -> LambdaLifecycleResult {
return self.run(handler: handler as LambdaHandler, maxTimes: maxTimes)
internal static func run(handler: LambdaStringHandler, configuration: Configuration = .init()) -> LambdaLifecycleResult {
return self.run(handler: handler as LambdaHandler, configuration: configuration)
}
}

Expand Down
Loading