Skip to content

Commit 76f16be

Browse files
feat(event_source): add support for tumbling windows in Kinesis and DynamoDB events (#6658)
* add data for tumbling window * add unit test * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * fix window function * add test test_kinesis_stream_with_tumbling_window_event * add properties for tumbling window to dynamodb stream * Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Update aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py Co-authored-by: Leandro Damascena <[email protected]> Signed-off-by: kiitosu <[email protected]> * Converage --------- Signed-off-by: kiitosu <[email protected]> Co-authored-by: Leandro Damascena <[email protected]>
1 parent fd9a84d commit 76f16be

File tree

7 files changed

+311
-2
lines changed

7 files changed

+311
-2
lines changed

aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,25 @@ def user_identity(self) -> dict:
140140
return self.get("userIdentity") or {}
141141

142142

143+
class DynamoDBStreamWindow(DictWrapper):
144+
@property
145+
def start(self) -> str:
146+
"""The time window started"""
147+
return self["start"]
148+
149+
@property
150+
def end(self) -> str:
151+
"""The time window will end"""
152+
return self["end"]
153+
154+
143155
class DynamoDBStreamEvent(DictWrapper):
144156
"""Dynamo DB Stream Event
145157
146158
Documentation:
147159
-------------
148160
- https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html
161+
- https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-windows.html
149162
150163
Example
151164
-------
@@ -167,3 +180,30 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
167180
def records(self) -> Iterator[DynamoDBRecord]:
168181
for record in self["Records"]:
169182
yield DynamoDBRecord(record)
183+
184+
@property
185+
def window(self) -> DynamoDBStreamWindow | None:
186+
window = self.get("window")
187+
if window:
188+
return DynamoDBStreamWindow(window)
189+
return window
190+
191+
@property
192+
def state(self) -> dict[str, Any]:
193+
return self.get("state") or {}
194+
195+
@property
196+
def shard_id(self) -> str | None:
197+
return self.get("shardId")
198+
199+
@property
200+
def event_source_arn(self) -> str | None:
201+
return self.get("eventSourceARN")
202+
203+
@property
204+
def is_final_invoke_for_window(self) -> bool | None:
205+
return self.get("isFinalInvokeForWindow")
206+
207+
@property
208+
def is_window_terminated_early(self) -> bool | None:
209+
return self.get("isWindowTerminatedEarly")

aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import base64
44
import json
55
import zlib
6-
from typing import TYPE_CHECKING
6+
from typing import TYPE_CHECKING, Any
77

88
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
99
CloudWatchLogsDecodedData,
@@ -100,19 +100,59 @@ def kinesis(self) -> KinesisStreamRecordPayload:
100100
return KinesisStreamRecordPayload(self["kinesis"])
101101

102102

103+
class KinesisStreamWindow(DictWrapper):
104+
@property
105+
def start(self) -> str:
106+
"""The time window started"""
107+
return self["start"]
108+
109+
@property
110+
def end(self) -> str:
111+
"""The time window will end"""
112+
return self["end"]
113+
114+
103115
class KinesisStreamEvent(DictWrapper):
104116
"""Kinesis stream event
105117
106118
Documentation:
107119
--------------
108120
- https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html
121+
- https://docs.aws.amazon.com/lambda/latest/dg/services-kinesis-windows.html
109122
"""
110123

111124
@property
112125
def records(self) -> Iterator[KinesisStreamRecord]:
113126
for record in self["Records"]:
114127
yield KinesisStreamRecord(record)
115128

129+
@property
130+
def window(self) -> KinesisStreamWindow | None:
131+
window = self.get("window")
132+
if window:
133+
return KinesisStreamWindow(window)
134+
return window
135+
136+
@property
137+
def state(self) -> dict[str, Any]:
138+
return self.get("state") or {}
139+
140+
@property
141+
def shard_id(self) -> str | None:
142+
return self.get("shardId")
143+
144+
@property
145+
def event_source_arn(self) -> str | None:
146+
return self.get("eventSourceARN")
147+
148+
@property
149+
def is_final_invoke_for_window(self) -> bool | None:
150+
return self.get("isFinalInvokeForWindow")
151+
152+
@property
153+
def is_window_terminated_early(self) -> bool | None:
154+
return self.get("isWindowTerminatedEarly")
155+
116156

117157
def extract_cloudwatch_logs_from_event(event: KinesisStreamEvent) -> list[CloudWatchLogsDecodedData]:
118158
return [CloudWatchLogsDecodedData(record.kinesis.data_zlib_compressed_as_json()) for record in event.records]
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
{
2+
"Records": [
3+
{
4+
"eventID": "1",
5+
"eventName": "INSERT",
6+
"eventVersion": "1.0",
7+
"eventSource": "aws:dynamodb",
8+
"awsRegion": "us-east-1",
9+
"dynamodb": {
10+
"Keys": {
11+
"Id": {
12+
"N": "101"
13+
}
14+
},
15+
"NewImage": {
16+
"Message": {
17+
"S": "New item!"
18+
},
19+
"Id": {
20+
"N": "101"
21+
}
22+
},
23+
"SequenceNumber": "111",
24+
"SizeBytes": 26,
25+
"StreamViewType": "NEW_AND_OLD_IMAGES"
26+
},
27+
"eventSourceARN": "stream-ARN"
28+
},
29+
{
30+
"eventID": "2",
31+
"eventName": "MODIFY",
32+
"eventVersion": "1.0",
33+
"eventSource": "aws:dynamodb",
34+
"awsRegion": "us-east-1",
35+
"dynamodb": {
36+
"Keys": {
37+
"Id": {
38+
"N": "101"
39+
}
40+
},
41+
"NewImage": {
42+
"Message": {
43+
"S": "This item has changed"
44+
},
45+
"Id": {
46+
"N": "101"
47+
}
48+
},
49+
"OldImage": {
50+
"Message": {
51+
"S": "New item!"
52+
},
53+
"Id": {
54+
"N": "101"
55+
}
56+
},
57+
"SequenceNumber": "222",
58+
"SizeBytes": 59,
59+
"StreamViewType": "NEW_AND_OLD_IMAGES"
60+
},
61+
"eventSourceARN": "stream-ARN"
62+
},
63+
{
64+
"eventID": "3",
65+
"eventName": "REMOVE",
66+
"eventVersion": "1.0",
67+
"eventSource": "aws:dynamodb",
68+
"awsRegion": "us-east-1",
69+
"dynamodb": {
70+
"Keys": {
71+
"Id": {
72+
"N": "101"
73+
}
74+
},
75+
"OldImage": {
76+
"Message": {
77+
"S": "This item has changed"
78+
},
79+
"Id": {
80+
"N": "101"
81+
}
82+
},
83+
"SequenceNumber": "333",
84+
"SizeBytes": 38,
85+
"StreamViewType": "NEW_AND_OLD_IMAGES"
86+
},
87+
"eventSourceARN": "stream-ARN"
88+
}
89+
],
90+
"window": {
91+
"start": "2020-07-30T17:00:00Z",
92+
"end": "2020-07-30T17:05:00Z"
93+
},
94+
"state": {
95+
"1": "state1"
96+
},
97+
"shardId": "shard123456789",
98+
"eventSourceARN": "stream-ARN",
99+
"isFinalInvokeForWindow": false,
100+
"isWindowTerminatedEarly": false
101+
}

tests/events/kinesisStreamEvent.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,17 @@
3232
"awsRegion": "us-east-2",
3333
"eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
3434
}
35-
]
35+
],
36+
"window": {
37+
"start": "2020-12-09T07:04:00Z",
38+
"end": "2020-12-09T07:06:00Z"
39+
},
40+
"state": {
41+
"1": 282,
42+
"2": 715
43+
},
44+
"shardId": "shardId-000000000006",
45+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
46+
"isFinalInvokeForWindow": false,
47+
"isWindowTerminatedEarly": false
3648
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
2+
{
3+
"Records": [
4+
{
5+
"kinesis": {
6+
"kinesisSchemaVersion": "1.0",
7+
"partitionKey": "1",
8+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
9+
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
10+
"approximateArrivalTimestamp": 1607497475.000
11+
},
12+
"eventSource": "aws:kinesis",
13+
"eventVersion": "1.0",
14+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
15+
"eventName": "aws:kinesis:record",
16+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
17+
"awsRegion": "us-east-1",
18+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
19+
}
20+
],
21+
"window": {
22+
"start": "2020-12-09T07:04:00Z",
23+
"end": "2020-12-09T07:06:00Z"
24+
},
25+
"state": {
26+
"1": 282,
27+
"2": 715
28+
},
29+
"shardId": "shardId-000000000006",
30+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
31+
"isFinalInvokeForWindow": false,
32+
"isWindowTerminatedEarly": false
33+
}

tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,42 @@ def test_dynamodb_stream_trigger_event():
4646
assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES
4747

4848

49+
def test_dynamodb_stream_trigger_with_tumbling_window_event():
50+
raw_event = load_event("dynamoStreamTumblingWindowEvent.json")
51+
parsed_event = DynamoDBStreamEvent(raw_event)
52+
53+
records = list(parsed_event.records)
54+
55+
record = records[0]
56+
record_raw = raw_event["Records"][0]
57+
assert record.aws_region == record_raw["awsRegion"]
58+
assert record.event_id == record_raw["eventID"]
59+
assert record.event_name is DynamoDBRecordEventName.INSERT
60+
assert record.event_source == record_raw["eventSource"]
61+
assert record.event_source_arn == record_raw["eventSourceARN"]
62+
assert record.event_version == record_raw["eventVersion"]
63+
assert record.user_identity == {}
64+
dynamodb = record.dynamodb
65+
assert dynamodb is not None
66+
keys = dynamodb.keys
67+
assert keys is not None
68+
assert keys["Id"] == DECIMAL_CONTEXT.create_decimal(101)
69+
assert dynamodb.new_image.get("Message") == record_raw["dynamodb"]["NewImage"]["Message"]["S"]
70+
assert dynamodb.old_image == {}
71+
assert dynamodb.sequence_number == record_raw["dynamodb"]["SequenceNumber"]
72+
assert dynamodb.size_bytes == record_raw["dynamodb"]["SizeBytes"]
73+
assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES
74+
75+
assert parsed_event.window.raw_event == raw_event["window"]
76+
assert parsed_event.window.start == raw_event["window"]["start"]
77+
assert parsed_event.window.end == raw_event["window"]["end"]
78+
assert parsed_event.state == raw_event["state"]
79+
assert parsed_event.shard_id == raw_event["shardId"]
80+
assert parsed_event.event_source_arn == raw_event["eventSourceARN"]
81+
assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"]
82+
assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]
83+
84+
4985
def test_dynamodb_stream_record_deserialization_large_int():
5086
data = {
5187
"Keys": {"key1": {"attr1": "value1"}},

tests/unit/data_classes/required_dependencies/test_kinesis_stream_event.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ def test_kinesis_stream_event():
4141
assert kinesis.data_as_bytes() == b"Hello, this is a test."
4242
assert kinesis.data_as_text() == "Hello, this is a test."
4343

44+
assert parsed_event.window.raw_event == raw_event["window"]
45+
assert parsed_event.state == raw_event["state"]
46+
assert parsed_event.shard_id == raw_event["shardId"]
47+
assert parsed_event.event_source_arn == raw_event["eventSourceARN"]
48+
assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"]
49+
assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]
50+
4451

4552
def test_kinesis_stream_event_json_data():
4653
json_value = {"test": "value"}
@@ -56,3 +63,43 @@ def test_kinesis_stream_event_cloudwatch_logs_data_extraction():
5663
individual_logs = [extract_cloudwatch_logs_from_record(record) for record in event.records]
5764

5865
assert len(extracted_logs) == len(individual_logs)
66+
67+
68+
def test_kinesis_stream_with_tumbling_window_event():
69+
raw_event = load_event("kinesisStreamTumblingWindowEvent.json")
70+
parsed_event = KinesisStreamEvent(raw_event)
71+
72+
records = list(parsed_event.records)
73+
assert len(records) == 1
74+
record = records[0]
75+
76+
record_raw = raw_event["Records"][0]
77+
78+
assert record.aws_region == record_raw["awsRegion"]
79+
assert record.event_id == record_raw["eventID"]
80+
assert record.event_name == record_raw["eventName"]
81+
assert record.event_source == record_raw["eventSource"]
82+
assert record.event_source_arn == record_raw["eventSourceARN"]
83+
assert record.event_version == record_raw["eventVersion"]
84+
assert record.invoke_identity_arn == record_raw["invokeIdentityArn"]
85+
86+
kinesis = record.kinesis
87+
kinesis_raw = raw_event["Records"][0]["kinesis"]
88+
89+
assert kinesis.approximate_arrival_timestamp == kinesis_raw["approximateArrivalTimestamp"]
90+
assert kinesis.data == kinesis_raw["data"]
91+
assert kinesis.kinesis_schema_version == kinesis_raw["kinesisSchemaVersion"]
92+
assert kinesis.partition_key == kinesis_raw["partitionKey"]
93+
assert kinesis.sequence_number == kinesis_raw["sequenceNumber"]
94+
95+
assert kinesis.data_as_bytes() == b"Hello, this is a test."
96+
assert kinesis.data_as_text() == "Hello, this is a test."
97+
98+
assert parsed_event.window.raw_event == raw_event["window"]
99+
assert parsed_event.window.start == raw_event["window"]["start"]
100+
assert parsed_event.window.end == raw_event["window"]["end"]
101+
assert parsed_event.state == raw_event["state"]
102+
assert parsed_event.shard_id == raw_event["shardId"]
103+
assert parsed_event.event_source_arn == raw_event["eventSourceARN"]
104+
assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"]
105+
assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]

0 commit comments

Comments
 (0)