Skip to content

Commit 1bd1d22

Browse files
committed
Feature/ff local server (#70)
* Don’t exit immediately * Removed locks. Just running in one EL
1 parent 9c172fe commit 1bd1d22

File tree

1 file changed

+68
-28
lines changed

1 file changed

+68
-28
lines changed

Sources/AWSLambdaRuntime/Lambda+LocalServer.swift

+68-28
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private enum LocalLambda {
5959
var logger = Logger(label: "LocalLambdaServer")
6060
logger.logLevel = configuration.general.logLevel
6161
self.logger = logger
62-
self.group = MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount)
62+
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
6363
self.host = configuration.runtimeEngine.ip
6464
self.port = configuration.runtimeEngine.port
6565
self.invocationEndpoint = invocationEndpoint ?? "/invoke"
@@ -88,13 +88,20 @@ private enum LocalLambda {
8888
}
8989

9090
final class HTTPHandler: ChannelInboundHandler {
91+
92+
enum InvocationState {
93+
case waitingForNextRequest
94+
case idle(EventLoopPromise<Pending>)
95+
case processing(Pending)
96+
}
97+
9198
public typealias InboundIn = HTTPServerRequestPart
9299
public typealias OutboundOut = HTTPServerResponsePart
93100

94-
private static let queueLock = Lock()
95-
private static var queue = [String: Pending]()
96-
97101
private var processing = CircularBuffer<(head: HTTPRequestHead, body: ByteBuffer?)>()
102+
103+
private static var queue = [Pending]()
104+
private static var invocationState: InvocationState = .waitingForNextRequest
98105

99106
private let logger: Logger
100107
private let invocationEndpoint: String
@@ -137,43 +144,63 @@ private enum LocalLambda {
137144
self.writeResponse(context: context, response: .init(status: .internalServerError))
138145
}
139146
}
140-
Self.queueLock.withLock {
141-
Self.queue[requestId] = Pending(requestId: requestId, request: work, responsePromise: promise)
147+
let pending = Pending(requestId: requestId, request: work, responsePromise: promise)
148+
switch Self.invocationState {
149+
case .idle(let promise):
150+
promise.succeed(pending)
151+
case .processing(_), .waitingForNextRequest:
152+
Self.queue.append(pending)
142153
}
143154
}
144155
} else if request.head.uri.hasSuffix("/next") {
145-
switch (Self.queueLock.withLock { Self.queue.popFirst() }) {
156+
// check if our server is in the correct state
157+
guard case .waitingForNextRequest = Self.invocationState else {
158+
#warning("better error code?!")
159+
self.writeResponse(context: context, response: .init(status: .conflict))
160+
return
161+
}
162+
163+
// pop the first task from the queue
164+
switch !Self.queue.isEmpty ? Self.queue.removeFirst() : nil {
146165
case .none:
147-
self.writeResponse(context: context, response: .init(status: .noContent))
148-
case .some(let pending):
149-
var response = Response()
150-
response.body = pending.value.request
151-
// required headers
152-
response.headers = [
153-
(AmazonHeaders.requestID, pending.key),
154-
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
155-
(AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
156-
(AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
157-
]
158-
Self.queueLock.withLock {
159-
Self.queue[pending.key] = pending.value
166+
// if there is nothing in the queue, create a promise that we can succeed,
167+
// when we get a new task
168+
let promise = context.eventLoop.makePromise(of: Pending.self)
169+
promise.futureResult.whenComplete { (result) in
170+
switch result {
171+
case .failure(let error):
172+
self.writeResponse(context: context, response: .init(status: .internalServerError))
173+
case .success(let pending):
174+
Self.invocationState = .processing(pending)
175+
self.writeResponse(context: context, response: pending.toResponse())
176+
}
160177
}
161-
self.writeResponse(context: context, response: response)
178+
Self.invocationState = .idle(promise)
179+
case .some(let pending):
180+
// if there is a task pending, we can immediatly respond with it.
181+
Self.invocationState = .processing(pending)
182+
self.writeResponse(context: context, response: pending.toResponse())
162183
}
163184

164185
} else if request.head.uri.hasSuffix("/response") {
165186
let parts = request.head.uri.split(separator: "/")
166187
guard let requestId = parts.count > 2 ? String(parts[parts.count - 2]) : nil else {
188+
// the request is malformed, since we were expecting a requestId in the path
167189
return self.writeResponse(context: context, response: .init(status: .badRequest))
168190
}
169-
switch (Self.queueLock.withLock { Self.queue[requestId] }) {
170-
case .none:
171-
self.writeResponse(context: context, response: .init(status: .badRequest))
172-
case .some(let pending):
173-
pending.responsePromise.succeed(.init(status: .ok, body: request.body))
174-
self.writeResponse(context: context, response: .init(status: .accepted))
175-
Self.queueLock.withLock { Self.queue[requestId] = nil }
191+
guard case .processing(let pending) = Self.invocationState else {
192+
// a response was send, but we did not expect to receive one
193+
#warning("better error code?!")
194+
return self.writeResponse(context: context, response: .init(status: .conflict))
176195
}
196+
guard requestId == pending.requestId else {
197+
// the request's requestId is not matching the one we are expecting
198+
return self.writeResponse(context: context, response: .init(status: .badRequest))
199+
}
200+
201+
pending.responsePromise.succeed(.init(status: .ok, body: request.body))
202+
self.writeResponse(context: context, response: .init(status: .accepted))
203+
Self.invocationState = .waitingForNextRequest
177204
} else {
178205
self.writeResponse(context: context, response: .init(status: .notFound))
179206
}
@@ -211,6 +238,19 @@ private enum LocalLambda {
211238
let requestId: String
212239
let request: ByteBuffer
213240
let responsePromise: EventLoopPromise<Response>
241+
242+
func toResponse() -> Response {
243+
var response = Response()
244+
response.body = self.request
245+
// required headers
246+
response.headers = [
247+
(AmazonHeaders.requestID, self.requestId),
248+
(AmazonHeaders.invokedFunctionARN, "arn:aws:lambda:us-east-1:\(Int16.random(in: Int16.min ... Int16.max)):function:custom-runtime"),
249+
(AmazonHeaders.traceID, "Root=\(Int16.random(in: Int16.min ... Int16.max));Parent=\(Int16.random(in: Int16.min ... Int16.max));Sampled=1"),
250+
(AmazonHeaders.deadline, "\(DispatchWallTime.distantFuture.millisSinceEpoch)"),
251+
]
252+
return response
253+
}
214254
}
215255
}
216256

0 commit comments

Comments
 (0)