@@ -59,7 +59,7 @@ private enum LocalLambda {
59
59
var logger = Logger ( label: " LocalLambdaServer " )
60
60
logger. logLevel = configuration. general. logLevel
61
61
self . logger = logger
62
- self . group = MultiThreadedEventLoopGroup ( numberOfThreads: System . coreCount )
62
+ self . group = MultiThreadedEventLoopGroup ( numberOfThreads: 1 )
63
63
self . host = configuration. runtimeEngine. ip
64
64
self . port = configuration. runtimeEngine. port
65
65
self . invocationEndpoint = invocationEndpoint ?? " /invoke "
@@ -88,13 +88,20 @@ private enum LocalLambda {
88
88
}
89
89
90
90
final class HTTPHandler : ChannelInboundHandler {
91
+
92
+ enum InvocationState {
93
+ case waitingForNextRequest
94
+ case idle( EventLoopPromise < Pending > )
95
+ case processing( Pending )
96
+ }
97
+
91
98
public typealias InboundIn = HTTPServerRequestPart
92
99
public typealias OutboundOut = HTTPServerResponsePart
93
100
94
- private static let queueLock = Lock ( )
95
- private static var queue = [ String: Pending] ( )
96
-
97
101
private var processing = CircularBuffer < ( head: HTTPRequestHead , body: ByteBuffer ? ) > ( )
102
+
103
+ private static var queue = [ Pending] ( )
104
+ private static var invocationState : InvocationState = . waitingForNextRequest
98
105
99
106
private let logger : Logger
100
107
private let invocationEndpoint : String
@@ -137,43 +144,63 @@ private enum LocalLambda {
137
144
self . writeResponse ( context: context, response: . init( status: . internalServerError) )
138
145
}
139
146
}
140
- Self . queueLock. withLock {
141
- Self . queue [ requestId] = Pending ( requestId: requestId, request: work, responsePromise: promise)
147
+ let pending = Pending ( requestId: requestId, request: work, responsePromise: promise)
148
+ switch Self . invocationState {
149
+ case . idle( let promise) :
150
+ promise. succeed ( pending)
151
+ case . processing( _) , . waitingForNextRequest:
152
+ Self . queue. append ( pending)
142
153
}
143
154
}
144
155
} else if request. head. uri. hasSuffix ( " /next " ) {
145
- switch ( Self . queueLock. withLock { Self . queue. popFirst ( ) } ) {
156
+ // check if our server is in the correct state
157
+ guard case . waitingForNextRequest = Self . invocationState else {
158
+ #warning("better error code?!")
159
+ self . writeResponse ( context: context, response: . init( status: . conflict) )
160
+ return
161
+ }
162
+
163
+ // pop the first task from the queue
164
+ switch !Self. queue. isEmpty ? Self . queue. removeFirst ( ) : nil {
146
165
case . none:
147
- self . writeResponse ( context: context, response: . init( status: . noContent) )
148
- case . some( let pending) :
149
- var response = Response ( )
150
- response. body = pending. value. request
151
- // required headers
152
- response. headers = [
153
- ( AmazonHeaders . requestID, pending. key) ,
154
- ( AmazonHeaders . invokedFunctionARN, " arn:aws:lambda:us-east-1: \( Int16 . random ( in: Int16 . min ... Int16 . max) ) :function:custom-runtime " ) ,
155
- ( AmazonHeaders . traceID, " Root= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Parent= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Sampled=1 " ) ,
156
- ( AmazonHeaders . deadline, " \( DispatchWallTime . distantFuture. millisSinceEpoch) " ) ,
157
- ]
158
- Self . queueLock. withLock {
159
- Self . queue [ pending. key] = pending. value
166
+ // if there is nothing in the queue, create a promise that we can succeed,
167
+ // when we get a new task
168
+ let promise = context. eventLoop. makePromise ( of: Pending . self)
169
+ promise. futureResult. whenComplete { ( result) in
170
+ switch result {
171
+ case . failure( let error) :
172
+ self . writeResponse ( context: context, response: . init( status: . internalServerError) )
173
+ case . success( let pending) :
174
+ Self . invocationState = . processing( pending)
175
+ self . writeResponse ( context: context, response: pending. toResponse ( ) )
176
+ }
160
177
}
161
- self . writeResponse ( context: context, response: response)
178
+ Self . invocationState = . idle( promise)
179
+ case . some( let pending) :
180
+ // if there is a task pending, we can immediatly respond with it.
181
+ Self . invocationState = . processing( pending)
182
+ self . writeResponse ( context: context, response: pending. toResponse ( ) )
162
183
}
163
184
164
185
} else if request. head. uri. hasSuffix ( " /response " ) {
165
186
let parts = request. head. uri. split ( separator: " / " )
166
187
guard let requestId = parts. count > 2 ? String ( parts [ parts. count - 2 ] ) : nil else {
188
+ // the request is malformed, since we were expecting a requestId in the path
167
189
return self . writeResponse ( context: context, response: . init( status: . badRequest) )
168
190
}
169
- switch ( Self . queueLock. withLock { Self . queue [ requestId] } ) {
170
- case . none:
171
- self . writeResponse ( context: context, response: . init( status: . badRequest) )
172
- case . some( let pending) :
173
- pending. responsePromise. succeed ( . init( status: . ok, body: request. body) )
174
- self . writeResponse ( context: context, response: . init( status: . accepted) )
175
- Self . queueLock. withLock { Self . queue [ requestId] = nil }
191
+ guard case . processing( let pending) = Self . invocationState else {
192
+ // a response was send, but we did not expect to receive one
193
+ #warning("better error code?!")
194
+ return self . writeResponse ( context: context, response: . init( status: . conflict) )
176
195
}
196
+ guard requestId == pending. requestId else {
197
+ // the request's requestId is not matching the one we are expecting
198
+ return self . writeResponse ( context: context, response: . init( status: . badRequest) )
199
+ }
200
+
201
+ pending. responsePromise. succeed ( . init( status: . ok, body: request. body) )
202
+ self . writeResponse ( context: context, response: . init( status: . accepted) )
203
+ Self . invocationState = . waitingForNextRequest
177
204
} else {
178
205
self . writeResponse ( context: context, response: . init( status: . notFound) )
179
206
}
@@ -211,6 +238,19 @@ private enum LocalLambda {
211
238
let requestId : String
212
239
let request : ByteBuffer
213
240
let responsePromise : EventLoopPromise < Response >
241
+
242
+ func toResponse( ) -> Response {
243
+ var response = Response ( )
244
+ response. body = self . request
245
+ // required headers
246
+ response. headers = [
247
+ ( AmazonHeaders . requestID, self . requestId) ,
248
+ ( AmazonHeaders . invokedFunctionARN, " arn:aws:lambda:us-east-1: \( Int16 . random ( in: Int16 . min ... Int16 . max) ) :function:custom-runtime " ) ,
249
+ ( AmazonHeaders . traceID, " Root= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Parent= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Sampled=1 " ) ,
250
+ ( AmazonHeaders . deadline, " \( DispatchWallTime . distantFuture. millisSinceEpoch) " ) ,
251
+ ]
252
+ return response
253
+ }
214
254
}
215
255
}
216
256
0 commit comments