From b37c2e580fb05913b9acb167a8b0d6a743647b66 Mon Sep 17 00:00:00 2001
From: heitorlessa
Date: Thu, 4 Mar 2021 18:20:35 +0100
Subject: [PATCH 1/2] docs: improve batch processing navigation and reading

Signed-off-by: heitorlessa
---
 docs/utilities/batch.md | 135 ++++++++++++++++++++++++----------------
 1 file changed, 83 insertions(+), 52 deletions(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index aa284e7f38b..cd78e5b0b5b 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -5,13 +5,13 @@ description: Utility
 
 The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
 
-**Key Features**
+## Key Features
 
 * Prevent successfully processed messages being returned to SQS
 * Simple interface for individually processing messages from a batch
 * Build your own batch processor using the base classes
 
-**Background**
+## Background
 
 When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS.
 
@@ -25,35 +25,76 @@ are returned to the queue.
 
 More details on how Lambda works with SQS can be found in the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html)
 
+## Getting started
 
-**IAM Permissions**
+### IAM Permissions
 
-This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:DeleteMessageBatch` permission.
+Before you use this utility, your AWS Lambda function must have the `sqs:DeleteMessageBatch` permission to delete successful messages directly from the queue.
 
-## Processing messages from SQS
+> Example using AWS Serverless Application Model (SAM)
 
-You can use either **[sqs_batch_processor](#sqs_batch_processor-decorator)** decorator, or **[PartialSQSProcessor](#partialsqsprocessor-context-manager)** as a context manager.
+=== "template.yml"
+    ```yaml hl_lines="2-3 12-15"
+    Resources:
+      MyQueue:
+        Type: AWS::SQS::Queue
 
-They have nearly the same behaviour when it comes to processing messages from the batch:
+      HelloWorldFunction:
+        Type: AWS::Serverless::Function
+        Properties:
+          Runtime: python3.8
+          Environment:
+            Variables:
+              POWERTOOLS_SERVICE_NAME: example
+          Policies:
+            - SQSPollerPolicy:
+                QueueName:
+                  !GetAtt MyQueue.QueueName
+    ```
 
-* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
-* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `record handler`, we will:
-    - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
-    - **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
+### Processing messages from SQS
 
-The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need.
+You can use either the **[sqs_batch_processor](#sqs_batch_processor-decorator)** decorator, or **[PartialSQSProcessor](#partialsqsprocessor-context-manager)** as a context manager if you'd like access to the processed results.
+
+You need to create a function to handle each record from the batch - we call it `record_handler` from here on.
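Each record handed to your `record_handler` is a plain dictionary following the SQS message schema. A minimal sketch of that shape and a handler for it (fields abbreviated, and this `record_handler` is only an illustration - raise an exception for anything you cannot process):

```python
import json

# Abbreviated shape of a single SQS record as Lambda delivers it (illustrative only)
record = {
    "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
    "body": json.dumps({"customer_id": 123}),
    "eventSource": "aws:sqs",
}

def record_handler(record):
    # Raising an exception marks this record as failed, so it returns to the queue
    payload = json.loads(record["body"])
    if "customer_id" not in payload:
        raise ValueError("missing customer_id")
    return payload["customer_id"]
```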
+
+=== "Decorator"
+
+    ```python hl_lines="3 6"
+    from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+
+    def record_handler(record):
+        return do_something_with(record["body"])
+
+    @sqs_batch_processor(record_handler=record_handler)
+    def lambda_handler(event, context):
+        return {"statusCode": 200}
+    ```
+=== "Context manager"
 
-## Record Handler
+    ```python hl_lines="3 9 11-12"
+    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+
+    def record_handler(record):
+        return_value = do_something_with(record["body"])
+        return return_value
 
-Both decorator and context managers require an explicit function to process the batch of messages - namely `record_handler` parameter.
+    def lambda_handler(event, context):
+        records = event["Records"]
+        processor = PartialSQSProcessor()
 
-This function is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent.
+        with processor(records, record_handler) as proc:
+            result = proc.process()  # Returns a list of all results from record_handler
 
-**Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion.
+        return result
+    ```
 
-### sqs_batch_processor decorator
+!!! tip
+    **Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion.
 
-When using this decorator, you need provide a function via `record_handler` param that will process individual messages from the batch - It should raise an exception if it is unable to process the record.
+    If the entire batch succeeds, we let Lambda proceed with deleting the records from the queue, for cost reasons.
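The behaviour in the tip above can be sketched without any Powertools or AWS dependency. The snippet below is a plain-Python simplification, not the library's implementation - `delete_messages` stands in for the `sqs:DeleteMessageBatch` call, and the exception class only mirrors the real `SQSBatchProcessingError` name for readability:

```python
class SQSBatchProcessingError(Exception):
    """Raised when at least one record in the batch fails processing."""

def process_batch(records, record_handler, delete_messages):
    # Illustrative flow only: run the handler per record, remember which succeeded
    succeeded, results, failed = [], [], []
    for record in records:
        try:
            results.append(record_handler(record))
            succeeded.append(record)
        except Exception:
            failed.append(record)

    if failed:
        # Partial failure: delete only the successes (sqs:DeleteMessageBatch),
        # then raise so the Lambda errors out and failed messages return to the queue
        delete_messages(succeeded)
        raise SQSBatchProcessingError(f"{len(failed)} record(s) failed processing")

    # Full success: no explicit deletion needed - Lambda/SQS delete the whole batch
    return results
```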
+
+### Partial failure mechanics
 
 All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:
 
@@ -61,29 +102,26 @@ All records in the batch will be passed to this handler for processing, even if
 * **Any unprocessed messages detected**, we will raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
 
 !!! warning
-    You will not have accessed to the processed messages within the Lambda Handler - all processing logic will and should be performed by the record_handler function.
+    You will not have access to the **processed messages** within the Lambda Handler.
 
-=== "app.py"
+    All processing logic should be performed within the `record_handler` function.
 
-    ```python
-    from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+## Advanced
 
-    def record_handler(record):
-        # This will be called for each individual message from a batch
-        # It should raise an exception if the message was not processed successfully
-        return_value = do_something_with(record["body"])
-        return return_value
+### Choosing between decorator and context manager
 
-    @sqs_batch_processor(record_handler=record_handler)
-    def lambda_handler(event, context):
-        return {"statusCode": 200}
-    ```
+They have nearly the same behaviour when it comes to processing messages from the batch:
 
-### PartialSQSProcessor context manager
+* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
+* **Entire batch has been partially processed**, where exceptions were raised within your `record_handler`, we will:
+    - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
+    - **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
+
+The only difference is that **PartialSQSProcessor** will give you access to processed messages, should you need it.
 
-If you require access to the result of processed messages, you can use this context manager.
+### Accessing processed messages
 
-The result from calling `process()` on the context manager will be a list of all the return values from your `record_handler` function.
+Use the `PartialSQSProcessor` context manager to access a list of all return values from your `record_handler` function.
 
 === "app.py"
 
     ```python
     from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
 
     def record_handler(record):
-        # This will be called for each individual message from a batch
-        # It should raise an exception if the message was not processed successfully
-        return_value = do_something_with(record["body"])
-        return return_value
-
+        return do_something_with(record["body"])
 
     def lambda_handler(event, context):
         records = event["Records"]
@@ -108,7 +142,7 @@ The result from calling `process()` on the context manager will be a list of all
         return result
     ```
 
-## Passing custom boto3 config
+### Passing custom boto3 config
 
 If you need to pass custom configuration such as region to the SDK, you can pass your own [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) to the `sqs_batch_processor` decorator:
 
@@ -159,14 +193,15 @@ the `sqs_batch_processor` decorator:
 
     ```
 
-## Suppressing exceptions
+### Suppressing exceptions
 
 If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument.
 
 === "Decorator"
 
-    ```python hl_lines="2"
-    ...
+ ```python hl_lines="3" + from aws_lambda_powertools.utilities.batch import sqs_batch_processor + @sqs_batch_processor(record_handler=record_handler, config=config, suppress_exception=True) def lambda_handler(event, context): return {"statusCode": 200} @@ -174,15 +209,16 @@ If you want to disable the default behavior where `SQSBatchProcessingError` is r === "Context manager" - ```python hl_lines="2" - ... + ```python hl_lines="3" + from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + processor = PartialSQSProcessor(config=config, suppress_exception=True) with processor(records, record_handler): result = processor.process() ``` -## Create your own partial processor +### Create your own partial processor You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`. @@ -192,11 +228,9 @@ You can create your own partial batch processor by inheriting the `BasePartialPr You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function. 
-**Example:** - === "custom_processor.py" - ```python + ```python hl_lines="3 9 24 30 37 57" from random import randint from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor @@ -223,14 +257,12 @@ You can then use this class as a context manager, or pass it to `batch_processor def _prepare(self): # It's called once, *before* processing # Creates table resource and clean previous results - # E.g.: self.ddb_table = boto3.resource("dynamodb").Table(self.table_name) self.success_messages.clear() def _clean(self): # It's called once, *after* closing processing all records (closing the context manager) # Here we're sending, at once, all successful messages to a ddb table - # E.g.: with ddb_table.batch_writer() as batch: for result in self.success_messages: batch.put_item(Item=result) @@ -239,7 +271,6 @@ You can then use this class as a context manager, or pass it to `batch_processor # It handles how your record is processed # Here we're keeping the status of each run # where self.handler is the record_handler function passed as an argument - # E.g.: try: result = self.handler(record) # record_handler passed to decorator/context manager return self.success_handler(record, result) From 054348d3ab07aa024933d7f4cdf940d709737f83 Mon Sep 17 00:00:00 2001 From: heitorlessa Date: Thu, 4 Mar 2021 18:28:15 +0100 Subject: [PATCH 2/2] docs: include Charles suggestion for Sentry integration --- docs/utilities/batch.md | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index cd78e5b0b5b..ca4606e0f40 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -291,3 +291,24 @@ You can then use this class as a context manager, or pass it to `batch_processor def lambda_handler(event, context): return {"statusCode": 200} ``` + +### Integrating exception handling with Sentry.io + +When using Sentry.io for error monitoring, you can override `failure_handler` to include to 
capture each processing exception: + +> Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732) + +=== "sentry_integration.py" + + ```python hl_lines="4 7-8" + from typing import Tuple + + from aws_lambda_powertools.utilities.batch import PartialSQSProcessor + from sentry_sdk import capture_exception + + class SQSProcessor(PartialSQSProcessor): + def failure_handler(self, record: Event, exception: Tuple) -> Tuple: # type: ignore + capture_exception() # send exception to Sentry + logger.exception("got exception while processing SQS message") + return super().failure_handler(record, exception) # type: ignore + ```
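The Sentry example relies on the template-method hook that `failure_handler` exposes: override it, add your side effect, then defer to the base class. A dependency-free sketch of that pattern (class names and the `report` callback are illustrative only, not part of any real API):

```python
class BaseProcessor:
    """Stand-in for a partial batch processor exposing a failure hook."""

    def __init__(self):
        self.fail_messages = []

    def failure_handler(self, record, exception):
        # Default behaviour: remember the failed record for later bookkeeping
        self.fail_messages.append(record)
        return ("fail", exception, record)

class ReportingProcessor(BaseProcessor):
    """Overrides the hook to notify a monitoring sink, then defers to the base."""

    def __init__(self, report):
        super().__init__()
        self.report = report  # e.g. Sentry's capture_exception in the docs above

    def failure_handler(self, record, exception):
        self.report(exception)  # side channel: error monitoring
        return super().failure_handler(record, exception)  # keep default behaviour
```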