Skip to content

feat(batch-processing): move non retry-able message to DLQ #500

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* calling {@link SqsMessageHandler#process(SQSMessage)} method for each {@link SQSMessage} in the received {@link SQSEvent}
* </p>
*
* </p>
* <p>
* If any exception is thrown from {@link SqsMessageHandler#process(SQSMessage)} during processing of a messages, Utility
* will take care of deleting all the successful messages from SQS. When one or more single message fails processing due
* to exception thrown from {@link SqsMessageHandler#process(SQSMessage)}, Lambda execution will fail
Expand All @@ -32,6 +32,24 @@
* {@link SqsBatch#suppressException()} to true. By default its value is false
* </p>
*
* <p>
* If you want certain exceptions to be treated as permanent failures, i.e. exceptions where the result of retrying will
* always be a failure and want these can be immediately moved to the dead letter queue associated to the source SQS queue,
*
* you can use {@link SqsBatch#nonRetryableExceptions()} to configure such exceptions.
* Make sure function execution role has sqs:GetQueueAttributes permission on source SQS queue and sqs:SendMessage,
* sqs:SendMessageBatch permission for configured DLQ.
*
* If you want such messages to be deleted instead, set {@link SqsBatch#deleteNonRetryableMessageFromQueue()} to true.
* By default its value is false.
*
* If there is no DLQ configured on source SQS queue and {@link SqsBatch#nonRetryableExceptions()} attribute is set, if
* nonRetryableExceptions occurs from {@link SqsMessageHandler}, such exceptions will still be treated as temporary
* exceptions and the message will be moved back to source SQS queue for reprocessing. The same behaviour will occur if
* for some reason the utility is unable to move the message to the DLQ. An example of this could be because the function
* is missing the correct permissions.
* </p>
*
* <pre>
* public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {
*
Expand All @@ -51,6 +69,7 @@
*
* ...
* </pre>
* @see <a href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-dead-letter-queues.html">Amazon SQS dead-letter queues</a>
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
Expand All @@ -59,4 +78,8 @@
Class<? extends SqsMessageHandler<Object>> value();

boolean suppressException() default false;

Class<? extends Exception>[] nonRetryableExceptions() default {};

boolean deleteNonRetryableMessageFromQueue() default false;
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
package software.amazon.lambda.powertools.sqs.internal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
import software.amazon.lambda.powertools.sqs.SqsUtils;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;

public final class BatchContext {
private static final Logger LOG = LoggerFactory.getLogger(BatchContext.class);
private static final Map<String, String> QUEUE_ARN_TO_DLQ_URL_MAPPING = new HashMap<>();

private final Map<SQSMessage, Exception> messageToException = new HashMap<>();
private final List<SQSMessage> success = new ArrayList<>();
private final List<SQSMessage> failures = new ArrayList<>();
private final List<Exception> exceptions = new ArrayList<>();

private final SqsClient client;

public BatchContext(SqsClient client) {
Expand All @@ -34,53 +48,170 @@ public void addSuccess(SQSMessage event) {
}

public void addFailure(SQSMessage event, Exception e) {
failures.add(event);
exceptions.add(e);
messageToException.put(event, e);
}

public <T> void processSuccessAndHandleFailed(final List<T> successReturns,
final boolean suppressException) {
@SafeVarargs
public final <T> void processSuccessAndHandleFailed(final List<T> successReturns,
final boolean suppressException,
final boolean deleteNonRetryableMessageFromQueue,
final Class<? extends Exception>... nonRetryableExceptions) {
if (hasFailures()) {
deleteSuccessMessage();

if (suppressException) {
List<String> messageIds = failures.stream().
map(SQSMessage::getMessageId)
.collect(toList());
List<Exception> exceptions = new ArrayList<>();
List<SQSMessage> failedMessages = new ArrayList<>();
Map<SQSMessage, Exception> nonRetryableMessageToException = new HashMap<>();

LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. " +
"Failed messages %s", failures.size(), messageIds));
if (nonRetryableExceptions.length == 0) {
exceptions.addAll(messageToException.values());
failedMessages.addAll(messageToException.keySet());
} else {
throw new SQSBatchProcessingException(exceptions, failures, successReturns);
messageToException.forEach((sqsMessage, exception) -> {
boolean nonRetryableException = isNonRetryableException(exception, nonRetryableExceptions);

if (nonRetryableException) {
nonRetryableMessageToException.put(sqsMessage, exception);
} else {
exceptions.add(exception);
failedMessages.add(sqsMessage);
}
});
}

List<SQSMessage> messagesToBeDeleted = new ArrayList<>(success);

if (!nonRetryableMessageToException.isEmpty() && deleteNonRetryableMessageFromQueue) {
messagesToBeDeleted.addAll(nonRetryableMessageToException.keySet());
} else if (!nonRetryableMessageToException.isEmpty()) {

boolean isMovedToDlq = moveNonRetryableMessagesToDlqIfConfigured(nonRetryableMessageToException);

if (!isMovedToDlq) {
exceptions.addAll(nonRetryableMessageToException.values());
failedMessages.addAll(nonRetryableMessageToException.keySet());
}
}

deleteMessagesFromQueue(messagesToBeDeleted);

processFailedMessages(successReturns, suppressException, exceptions, failedMessages);
}
}

private <T> void processFailedMessages(List<T> successReturns,
boolean suppressException,
List<Exception> exceptions,
List<SQSMessage> failedMessages) {
if (failedMessages.isEmpty()) {
return;
}

if (suppressException) {
List<String> messageIds = failedMessages.stream().
map(SQSMessage::getMessageId)
.collect(toList());

LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. " +
"Failed messages %s", failedMessages.size(), messageIds));
} else {
throw new SQSBatchProcessingException(exceptions, failedMessages, successReturns);
}
}

private boolean isNonRetryableException(Exception exception, Class<? extends Exception>[] nonRetryableExceptions) {
return Arrays.stream(nonRetryableExceptions)
.anyMatch(aClass -> aClass.isInstance(exception));
}

private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Exception> nonRetryableMessageToException) {
Optional<String> dlqUrl = fetchDlqUrl(nonRetryableMessageToException);

if (!dlqUrl.isPresent()) {
return false;
}

List<SendMessageBatchRequestEntry> dlqMessages = nonRetryableMessageToException.keySet().stream()
.map(sqsMessage -> {
Map<String, MessageAttributeValue> messageAttributesMap = new HashMap<>();

sqsMessage.getMessageAttributes().forEach((s, messageAttribute) -> {
MessageAttributeValue.Builder builder = MessageAttributeValue.builder();

builder
.dataType(messageAttribute.getDataType())
.stringValue(messageAttribute.getStringValue());

if (null != messageAttribute.getBinaryValue()) {
builder.binaryValue(SdkBytes.fromByteBuffer(messageAttribute.getBinaryValue()));
}

messageAttributesMap.put(s, builder.build());
});

return SendMessageBatchRequestEntry.builder()
.messageBody(sqsMessage.getBody())
.id(sqsMessage.getMessageId())
.messageAttributes(messageAttributesMap)
.build();
})
.collect(toList());

SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(builder -> builder.queueUrl(dlqUrl.get())
.entries(dlqMessages));

LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse);

return true;
}

private Optional<String> fetchDlqUrl(Map<SQSMessage, Exception> nonRetryableMessageToException) {
return nonRetryableMessageToException.keySet().stream()
.findFirst()
.map(sqsMessage -> QUEUE_ARN_TO_DLQ_URL_MAPPING.computeIfAbsent(sqsMessage.getEventSourceArn(), sourceArn -> {
String queueUrl = url(sourceArn);

GetQueueAttributesResponse queueAttributes = client.getQueueAttributes(GetQueueAttributesRequest.builder()
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
.queueUrl(queueUrl)
.build());

return ofNullable(queueAttributes.attributes().get(QueueAttributeName.REDRIVE_POLICY))
.map(policy -> {
try {
return SqsUtils.objectMapper().readTree(policy);
} catch (JsonProcessingException e) {
LOG.debug("Unable to parse Re drive policy for queue {}. Even if DLQ exists, failed messages will be send back to main queue.", queueUrl, e);
return null;
}
})
.map(node -> node.get("deadLetterTargetArn"))
.map(JsonNode::asText)
.map(this::url)
.orElse(null);
}));
}

private boolean hasFailures() {
return !failures.isEmpty();
return !messageToException.isEmpty();
}

private void deleteSuccessMessage() {
if (!success.isEmpty()) {
private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
if (!messages.isEmpty()) {
DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
.queueUrl(url())
.entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
.queueUrl(url(messages.get(0).getEventSourceArn()))
.entries(messages.stream().map(m -> DeleteMessageBatchRequestEntry.builder()
.id(m.getMessageId())
.receiptHandle(m.getReceiptHandle())
.build()).collect(toList()))
.build();

DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request);
LOG.debug(format("Response from delete request %s", deleteMessageBatchResponse));
LOG.debug("Response from delete request {}", deleteMessageBatchResponse);
}
}

private String url() {
String[] arnArray = success.get(0).getEventSourceArn().split(":");
return client.getQueueUrl(GetQueueUrlRequest.builder()
.queueOwnerAWSAccountId(arnArray[4])
.queueName(arnArray[5])
.build())
.queueUrl();
private String url(String queueArn) {
String[] arnArray = queueArn.split(":");
return String.format("https://sqs.%s.amazonaws.com/%s/%s", arnArray[3], arnArray[4], arnArray[5]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ && placedOnSqsEventRequestHandler(pjp)) {

SQSEvent sqsEvent = (SQSEvent) proceedArgs[0];

batchProcessor(sqsEvent, sqsBatch.suppressException(), sqsBatch.value());
batchProcessor(sqsEvent,
sqsBatch.suppressException(),
sqsBatch.value(),
sqsBatch.deleteNonRetryableMessageFromQueue(),
sqsBatch.nonRetryableExceptions());
}

return pjp.proceed(proceedArgs);
Expand Down
Loading