12
12
//
13
13
//===----------------------------------------------------------------------===//
14
14
15
+ #if DEBUG
15
16
import Dispatch
16
17
import Logging
17
18
import NIO
@@ -26,8 +27,6 @@ import NIOHTTP1
26
27
// callback(.success("Hello, \(payload)!"))
27
28
// }
28
29
// }
29
-
30
- #if DEBUG
31
30
extension Lambda {
32
31
/// Execute code in the context of a mock Lambda server.
33
32
///
@@ -88,20 +87,13 @@ private enum LocalLambda {
88
87
}
89
88
90
89
final class HTTPHandler : ChannelInboundHandler {
91
-
92
- enum InvocationState {
93
- case waitingForNextRequest
94
- case idle( EventLoopPromise < Pending > )
95
- case processing( Pending )
96
- }
97
-
98
90
public typealias InboundIn = HTTPServerRequestPart
99
91
public typealias OutboundOut = HTTPServerResponsePart
100
92
101
- private var processing = CircularBuffer < ( head: HTTPRequestHead , body: ByteBuffer ? ) > ( )
102
-
103
- private static var queue = [ Pending ] ( )
104
- private static var invocationState : InvocationState = . waitingForNextRequest
93
+ private var pending = CircularBuffer < ( head: HTTPRequestHead , body: ByteBuffer ? ) > ( )
94
+
95
+ private static var invocations = CircularBuffer < Invocation > ( )
96
+ private static var invocationState = InvocationState . waitingForLambdaRequest
105
97
106
98
private let logger : Logger
107
99
private let invocationEndpoint : String
@@ -116,92 +108,100 @@ private enum LocalLambda {
116
108
117
109
switch requestPart {
118
110
case . head( let head) :
119
- self . processing . append ( ( head: head, body: nil ) )
111
+ self . pending . append ( ( head: head, body: nil ) )
120
112
case . body( var buffer) :
121
- var request = self . processing . removeFirst ( )
113
+ var request = self . pending . removeFirst ( )
122
114
if request. body == nil {
123
115
request. body = buffer
124
116
} else {
125
117
request. body!. writeBuffer ( & buffer)
126
118
}
127
- self . processing . prepend ( request)
119
+ self . pending . prepend ( request)
128
120
case . end:
129
- let request = self . processing . removeFirst ( )
121
+ let request = self . pending . removeFirst ( )
130
122
self . processRequest ( context: context, request: request)
131
123
}
132
124
}
133
125
134
126
func processRequest( context: ChannelHandlerContext , request: ( head: HTTPRequestHead , body: ByteBuffer ? ) ) {
135
- if request. head. uri. hasSuffix ( self . invocationEndpoint) {
136
- if let work = request. body {
137
- let requestId = " \( DispatchTime . now ( ) . uptimeNanoseconds) " // FIXME:
138
- let promise = context. eventLoop. makePromise ( of: Response . self)
139
- promise. futureResult. whenComplete { result in
140
- switch result {
141
- case . success( let response) :
142
- self . writeResponse ( context: context, response: response)
143
- case . failure:
144
- self . writeResponse ( context: context, response: . init( status: . internalServerError) )
145
- }
146
- }
147
- let pending = Pending ( requestId: requestId, request: work, responsePromise: promise)
148
- switch Self . invocationState {
149
- case . idle( let promise) :
150
- promise. succeed ( pending)
151
- case . processing( _) , . waitingForNextRequest:
152
- Self . queue. append ( pending)
127
+ switch ( request. head. method, request. head. uri) {
128
+ // this endpoint is called by the client invoking the lambda
129
+ case ( . POST, let url) where url. hasSuffix ( self . invocationEndpoint) :
130
+ guard let work = request. body else {
131
+ return self . writeResponse ( context: context, response: . init( status: . badRequest) )
132
+ }
133
+ let requestID = " \( DispatchTime . now ( ) . uptimeNanoseconds) " // FIXME:
134
+ let promise = context. eventLoop. makePromise ( of: Response . self)
135
+ promise. futureResult. whenComplete { result in
136
+ switch result {
137
+ case . failure( let error) :
138
+ self . logger. error ( " invocation error: \( error) " )
139
+ self . writeResponse ( context: context, response: . init( status: . internalServerError) )
140
+ case . success( let response) :
141
+ self . writeResponse ( context: context, response: response)
153
142
}
154
143
}
155
- } else if request. head. uri. hasSuffix ( " /next " ) {
144
+ let invocation = Invocation ( requestID: requestID, request: work, responsePromise: promise)
145
+ switch Self . invocationState {
146
+ case . waitingForInvocation( let promise) :
147
+ promise. succeed ( invocation)
148
+ case . waitingForLambdaRequest, . waitingForLambdaResponse:
149
+ Self . invocations. append ( invocation)
150
+ }
151
+ // /next endpoint is called by the lambda polling for work
152
+ case ( . GET, let url) where url. hasSuffix ( Consts . requestWorkURLSuffix) :
156
153
// check if our server is in the correct state
157
- guard case . waitingForNextRequest = Self . invocationState else {
158
- #warning("better error code?! ")
159
- self . writeResponse ( context: context, response: . init( status: . conflict ) )
154
+ guard case . waitingForLambdaRequest = Self . invocationState else {
155
+ self . logger . error ( " invalid invocation state \( Self . invocationState ) " )
156
+ self . writeResponse ( context: context, response: . init( status: . unprocessableEntity ) )
160
157
return
161
158
}
162
-
159
+
163
160
// pop the first task from the queue
164
- switch ! Self. queue . isEmpty ? Self . queue . removeFirst ( ) : nil {
161
+ switch Self . invocations . popFirst ( ) {
165
162
case . none:
166
- // if there is nothing in the queue, create a promise that we can succeed,
167
- // when we get a new task
168
- let promise = context. eventLoop. makePromise ( of: Pending . self)
169
- promise. futureResult. whenComplete { ( result) in
163
+ // if there is nothing in the queue,
164
+ // create a promise that we can fullfill when we get a new task
165
+ let promise = context. eventLoop. makePromise ( of: Invocation . self)
166
+ promise. futureResult. whenComplete { result in
170
167
switch result {
171
168
case . failure( let error) :
169
+ self . logger. error ( " invocation error: \( error) " )
172
170
self . writeResponse ( context: context, response: . init( status: . internalServerError) )
173
- case . success( let pending ) :
174
- Self . invocationState = . processing ( pending )
175
- self . writeResponse ( context: context, response: pending . toResponse ( ) )
171
+ case . success( let invocation ) :
172
+ Self . invocationState = . waitingForLambdaResponse ( invocation )
173
+ self . writeResponse ( context: context, response: invocation . makeResponse ( ) )
176
174
}
177
175
}
178
- Self . invocationState = . idle ( promise)
179
- case . some( let pending ) :
176
+ Self . invocationState = . waitingForInvocation ( promise)
177
+ case . some( let invocation ) :
180
178
// if there is a task pending, we can immediatly respond with it.
181
- Self . invocationState = . processing ( pending )
182
- self . writeResponse ( context: context, response: pending . toResponse ( ) )
179
+ Self . invocationState = . waitingForLambdaResponse ( invocation )
180
+ self . writeResponse ( context: context, response: invocation . makeResponse ( ) )
183
181
}
184
-
185
- } else if request . head . uri . hasSuffix ( " /response " ) {
182
+ // :requestID/response endpoint is called by the lambda posting the response
183
+ case ( . POST , let url ) where url . hasSuffix ( Consts . postResponseURLSuffix ) :
186
184
let parts = request. head. uri. split ( separator: " / " )
187
- guard let requestId = parts. count > 2 ? String ( parts [ parts. count - 2 ] ) : nil else {
185
+ guard let requestID = parts. count > 2 ? String ( parts [ parts. count - 2 ] ) : nil else {
188
186
// the request is malformed, since we were expecting a requestId in the path
189
187
return self . writeResponse ( context: context, response: . init( status: . badRequest) )
190
188
}
191
- guard case . processing ( let pending ) = Self . invocationState else {
189
+ guard case . waitingForLambdaResponse ( let invocation ) = Self . invocationState else {
192
190
// a response was send, but we did not expect to receive one
193
- #warning("better error code?! ")
194
- return self . writeResponse ( context: context, response: . init( status: . conflict ) )
191
+ self . logger . error ( " invalid invocation state \( Self . invocationState ) " )
192
+ return self . writeResponse ( context: context, response: . init( status: . unprocessableEntity ) )
195
193
}
196
- guard requestId == pending . requestId else {
194
+ guard requestID == invocation . requestID else {
197
195
// the request's requestId is not matching the one we are expecting
196
+ self . logger. error ( " invalid invocation state request ID \( requestID) does not match expected \( invocation. requestID) " )
198
197
return self . writeResponse ( context: context, response: . init( status: . badRequest) )
199
198
}
200
-
201
- pending . responsePromise. succeed ( . init( status: . ok, body: request. body) )
199
+
200
+ invocation . responsePromise. succeed ( . init( status: . ok, body: request. body) )
202
201
self . writeResponse ( context: context, response: . init( status: . accepted) )
203
- Self . invocationState = . waitingForNextRequest
204
- } else {
202
+ Self . invocationState = . waitingForLambdaRequest
203
+ // unknown call
204
+ default :
205
205
self . writeResponse ( context: context, response: . init( status: . notFound) )
206
206
}
207
207
}
@@ -234,24 +234,30 @@ private enum LocalLambda {
234
234
var body : ByteBuffer ?
235
235
}
236
236
237
- struct Pending {
238
- let requestId : String
237
+ struct Invocation {
238
+ let requestID : String
239
239
let request : ByteBuffer
240
240
let responsePromise : EventLoopPromise < Response >
241
-
242
- func toResponse ( ) -> Response {
241
+
242
+ func makeResponse ( ) -> Response {
243
243
var response = Response ( )
244
244
response. body = self . request
245
245
// required headers
246
246
response. headers = [
247
- ( AmazonHeaders . requestID, self . requestId ) ,
247
+ ( AmazonHeaders . requestID, self . requestID ) ,
248
248
( AmazonHeaders . invokedFunctionARN, " arn:aws:lambda:us-east-1: \( Int16 . random ( in: Int16 . min ... Int16 . max) ) :function:custom-runtime " ) ,
249
249
( AmazonHeaders . traceID, " Root= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Parent= \( Int16 . random ( in: Int16 . min ... Int16 . max) ) ;Sampled=1 " ) ,
250
250
( AmazonHeaders . deadline, " \( DispatchWallTime . distantFuture. millisSinceEpoch) " ) ,
251
251
]
252
252
return response
253
253
}
254
254
}
255
+
256
+ enum InvocationState {
257
+ case waitingForInvocation( EventLoopPromise < Invocation > )
258
+ case waitingForLambdaRequest
259
+ case waitingForLambdaResponse( Invocation )
260
+ }
255
261
}
256
262
257
263
enum ServerError : Error {
0 commit comments