Description
Expected Behaviour
1. Overview
If you are using a data processor Lambda Function for a Kinesis Firehose Delivery Stream, you should be able to use the KinesisFirehoseModel model to validate events without encountering a pydantic ValidationError.
Current Behaviour
1. Overview
When you validate a Kinesis Firehose Delivery Stream event with the KinesisFirehoseModel model, validation fails because the model declares the wrong type for the kinesis record metadata's subsequenceNumber attribute (records[].kinesisRecordMetadata.subsequenceNumber). This causes pydantic to raise a ValidationError that should not happen.
Specifically, the KinesisFirehoseModel model validates its sub-structures in turn. When it validates each kinesis record's metadata with the KinesisFirehoseRecordMetadata model, it fails. This is because sub-sequence numbers arrive as integers, but the model's subsequenceNumber attribute expects a string.
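The type mismatch can be isolated without the full event. The following is a minimal sketch, assuming pydantic v2 is installed; `MetadataWithStrField` is a hypothetical stand-in that declares only the one field in question and is not the Powertools model itself.

```python
from pydantic import BaseModel, ValidationError


class MetadataWithStrField(BaseModel):
    """Hypothetical stand-in: declares subsequenceNumber as str, like the current model."""

    subsequenceNumber: str


# Firehose delivers the sub-sequence number as an integer.
payload = {"subsequenceNumber": 0}

try:
    MetadataWithStrField.model_validate(payload)
    first_error = None
except ValidationError as exc:
    # pydantic v2 does not coerce int to str by default, so validation fails here.
    first_error = exc.errors()[0]

print(first_error)
```

The captured error carries the same `string_type` code that appears in the ValidationError output of this report.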
2. ValidationError Output
pydantic_core._pydantic_core.ValidationError: 1 validation error for KinesisFirehoseModel
records.0.kinesisRecordMetadata.subsequenceNumber
Input should be a valid string [type=string_type, input_value=0, input_type=int]
For further information visit https://errors.pydantic.dev/2.4/v/string_type
Code snippet
#!/usr/bin/env python
from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseModel


def main() -> None:
    """Code snippet that shows the previously mentioned validation error."""
    # Create a kinesis firehose delivery stream event
    firehose_event: dict = {
        "invocationId": "00000000-0000-0000-0000-000000000000",
        "sourceKinesisStreamArn": "arn:aws-us-gov:kinesis:us-gov-west-1:000000000000:stream/A",
        "deliveryStreamArn": "arn:aws-us-gov:firehose:us-gov-west-1:000000000000:deliverystream/A",
        "region": "us-gov-west-1",
        "records": [
            {
                "recordId": "00000000000000000000000000000000000000000000000000000000000000",
                "approximateArrivalTimestamp": 1697943843714,
                "data": "YnVnIHJlcG9ydA==",
                "kinesisRecordMetadata": {
                    "sequenceNumber": "00000000000000000000000000000000000000000000000000000000",
                    "subsequenceNumber": 0,
                    "partitionKey": "00000000000000000000000000000000",
                    "shardId": "shardId-000000000000",
                    "approximateArrivalTimestamp": 1697943843714
                }
            }
        ]
    }
    # Create a model from the event (this fails)
    model: KinesisFirehoseModel = KinesisFirehoseModel.model_validate(firehose_event)
    print(model)
    return None


if __name__ == "__main__":
    main()
Possible Solution
1. Overview
Update the Kinesis Firehose pydantic model and data class with the correct type hints.
2. Problematic Code
- https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py#L203-L209
- https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py#L8-L13
3. Proposal for the Pydantic Model
class KinesisFirehoseRecordMetadata(BaseModel):
    shardId: str
    partitionKey: str
    approximateArrivalTimestamp: PositiveInt
    sequenceNumber: str
    subsequenceNumber: int
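As a quick check of the proposal, the sketch below defines the proposed model standalone and validates the metadata from the sample event against it. It assumes pydantic v2; the class is named `ProposedRecordMetadata` here only to make clear that it is a local copy for illustration, not the class shipped in Powertools.

```python
from pydantic import BaseModel, PositiveInt


class ProposedRecordMetadata(BaseModel):
    """Local copy of the proposed model, with subsequenceNumber typed as int."""

    shardId: str
    partitionKey: str
    approximateArrivalTimestamp: PositiveInt
    sequenceNumber: str
    subsequenceNumber: int


# Metadata taken from the sample event in this report.
sample_metadata = {
    "sequenceNumber": "00000000000000000000000000000000000000000000000000000000",
    "subsequenceNumber": 0,
    "partitionKey": "00000000000000000000000000000000",
    "shardId": "shardId-000000000000",
    "approximateArrivalTimestamp": 1697943843714,
}

# With the int type hint, validation succeeds and the value stays an integer.
validated = ProposedRecordMetadata.model_validate(sample_metadata)
print(validated.subsequenceNumber)
```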
4. Proposal for the Data Class Dictionary Wrapper
class KinesisFirehoseRecordMetadata(DictWrapper):
    @property
    def subsequence_number(self) -> int:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]
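The effect of the corrected annotation can be illustrated with the standard library alone. This is a rough sketch: `MetadataWrapperSketch` is a hypothetical, simplified stand-in for the Powertools DictWrapper-based class, reduced to the single property discussed here.

```python
from typing import Any, Dict


class MetadataWrapperSketch:
    """Hypothetical stand-in for the data class; wraps the raw metadata dict."""

    def __init__(self, metadata: Dict[str, Any]) -> None:
        self._metadata = metadata

    @property
    def subsequence_number(self) -> int:
        # The raw event already holds an integer, so the int annotation
        # describes the returned value without any conversion.
        return self._metadata["subsequenceNumber"]


wrapper = MetadataWrapperSketch({"subsequenceNumber": 0, "shardId": "shardId-000000000000"})
print(type(wrapper.subsequence_number).__name__)
```

Because the property returns the dictionary value unchanged, only the type hint needs to change for the annotation to match the data.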
5. Examples From Other Amazon/AWS Packages
- https://github.com/awslabs/amazon-kinesis-client-net/blob/master/ClientLibrary/inputs/Record.cs#L27-L32
- https://github.com/aws/aws-lambda-go/blob/main/events/firehose.go#L43-L49
- https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/kpl/ExtendedSequenceNumber.java#L37-L39
Steps to Reproduce
1. Overview
- Have an event from a Kinesis Firehose Delivery Stream.
- Validate the event with the KinesisFirehoseModel pydantic model.
- Encounter an error when the KinesisFirehoseRecordMetadata fails to validate.
2. Sample Event
{
    "invocationId": "00000000-0000-0000-0000-000000000000",
    "sourceKinesisStreamArn": "arn:aws-us-gov:kinesis:us-gov-west-1:000000000000:stream/A",
    "deliveryStreamArn": "arn:aws-us-gov:firehose:us-gov-west-1:000000000000:deliverystream/A",
    "region": "us-gov-west-1",
    "records": [
        {
            "recordId": "00000000000000000000000000000000000000000000000000000000000000",
            "approximateArrivalTimestamp": 1697943843714,
            "data": "YnVnIHJlcG9ydA==",
            "kinesisRecordMetadata": {
                "sequenceNumber": "00000000000000000000000000000000000000000000000000000000",
                "subsequenceNumber": 0,
                "partitionKey": "00000000000000000000000000000000",
                "shardId": "shardId-000000000000",
                "approximateArrivalTimestamp": 1697943843714
            }
        }
    ]
}
Powertools for AWS Lambda (Python) version
2.26.0
AWS Lambda function runtime
3.11
Packaging format used
PyPi
Debugging logs
N/A