Skip to content

Commit 7cfb222

Browse files
author
Miguel Varela Ramos
authored
Forward headers to AsyncAPI (#2329)
1 parent 4d34228 commit 7cfb222

File tree

7 files changed

+64
-41
lines changed

7 files changed

+64
-41
lines changed

pkg/async-gateway/endpoint.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -50,20 +50,14 @@ func (e *Endpoint) CreateWorkload(w http.ResponseWriter, r *http.Request) {
5050
return
5151
}
5252

53-
contentType := r.Header.Get("Content-Type")
54-
if contentType == "" {
55-
respondPlainText(w, http.StatusBadRequest, "error: missing Content-Type key in request header")
56-
return
57-
}
58-
5953
body := r.Body
6054
defer func() {
6155
_ = r.Body.Close()
6256
}()
6357

64-
log := e.logger.With(zap.String("id", requestID), zap.String("contentType", contentType))
58+
log := e.logger.With(zap.String("id", requestID))
6559

66-
id, err := e.service.CreateWorkload(requestID, body, contentType)
60+
id, err := e.service.CreateWorkload(requestID, body, r.Header)
6761
if err != nil {
6862
respondPlainText(w, http.StatusInternalServerError, fmt.Sprintf("error: %v", err))
6963
logErrorWithTelemetry(log, errors.Wrap(err, "failed to create workload"))

pkg/async-gateway/service.go

+22-7
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@ limitations under the License.
1717
package gateway
1818

1919
import (
20+
"bytes"
2021
"encoding/json"
2122
"fmt"
2223
"io"
24+
"net/http"
2325
"strings"
2426

27+
"github.com/cortexlabs/cortex/pkg/lib/errors"
2528
"github.com/cortexlabs/cortex/pkg/types/async"
2629
"go.uber.org/zap"
2730
)
2831

2932
// Service provides an interface to the async-gateway business logic
3033
type Service interface {
31-
CreateWorkload(id string, payload io.Reader, contentType string) (string, error)
34+
CreateWorkload(id string, payload io.Reader, headers http.Header) (string, error)
3235
GetWorkload(id string) (GetWorkloadResponse, error)
3336
}
3437

@@ -52,25 +55,37 @@ func NewService(clusterUID, apiName string, queue Queue, storage Storage, logger
5255
}
5356

5457
// CreateWorkload enqueues an async workload request and uploads the request payload to S3
55-
func (s *service) CreateWorkload(id string, payload io.Reader, contentType string) (string, error) {
58+
func (s *service) CreateWorkload(id string, payload io.Reader, headers http.Header) (string, error) {
5659
prefix := async.StoragePath(s.clusterUID, s.apiName)
57-
log := s.logger.With(zap.String("id", id), zap.String("contentType", contentType))
60+
log := s.logger.With(zap.String("id", id))
61+
62+
buf := &bytes.Buffer{}
63+
if err := json.NewEncoder(buf).Encode(headers); err != nil {
64+
return "", errors.Wrap(err, "failed to dump headers")
65+
}
66+
67+
headersPath := async.HeadersPath(prefix, id)
68+
log.Debugw("uploading headers", zap.String("path", headersPath))
69+
if err := s.storage.Upload(headersPath, buf, "application/json"); err != nil {
70+
return "", errors.Wrap(err, "failed to upload headers")
71+
}
5872

73+
contentType := headers.Get("Content-Type")
5974
payloadPath := async.PayloadPath(prefix, id)
60-
log.Debug("uploading payload", zap.String("path", payloadPath))
75+
log.Debugw("uploading payload", zap.String("path", payloadPath))
6176
if err := s.storage.Upload(payloadPath, payload, contentType); err != nil {
62-
return "", err
77+
return "", errors.Wrap(err, "failed to upload payload")
6378
}
6479

6580
log.Debug("sending message to queue")
6681
if err := s.queue.SendMessage(id, id); err != nil {
67-
return "", err
82+
return "", errors.Wrap(err, "failed to send message to queue")
6883
}
6984

7085
statusPath := fmt.Sprintf("%s/%s/status/%s", prefix, id, async.StatusInQueue)
7186
log.Debug(fmt.Sprintf("setting status to %s", async.StatusInQueue))
7287
if err := s.storage.Upload(statusPath, strings.NewReader(""), "text/plain"); err != nil {
73-
return "", err
88+
return "", errors.Wrap(err, "failed to upload workload status")
7489
}
7590

7691
return id, nil

pkg/dequeuer/async_handler.go

+30-21
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,6 @@ type AsyncMessageHandlerConfig struct {
5555
TargetURL string
5656
}
5757

58-
type userPayload struct {
59-
Body io.ReadCloser
60-
ContentType string
61-
}
62-
6358
func NewAsyncMessageHandler(config AsyncMessageHandlerConfig, awsClient *awslib.Client, eventHandler RequestEventHandler, logger *zap.SugaredLogger) *AsyncMessageHandler {
6459
return &AsyncMessageHandler{
6560
config: config,
@@ -104,9 +99,21 @@ func (h *AsyncMessageHandler) handleMessage(requestID string) error {
10499
}
105100
return errors.Wrap(err, "failed to get payload")
106101
}
107-
defer h.deletePayload(requestID)
102+
defer func() {
103+
h.deletePayload(requestID)
104+
_ = payload.Close()
105+
}()
108106

109-
result, err := h.submitRequest(payload, requestID)
107+
headers, err := h.getHeaders(requestID)
108+
if err != nil {
109+
updateStatusErr := h.updateStatus(requestID, async.StatusFailed)
110+
if updateStatusErr != nil {
111+
h.log.Errorw("failed to update status after failure to get headers", "id", requestID, "error", updateStatusErr)
112+
}
113+
return errors.Wrap(err, "failed to get payload")
114+
}
115+
116+
result, err := h.submitRequest(payload, headers, requestID)
110117
if err != nil {
111118
h.log.Errorw("failed to submit request to user container", "id", requestID, "error", err)
112119
updateStatusErr := h.updateStatus(requestID, async.StatusFailed)
@@ -138,7 +145,7 @@ func (h *AsyncMessageHandler) updateStatus(requestID string, status async.Status
138145
return h.aws.UploadStringToS3("", h.config.Bucket, key)
139146
}
140147

141-
func (h *AsyncMessageHandler) getPayload(requestID string) (*userPayload, error) {
148+
func (h *AsyncMessageHandler) getPayload(requestID string) (io.ReadCloser, error) {
142149
key := async.PayloadPath(h.storagePath, requestID)
143150
output, err := h.aws.S3().GetObject(
144151
&s3.GetObjectInput{
@@ -149,16 +156,7 @@ func (h *AsyncMessageHandler) getPayload(requestID string) (*userPayload, error)
149156
if err != nil {
150157
return nil, errors.WithStack(err)
151158
}
152-
153-
contentType := "application/octet-stream"
154-
if output.ContentType != nil {
155-
contentType = *output.ContentType
156-
}
157-
158-
return &userPayload{
159-
Body: output.Body,
160-
ContentType: contentType,
161-
}, nil
159+
return output.Body, nil
162160
}
163161

164162
func (h *AsyncMessageHandler) deletePayload(requestID string) {
@@ -170,13 +168,13 @@ func (h *AsyncMessageHandler) deletePayload(requestID string) {
170168
}
171169
}
172170

173-
func (h *AsyncMessageHandler) submitRequest(payload *userPayload, requestID string) (interface{}, error) {
174-
req, err := http.NewRequest(http.MethodPost, h.config.TargetURL, payload.Body)
171+
func (h *AsyncMessageHandler) submitRequest(payload io.Reader, headers http.Header, requestID string) (interface{}, error) {
172+
req, err := http.NewRequest(http.MethodPost, h.config.TargetURL, payload)
175173
if err != nil {
176174
return nil, errors.WithStack(err)
177175
}
178176

179-
req.Header.Set("Content-Type", payload.ContentType)
177+
req.Header = headers
180178
req.Header.Set(CortexRequestIDHeader, requestID)
181179

182180
startTime := time.Now()
@@ -216,3 +214,14 @@ func (h *AsyncMessageHandler) uploadResult(requestID string, result interface{})
216214
key := async.ResultPath(h.storagePath, requestID)
217215
return h.aws.UploadJSONToS3(result, h.config.Bucket, key)
218216
}
217+
218+
func (h *AsyncMessageHandler) getHeaders(requestID string) (http.Header, error) {
219+
key := async.HeadersPath(h.storagePath, requestID)
220+
221+
var headers http.Header
222+
if err := h.aws.ReadJSONFromS3(&headers, h.config.Bucket, key); err != nil {
223+
return nil, err
224+
}
225+
226+
return headers, nil
227+
}

pkg/dequeuer/async_handler_test.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,10 @@ func TestAsyncMessageHandler_Handle(t *testing.T) {
6868
})
6969
require.NoError(t, err)
7070

71-
err = awsClient.UploadStringToS3("{}", asyncHandler.config.Bucket, fmt.Sprintf("%s/%s/payload", asyncHandler.storagePath, requestID))
71+
err = awsClient.UploadStringToS3("{}", asyncHandler.config.Bucket, async.PayloadPath(asyncHandler.storagePath, requestID))
72+
require.NoError(t, err)
73+
74+
err = awsClient.UploadStringToS3("{}", asyncHandler.config.Bucket, async.HeadersPath(asyncHandler.storagePath, requestID))
7275
require.NoError(t, err)
7376

7477
err = asyncHandler.Handle(&sqs.Message{

pkg/types/async/s3_paths.go

+4
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ func PayloadPath(storagePath string, requestID string) string {
2828
return fmt.Sprintf("%s/%s/payload", storagePath, requestID)
2929
}
3030

31+
func HeadersPath(storagePath string, requestID string) string {
32+
return fmt.Sprintf("%s/%s/headers.json", storagePath, requestID)
33+
}
34+
3135
func ResultPath(storagePath string, requestID string) string {
3236
return fmt.Sprintf("%s/%s/result.json", storagePath, requestID)
3337
}

test/e2e/tests/aws/test_autoscaling.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232

3333
@pytest.mark.usefixtures("client")
34-
@pytest.mark.parametrize("apis", TEST_APIS)
34+
@pytest.mark.parametrize("apis", TEST_APIS, ids=[api["primary"] for api in TEST_APIS])
3535
def test_autoscaling(printer: Callable, config: Dict, client: cx.Client, apis: Dict[str, Any]):
3636
skip_autoscaling_test = config["global"].get("skip_autoscaling", False)
3737
if skip_autoscaling_test:

test/e2e/tests/aws/test_realtime.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,8 @@ def test_realtime_api(printer: Callable, config: Dict, client: cx.Client, api: D
6464

6565

6666
@pytest.mark.usefixtures("client")
67-
@pytest.mark.parametrize("api", TEST_APIS_ARM)
67+
@pytest.mark.parametrize("api", TEST_APIS_ARM, ids=[api["name"] for api in TEST_APIS_ARM])
6868
def test_realtime_api_arm(printer: Callable, config: Dict, client: cx.Client, api: Dict[str, str]):
69-
70-
printer(f"testing {api['name']}")
7169
e2e.tests.test_realtime_api(
7270
printer=printer,
7371
client=client,

0 commit comments

Comments
 (0)