Skip to content

feat(event_source): add support for tumbling windows in Kinesis and DynamoDB events #6658

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
5236155
add data for tumbling window
kiitosu May 12, 2025
8b79b7b
add unit test
kiitosu May 13, 2025
81e8dfb
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
39c1fda
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
32fc93b
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
a7ea3e3
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
bff39f0
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
523435c
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
95809ff
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
af4c5e9
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
066cb2f
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
ac03550
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
44589bb
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
eb4bef7
Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_ev…
kiitosu May 13, 2025
c997c9b
Merge branch 'aws-powertools:develop' into feature/add_data_for_tumbl…
kiitosu May 13, 2025
3f07f43
fix window function
kiitosu May 13, 2025
a0adaee
add test test_kinesis_stream_with_tumbling_window_event
kiitosu May 13, 2025
b018614
add properties for tumbling window to dynamodb stream
kiitosu May 13, 2025
41adc0b
Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_…
kiitosu May 13, 2025
58ab8f1
Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_…
kiitosu May 13, 2025
d092a9e
Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_…
kiitosu May 13, 2025
fb77d5c
Coverage
leandrodamascena May 14, 2025
f2406a1
Merge branch 'develop' into feature/add_data_for_tumbling_window
leandrodamascena May 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,25 @@ def user_identity(self) -> dict:
return self.get("userIdentity") or {}


class DynamoDBStreamWindow(DictWrapper):
    """Start and end boundaries of the tumbling window of a DynamoDB stream event."""

    @property
    def end(self) -> str:
        """Timestamp at which the time window ends (raw ``end`` value)."""
        window_end = self["end"]
        return window_end

    @property
    def start(self) -> str:
        """Timestamp at which the time window started (raw ``start`` value)."""
        window_start = self["start"]
        return window_start


