diff --git a/pkg/enqueuer/enqueuer.go b/pkg/enqueuer/enqueuer.go index 106fef5bed..6eaaf64e8d 100644 --- a/pkg/enqueuer/enqueuer.go +++ b/pkg/enqueuer/enqueuer.go @@ -78,6 +78,10 @@ type JobSubmission struct { DelimitedFiles *DelimitedFiles `json:"delimited_files"` } +type onJobCompleteRequestBody struct { + Message string `json:"message"` +} + func randomMessageID() string { return random.String(40) // maximum is 80 (for sqs.SendMessageBatchRequestEntry.Id) but this ID may show up in a user error message } @@ -127,10 +131,17 @@ func (e *Enqueuer) Enqueue() (int, error) { } } + onJobCompleteBodyBytes, err := json.Marshal(onJobCompleteRequestBody{ + Message: "job_complete", + }) + if err != nil { + return 0, err + } + randomID := randomMessageID() _, err = e.aws.SQS().SendMessage(&sqs.SendMessageInput{ QueueUrl: aws.String(e.queueURL), - MessageBody: aws.String("\"job_complete\""), + MessageBody: aws.String(string(onJobCompleteBodyBytes)), MessageDeduplicationId: aws.String(randomID), // prevent content based deduping MessageGroupId: aws.String(randomID), // aws recommends message group id per message to improve chances of exactly-once MessageAttributes: map[string]*sqs.MessageAttributeValue{