diff --git a/aws_lambda_powertools/shared/__init__.py b/aws_lambda_powertools/shared/__init__.py
index e69de29bb2d..d68e37349b7 100644
--- a/aws_lambda_powertools/shared/__init__.py
+++ b/aws_lambda_powertools/shared/__init__.py
@@ -0,0 +1 @@
+"""Internal shared functions. Do not rely on it besides internal usage."""
diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py
index b29b63d8345..99754266928 100644
--- a/aws_lambda_powertools/utilities/data_classes/__init__.py
+++ b/aws_lambda_powertools/utilities/data_classes/__init__.py
@@ -14,7 +14,12 @@
from .event_bridge_event import EventBridgeEvent
from .event_source import event_source
from .kafka_event import KafkaEvent
-from .kinesis_firehose_event import KinesisFirehoseEvent
+from .kinesis_firehose_event import (
+ KinesisFirehoseDataTransformationRecord,
+ KinesisFirehoseDataTransformationRecordMetadata,
+ KinesisFirehoseDataTransformationResponse,
+ KinesisFirehoseEvent,
+)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_event import S3Event, S3EventBridgeNotificationEvent
@@ -39,6 +44,9 @@
"KafkaEvent",
"KinesisFirehoseEvent",
"KinesisStreamEvent",
+ "KinesisFirehoseDataTransformationResponse",
+ "KinesisFirehoseDataTransformationRecord",
+ "KinesisFirehoseDataTransformationRecordMetadata",
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
diff --git a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py
index 47dc196856d..dd42a09fa5e 100644
--- a/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py
+++ b/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py
@@ -1,9 +1,179 @@
import base64
-from typing import Iterator, Optional
+import json
+import warnings
+from dataclasses import dataclass, field
+from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Tuple
+
+from typing_extensions import Literal
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
+@dataclass(repr=False, order=False, frozen=True)
+class KinesisFirehoseDataTransformationRecordMetadata:
+ """
+ Metadata in Firehose Data Transform Record.
+
+ Parameters
+ ----------
+ partition_keys: Dict[str, str]
+ A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`
+
+ Documentation:
+ --------------
+ - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
+ """
+
+ partition_keys: Dict[str, str] = field(default_factory=lambda: {})
+
+ def asdict(self) -> Dict:
+ if self.partition_keys is not None:
+ return {"partitionKeys": self.partition_keys}
+ return {}
+
+
+@dataclass(repr=False, order=False)
+class KinesisFirehoseDataTransformationRecord:
+ """Record in Kinesis Data Firehose response object.
+
+ Parameters
+ ----------
+ record_id: str
+ uniquely identifies this record within the current batch
+ result: Literal["Ok", "Dropped", "ProcessingFailed"]
+ record data transformation status, whether it succeeded, should be dropped, or failed.
+ data: str
+ base64-encoded payload, by default empty string.
+
+        Use helpers such as `base64_from_json` or `base64_from_str` from
+
+ metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata]
+ Metadata associated with this record; can contain partition keys.
+
+ See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
+ json_serializer: Callable
+ function to serialize `obj` to a JSON formatted `str`, by default json.dumps
+ json_deserializer: Callable
+        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document to a Python `obj`,
+ by default json.loads
+
+ Documentation:
+ --------------
+ - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
+ """
+
+ _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")
+
+ record_id: str
+ result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
+ data: str = ""
+ metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
+ json_serializer: Callable = json.dumps
+ json_deserializer: Callable = json.loads
+ _json_data: Optional[Any] = None
+
+ def asdict(self) -> Dict:
+ if self.result not in self._valid_result_types:
+ warnings.warn(
+ stacklevel=1,
+ message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
+ )
+
+ record: Dict[str, Any] = {
+ "recordId": self.record_id,
+ "result": self.result,
+ "data": self.data,
+ }
+ if self.metadata:
+ record["metadata"] = self.metadata.asdict()
+ return record
+
+ @property
+ def data_as_bytes(self) -> bytes:
+ """Decoded base64-encoded data as bytes"""
+ if not self.data:
+ return b""
+ return base64.b64decode(self.data)
+
+ @property
+ def data_as_text(self) -> str:
+ """Decoded base64-encoded data as text"""
+ if not self.data:
+ return ""
+ return self.data_as_bytes.decode("utf-8")
+
+ @property
+ def data_as_json(self) -> Dict:
+ """Decoded base64-encoded data loaded to json"""
+ if not self.data:
+ return {}
+ if self._json_data is None:
+ self._json_data = self.json_deserializer(self.data_as_text)
+ return self._json_data
+
+
+@dataclass(repr=False, order=False)
+class KinesisFirehoseDataTransformationResponse:
+ """Kinesis Data Firehose response object
+
+ Documentation:
+ --------------
+ - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
+
+ Parameters
+ ----------
+    records : List[KinesisFirehoseDataTransformationRecord]
+        records of Kinesis Data Firehose response object,
+        optional parameter at start; records can be added later using the `add_record` method.
+
+ Examples
+ --------
+
+ **Transforming data records**
+
+ ```python
+ from aws_lambda_powertools.utilities.data_classes import (
+ KinesisFirehoseDataTransformationRecord,
+ KinesisFirehoseDataTransformationResponse,
+ KinesisFirehoseEvent,
+ )
+ from aws_lambda_powertools.utilities.serialization import base64_from_json
+ from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+ def lambda_handler(event: dict, context: LambdaContext):
+ firehose_event = KinesisFirehoseEvent(event)
+ result = KinesisFirehoseDataTransformationResponse()
+
+ for record in firehose_event.records:
+ payload = record.data_as_text # base64 decoded data as str
+
+ ## generate data to return
+ transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id=record.record_id,
+ data=base64_from_json(transformed_data),
+ )
+
+ result.add_record(processed_record)
+
+ # return transformed records
+ return result.asdict()
+ ```
+ """
+
+ records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)
+
+ def add_record(self, record: KinesisFirehoseDataTransformationRecord):
+ self.records.append(record)
+
+ def asdict(self) -> Dict:
+ if not self.records:
+ raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")
+
+ return {"records": [record.asdict() for record in self.records]}
+
+
class KinesisFirehoseRecordMetadata(DictWrapper):
@property
def _metadata(self) -> dict:
@@ -77,6 +247,32 @@ def data_as_json(self) -> dict:
self._json_data = self._json_deserializer(self.data_as_text)
return self._json_data
+ def build_data_transformation_response(
+ self,
+ result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
+ data: str = "",
+ metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
+ ) -> KinesisFirehoseDataTransformationRecord:
+ """Create a KinesisFirehoseResponseRecord directly using the record_id and given values
+
+ Parameters
+ ----------
+ result : Literal["Ok", "Dropped", "ProcessingFailed"]
+            processing result, supported values: Ok, Dropped, ProcessingFailed
+ data : str, optional
+            data blob, base64-encoded, optional at init. Pass base64-encoded data directly, or use
+            helpers such as `base64_from_json` from `aws_lambda_powertools.utilities.serialization`
+        metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
+ Metadata associated with this record; can contain partition keys
+ - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
+ """
+ return KinesisFirehoseDataTransformationRecord(
+ record_id=self.record_id,
+ result=result,
+ data=data,
+ metadata=metadata,
+ )
+
class KinesisFirehoseEvent(DictWrapper):
"""Kinesis Data Firehose event
diff --git a/aws_lambda_powertools/utilities/serialization.py b/aws_lambda_powertools/utilities/serialization.py
new file mode 100644
index 00000000000..ef76eec70e2
--- /dev/null
+++ b/aws_lambda_powertools/utilities/serialization.py
@@ -0,0 +1,59 @@
+"""Standalone functions to serialize/deserialize common data structures"""
+import base64
+import json
+from typing import Any, Callable
+
+
+def base64_encode(data: str) -> str:
+ """Encode a string and returns Base64-encoded encoded value.
+
+ Parameters
+ ----------
+ data: str
+ The string to encode.
+
+ Returns
+ -------
+ str
+        The Base64-encoded value.
+ """
+ return base64.b64encode(data.encode()).decode("utf-8")
+
+
+def base64_decode(data: str) -> str:
+ """Decodes a Base64-encoded string and returns the decoded value.
+
+ Parameters
+ ----------
+ data: str
+ The Base64-encoded string to decode.
+
+ Returns
+ -------
+ str
+ The decoded string value.
+ """
+ return base64.b64decode(data).decode("utf-8")
+
+
+def base64_from_str(data: str) -> str:
+ """Encode str as base64 string"""
+ return base64.b64encode(data.encode()).decode("utf-8")
+
+
+def base64_from_json(data: Any, json_serializer: Callable[..., str] = json.dumps) -> str:
+ """Encode JSON serializable data as base64 string
+
+ Parameters
+ ----------
+ data: Any
+ JSON serializable (dict, list, boolean, etc.)
+ json_serializer: Callable
+ function to serialize `obj` to a JSON formatted `str`, by default json.dumps
+
+ Returns
+ -------
+ str:
+ JSON string as base64 string
+ """
+ return base64_from_str(data=json_serializer(data))
diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md
index 603ab87f50c..6016e68cfef 100644
--- a/docs/utilities/data_classes.md
+++ b/docs/utilities/data_classes.md
@@ -975,18 +975,39 @@ or plain text, depending on the original payload.
### Kinesis Firehose delivery stream
-Kinesis Firehose Data Transformation can use a Lambda Function to modify the records
-inline, and re-emit them back to the Delivery Stream.
+When using Kinesis Firehose, you can use a Lambda function to [perform data transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html){target="_blank"}. For each transformed record, you can choose to either:
-Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
-function to access the data either as json or plain text, depending on the original payload.
+* **A)** Put them back to the delivery stream (default)
+* **B)** Drop them so consumers don't receive them (e.g., data validation)
+* **C)** Indicate a record failed data transformation and should be retried
-=== "app.py"
+To do that, you can use `KinesisFirehoseDataTransformationResponse` class along with helper functions to make it easier to decode and encode base64 data in the stream.
- ```python
+=== "Transforming streaming records"
+
+ ```python hl_lines="2-3 12 28"
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
```
+ 1. **Ingesting JSON payloads?**
Use `record.data_as_json` to easily deserialize them.
+ 2. For your convenience, `base64_from_json` serializes a dict to JSON, then encode as base64 data.
+
+=== "Dropping invalid records"
+
+ ```python hl_lines="5-6 16 34"
+ --8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py"
+ ```
+
+ 1. This exception would be generated from `record.data_as_json` if invalid payload.
+
+=== "Indicating a processing failure"
+
+ ```python hl_lines="2-3 33"
+ --8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py"
+ ```
+
+ 1. This record will now be sent to your [S3 bucket in the `processing-failed` folder](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-failure-handling){target="_blank"}.
+
### Lambda Function URL
=== "app.py"
diff --git a/examples/event_sources/src/kinesis_firehose_delivery_stream.py b/examples/event_sources/src/kinesis_firehose_delivery_stream.py
index 770bfb1ee63..3dc6fbda703 100644
--- a/examples/event_sources/src/kinesis_firehose_delivery_stream.py
+++ b/examples/event_sources/src/kinesis_firehose_delivery_stream.py
@@ -1,28 +1,28 @@
-import base64
-import json
-
from aws_lambda_powertools.utilities.data_classes import (
+ KinesisFirehoseDataTransformationResponse,
KinesisFirehoseEvent,
event_source,
)
+from aws_lambda_powertools.utilities.serialization import base64_from_json
from aws_lambda_powertools.utilities.typing import LambdaContext
@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
- result = []
+ result = KinesisFirehoseDataTransformationResponse()
for record in event.records:
- # if data was delivered as json; caches loaded value
- data = record.data_as_json
+ # get original data using data_as_text property
+ data = record.data_as_text # (1)!
+
+ ## generate data to return
+ transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data}
- processed_record = {
- "recordId": record.record_id,
- "data": base64.b64encode(json.dumps(data).encode("utf-8")),
- "result": "Ok",
- }
+ processed_record = record.build_data_transformation_response(
+ data=base64_from_json(transformed_data), # (2)!
+ )
- result.append(processed_record)
+ result.add_record(processed_record)
# return transformed records
- return {"records": result}
+ return result.asdict()
diff --git a/examples/event_sources/src/kinesis_firehose_response_drop.py b/examples/event_sources/src/kinesis_firehose_response_drop.py
new file mode 100644
index 00000000000..8b565480a34
--- /dev/null
+++ b/examples/event_sources/src/kinesis_firehose_response_drop.py
@@ -0,0 +1,40 @@
+from json import JSONDecodeError
+from typing import Dict
+
+from aws_lambda_powertools.utilities.data_classes import (
+ KinesisFirehoseDataTransformationRecord,
+ KinesisFirehoseDataTransformationResponse,
+ KinesisFirehoseEvent,
+ event_source,
+)
+from aws_lambda_powertools.utilities.serialization import base64_from_json
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+@event_source(data_class=KinesisFirehoseEvent)
+def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
+ result = KinesisFirehoseDataTransformationResponse()
+
+ for record in event.records:
+ try:
+ payload: Dict = record.data_as_json # decodes and deserialize base64 JSON string
+
+ ## generate data to return
+ transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
+
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id=record.record_id,
+ data=base64_from_json(transformed_data),
+ )
+ except JSONDecodeError: # (1)!
+ # our producers ingest JSON payloads only; drop malformed records from the stream
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id=record.record_id,
+ data=record.data,
+ result="Dropped",
+ )
+
+ result.add_record(processed_record)
+
+ # return transformed records
+ return result.asdict()
diff --git a/examples/event_sources/src/kinesis_firehose_response_exception.py b/examples/event_sources/src/kinesis_firehose_response_exception.py
new file mode 100644
index 00000000000..43ba3a039b2
--- /dev/null
+++ b/examples/event_sources/src/kinesis_firehose_response_exception.py
@@ -0,0 +1,39 @@
+from aws_lambda_powertools.utilities.data_classes import (
+ KinesisFirehoseDataTransformationRecord,
+ KinesisFirehoseDataTransformationResponse,
+ KinesisFirehoseEvent,
+ event_source,
+)
+from aws_lambda_powertools.utilities.serialization import base64_from_json
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+@event_source(data_class=KinesisFirehoseEvent)
+def lambda_handler(event: dict, context: LambdaContext):
+ firehose_event = KinesisFirehoseEvent(event)
+ result = KinesisFirehoseDataTransformationResponse()
+
+ for record in firehose_event.records:
+ try:
+ payload = record.data_as_text # base64 decoded data as str
+
+ # generate data to return
+ transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
+
+ # Default result is Ok
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id=record.record_id,
+ data=base64_from_json(transformed_data),
+ )
+ except Exception:
+ # add Failed result to processing results, send back to kinesis for retry
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id=record.record_id,
+ data=record.data,
+ result="ProcessingFailed", # (1)!
+ )
+
+ result.add_record(processed_record)
+
+ # return transformed records
+ return result.asdict()
diff --git a/tests/unit/data_classes/test_kinesis_firehose_response.py b/tests/unit/data_classes/test_kinesis_firehose_response.py
new file mode 100644
index 00000000000..0be8d0d3ec0
--- /dev/null
+++ b/tests/unit/data_classes/test_kinesis_firehose_response.py
@@ -0,0 +1,115 @@
+from aws_lambda_powertools.utilities.data_classes import (
+ KinesisFirehoseDataTransformationRecord,
+ KinesisFirehoseDataTransformationRecordMetadata,
+ KinesisFirehoseDataTransformationResponse,
+ KinesisFirehoseEvent,
+)
+from aws_lambda_powertools.utilities.serialization import base64_encode, base64_from_str
+from tests.functional.utils import load_event
+
+
+def test_kinesis_firehose_response_metadata():
+ # When we create metadata with partition keys and attach to a firehose response record
+ metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023"})
+
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id="test_id",
+ metadata=metadata_partition,
+ data="",
+ )
+ # Then we should have partition keys available in metadata field with same value
+ assert processed_record.metadata.partition_keys["year"] == "2023"
+ assert metadata_partition.asdict() == {"partitionKeys": {"year": "2023"}}
+
+
+def test_kinesis_firehose_response():
+ # GIVEN a Kinesis Firehose Event with two records
+ raw_event = load_event("kinesisFirehoseKinesisEvent.json")
+ parsed_event = KinesisFirehoseEvent(data=raw_event)
+
+ # WHEN we create a Data Transformation Response without changing the data
+ response = KinesisFirehoseDataTransformationResponse()
+ for record in parsed_event.records:
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id=record.record_id,
+ data=record.data,
+ )
+ response.add_record(record=processed_record)
+
+ # THEN we should have the same record data
+ record_01, record_02 = response.records[0], response.records[1]
+ raw_record_01, raw_record_02 = raw_event["records"][0], raw_event["records"][1]
+
+ assert len(response.records) == 2
+
+ assert record_01.result == "Ok"
+ assert record_02.result == "Ok"
+
+ assert record_01.record_id == raw_record_01["recordId"]
+ assert record_02.record_id == raw_record_02["recordId"]
+
+ assert record_01.data == raw_record_01["data"]
+ assert record_02.data == raw_record_02["data"]
+
+
+def test_kinesis_firehose_response_asdict():
+ # GIVEN the following example response provided by Firehose
+ sample_response = {
+ "records": [
+ {"recordId": "sample_record", "data": "", "result": "Ok", "metadata": {"partitionKeys": {"year": "2023"}}},
+ ],
+ }
+
+ response = KinesisFirehoseDataTransformationResponse()
+ metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(
+ partition_keys=sample_response["records"][0]["metadata"]["partitionKeys"],
+ )
+
+ # WHEN we create a transformation record with the exact same data
+ processed_record = KinesisFirehoseDataTransformationRecord(
+ record_id=sample_response["records"][0]["recordId"],
+ data=sample_response["records"][0]["data"],
+ result=sample_response["records"][0]["result"],
+ metadata=metadata_partition,
+ )
+
+ # THEN serialized response should return the same value
+ response.add_record(record=processed_record)
+ assert response.asdict() == sample_response
+
+
+def test_kinesis_firehose_create_response():
+ # GIVEN a Kinesis Firehose Event with two records
+ raw_event = load_event("kinesisFirehoseKinesisEvent.json")
+ parsed_event = KinesisFirehoseEvent(data=raw_event)
+
+ # WHEN we create a Data Transformation Response changing the data
+ # WHEN we add partitions keys
+
+ arbitrary_data = "arbitrary data"
+
+ response = KinesisFirehoseDataTransformationResponse()
+ for record in parsed_event.records:
+ metadata_partition = KinesisFirehoseDataTransformationRecordMetadata(partition_keys={"year": "2023"})
+ processed_record = record.build_data_transformation_response(
+ metadata=metadata_partition,
+ data=base64_from_str(arbitrary_data),
+ )
+ response.add_record(record=processed_record)
+
+ # THEN we should have the same record data
+ record_01, record_02 = response.records[0], response.records[1]
+ raw_record_01, raw_record_02 = raw_event["records"][0], raw_event["records"][1]
+
+ assert len(response.records) == 2
+
+ assert record_01.result == "Ok"
+ assert record_02.result == "Ok"
+
+ assert record_01.record_id == raw_record_01["recordId"]
+ assert record_02.record_id == raw_record_02["recordId"]
+
+ assert record_01.data == base64_encode(arbitrary_data)
+ assert record_02.data == base64_encode(arbitrary_data)
+
+ assert record_01.metadata.partition_keys["year"] == "2023"