class DynamoDBStreamEvent(DictWrapper):
"""Dynamo DB Stream Event

Documentation:
-------------
- https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html
- https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-windows.html

Example
-------
Expand All @@ -167,3 +180,30 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
def records(self) -> Iterator[DynamoDBRecord]:
    """Lazily wrap every entry of ``Records`` in a ``DynamoDBRecord``."""
    yield from map(DynamoDBRecord, self["Records"])

@property
def window(self) -> DynamoDBStreamWindow | None:
    """Tumbling window of this invocation, or ``None`` when there is none.

    A missing or empty ``window`` entry yields ``None`` so that the returned
    value always matches the annotation; a falsy raw value such as ``{}`` is
    never handed back to the caller unchanged.
    """
    raw_window = self.get("window")
    if not raw_window:
        return None
    return DynamoDBStreamWindow(raw_window)

@property
def state(self) -> dict[str, Any]:
    """Content of the ``state`` key; an empty dict when it is missing or falsy."""
    current_state = self.get("state")
    return current_state if current_state else {}

@property
def shard_id(self) -> str | None:
    """Value of the event-level ``shardId`` key, or ``None`` when absent."""
    shard = self.get("shardId")
    return shard

@property
def event_source_arn(self) -> str | None:
    """Value of the event-level ``eventSourceARN`` key, or ``None`` when absent."""
    source_arn = self.get("eventSourceARN")
    return source_arn

@property
def is_final_invoke_for_window(self) -> bool | None:
    """Value of the ``isFinalInvokeForWindow`` key, or ``None`` when absent."""
    final_invoke = self.get("isFinalInvokeForWindow")
    return final_invoke

@property
def is_window_terminated_early(self) -> bool | None:
    """Value of the ``isWindowTerminatedEarly`` key, or ``None`` when absent."""
    terminated_early = self.get("isWindowTerminatedEarly")
    return terminated_early
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import base64
import json
import zlib
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any

from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
CloudWatchLogsDecodedData,
Expand Down Expand Up @@ -100,19 +100,59 @@ def kinesis(self) -> KinesisStreamRecordPayload:
return KinesisStreamRecordPayload(self["kinesis"])


class KinesisStreamWindow(DictWrapper):
    """Start and end boundaries of the tumbling window of a Kinesis stream event."""

    @property
    def end(self) -> str:
        """Timestamp at which the time window ends (raw ``end`` value)."""
        window_end = self["end"]
        return window_end

    @property
    def start(self) -> str:
        """Timestamp at which the time window started (raw ``start`` value)."""
        window_start = self["start"]
        return window_start


class KinesisStreamEvent(DictWrapper):
    """Kinesis stream event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html
    """

    @property
    def records(self) -> Iterator[KinesisStreamRecord]:
        """Yield every entry of ``Records`` wrapped as a ``KinesisStreamRecord``."""
        for record in self["Records"]:
            yield KinesisStreamRecord(record)

    @property
    def window(self) -> KinesisStreamWindow | None:
        """Tumbling window of this invocation, or ``None`` when there is none.

        A missing or empty ``window`` entry yields ``None`` so that the
        returned value always matches the annotation; a falsy raw value such
        as ``{}`` is never handed back to the caller unchanged.
        """
        raw_window = self.get("window")
        if not raw_window:
            return None
        return KinesisStreamWindow(raw_window)

    @property
    def state(self) -> dict[str, Any]:
        """Content of the ``state`` key; an empty dict when it is missing or falsy."""
        return self.get("state") or {}

    @property
    def shard_id(self) -> str | None:
        """Value of the event-level ``shardId`` key, or ``None`` when absent."""
        return self.get("shardId")

    @property
    def event_source_arn(self) -> str | None:
        """Value of the event-level ``eventSourceARN`` key, or ``None`` when absent."""
        return self.get("eventSourceARN")

    @property
    def is_final_invoke_for_window(self) -> bool | None:
        """Value of the ``isFinalInvokeForWindow`` key, or ``None`` when absent."""
        return self.get("isFinalInvokeForWindow")

    @property
    def is_window_terminated_early(self) -> bool | None:
        """Value of the ``isWindowTerminatedEarly`` key, or ``None`` when absent."""
        return self.get("isWindowTerminatedEarly")


def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]:
    """Build one ``CloudWatchLogsDecodedData`` per record of ``event``.

    Each record's Kinesis payload is read through
    ``data_zlib_compressed_as_json()`` and the result is wrapped, preserving
    the order of ``event.records``.
    """
    decoded_logs = []
    for record in event.records:
        payload = record.kinesis.data_zlib_compressed_as_json()
        decoded_logs.append(CloudWatchLogsDecodedData(payload))
    return decoded_logs
Expand Down
101 changes: 101 additions & 0 deletions tests/events/dynamoStreamTumblingWindowEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
{
"Records": [
{
"eventID": "1",
"eventName": "INSERT",
"eventVersion": "1.0",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"SequenceNumber": "111",
"SizeBytes": 26,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "stream-ARN"
},
{
"eventID": "2",
"eventName": "MODIFY",
"eventVersion": "1.0",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"NewImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"OldImage": {
"Message": {
"S": "New item!"
},
"Id": {
"N": "101"
}
},
"SequenceNumber": "222",
"SizeBytes": 59,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "stream-ARN"
},
{
"eventID": "3",
"eventName": "REMOVE",
"eventVersion": "1.0",
"eventSource": "aws:dynamodb",
"awsRegion": "us-east-1",
"dynamodb": {
"Keys": {
"Id": {
"N": "101"
}
},
"OldImage": {
"Message": {
"S": "This item has changed"
},
"Id": {
"N": "101"
}
},
"SequenceNumber": "333",
"SizeBytes": 38,
"StreamViewType": "NEW_AND_OLD_IMAGES"
},
"eventSourceARN": "stream-ARN"
}
],
"window": {
"start": "2020-07-30T17:00:00Z",
"end": "2020-07-30T17:05:00Z"
},
"state": {
"1": "state1"
},
"shardId": "shard123456789",
"eventSourceARN": "stream-ARN",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
14 changes: 13 additions & 1 deletion tests/events/kinesisStreamEvent.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,17 @@
"awsRegion": "us-east-2",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
]
],
"window": {
"start": "2020-12-09T07:04:00Z",
"end": "2020-12-09T07:06:00Z"
},
"state": {
"1": 282,
"2": 715
},
"shardId": "shardId-000000000006",
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
33 changes: 33 additions & 0 deletions tests/events/kinesisStreamTumblingWindowEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@

{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "1",
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
"approximateArrivalTimestamp": 1607497475.000
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
"awsRegion": "us-east-1",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
}
],
"window": {
"start": "2020-12-09T07:04:00Z",
"end": "2020-12-09T07:06:00Z"
},
"state": {
"1": 282,
"2": 715
},
"shardId": "shardId-000000000006",
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
"isFinalInvokeForWindow": false,
"isWindowTerminatedEarly": false
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,42 @@ def test_dynamodb_stream_trigger_event():
assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES


def test_dynamodb_stream_trigger_with_tumbling_window_event():
    # GIVEN a DynamoDB stream event that carries tumbling window metadata
    raw_event = load_event("dynamoStreamTumblingWindowEvent.json")
    first_raw = raw_event["Records"][0]
    raw_payload = first_raw["dynamodb"]
    raw_window = raw_event["window"]

    # WHEN it is wrapped by the data class
    parsed_event = DynamoDBStreamEvent(raw_event)
    first_record = list(parsed_event.records)[0]

    # THEN the first record exposes the raw record fields
    assert first_record.aws_region == first_raw["awsRegion"]
    assert first_record.event_id == first_raw["eventID"]
    assert first_record.event_name is DynamoDBRecordEventName.INSERT
    assert first_record.event_source == first_raw["eventSource"]
    assert first_record.event_source_arn == first_raw["eventSourceARN"]
    assert first_record.event_version == first_raw["eventVersion"]
    assert first_record.user_identity == {}

    # AND the stream payload is deserialized
    payload = first_record.dynamodb
    assert payload is not None
    item_keys = payload.keys
    assert item_keys is not None
    assert item_keys["Id"] == DECIMAL_CONTEXT.create_decimal(101)
    assert payload.new_image.get("Message") == raw_payload["NewImage"]["Message"]["S"]
    assert payload.old_image == {}
    assert payload.sequence_number == raw_payload["SequenceNumber"]
    assert payload.size_bytes == raw_payload["SizeBytes"]
    assert payload.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES

    # AND the tumbling window metadata is exposed at event level
    window = parsed_event.window
    assert window.raw_event == raw_window
    assert window.start == raw_window["start"]
    assert window.end == raw_window["end"]
    assert parsed_event.state == raw_event["state"]
    assert parsed_event.shard_id == raw_event["shardId"]
    assert parsed_event.event_source_arn == raw_event["eventSourceARN"]
    assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"]
    assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]


def test_dynamodb_stream_record_deserialization_large_int():
data = {
"Keys": {"key1": {"attr1": "value1"}},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ def test_kinesis_stream_event():
assert kinesis.data_as_bytes() == b"Hello, this is a test."
assert kinesis.data_as_text() == "Hello, this is a test."

assert parsed_event.window.raw_event == raw_event["window"]
assert parsed_event.state == raw_event["state"]
assert parsed_event.shard_id == raw_event["shardId"]
assert parsed_event.event_source_arn == raw_event["eventSourceARN"]
assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"]
assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]


def test_kinesis_stream_event_json_data():
json_value = {"test": "value"}
Expand All @@ -56,3 +63,43 @@ def test_kinesis_stream_event_cloudwatch_logs_data_extraction():
individual_logs = [extract_cloudwatch_logs_from_record(record) for record in event.records]

assert len(extracted_logs) == len(individual_logs)


def test_kinesis_stream_with_tumbling_window_event():
    # GIVEN a Kinesis stream event that carries tumbling window metadata
    raw_event = load_event("kinesisStreamTumblingWindowEvent.json")
    first_raw = raw_event["Records"][0]
    raw_payload = first_raw["kinesis"]
    raw_window = raw_event["window"]

    # WHEN it is wrapped by the data class
    parsed_event = KinesisStreamEvent(raw_event)
    all_records = list(parsed_event.records)

    # THEN the single record exposes the raw record fields
    assert len(all_records) == 1
    first_record = all_records[0]
    assert first_record.aws_region == first_raw["awsRegion"]
    assert first_record.event_id == first_raw["eventID"]
    assert first_record.event_name == first_raw["eventName"]
    assert first_record.event_source == first_raw["eventSource"]
    assert first_record.event_source_arn == first_raw["eventSourceARN"]
    assert first_record.event_version == first_raw["eventVersion"]
    assert first_record.invoke_identity_arn == first_raw["invokeIdentityArn"]

    # AND the Kinesis payload matches the raw payload
    payload = first_record.kinesis
    assert payload.approximate_arrival_timestamp == raw_payload["approximateArrivalTimestamp"]
    assert payload.data == raw_payload["data"]
    assert payload.kinesis_schema_version == raw_payload["kinesisSchemaVersion"]
    assert payload.partition_key == raw_payload["partitionKey"]
    assert payload.sequence_number == raw_payload["sequenceNumber"]

    # AND the payload data decodes to the expected bytes and text
    assert payload.data_as_bytes() == b"Hello, this is a test."
    assert payload.data_as_text() == "Hello, this is a test."

    # AND the tumbling window metadata is exposed at event level
    window = parsed_event.window
    assert window.raw_event == raw_window
    assert window.start == raw_window["start"]
    assert window.end == raw_window["end"]
    assert parsed_event.state == raw_event["state"]
    assert parsed_event.shard_id == raw_event["shardId"]
    assert parsed_event.event_source_arn == raw_event["eventSourceARN"]
    assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"]
    assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]
Loading