-
Notifications
You must be signed in to change notification settings - Fork 433
feat(event_source): add Kinesis Firehose Data Transformation data class #3029
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
leandrodamascena
merged 52 commits into
aws-powertools:develop
from
roger-zhangg:kinesis
Sep 14, 2023
Merged
Changes from all commits
Commits
Show all changes
52 commits
Select commit
Hold shift + click to select a range
040358d
support kinesis response
roger-zhangg 6eb8530
Merge branch 'develop' into kinesis
leandrodamascena 7ca016e
fix lint, address Leandro suggestions
roger-zhangg 785a144
Merge branch 'develop' of https://github.com/aws-powertools/powertool…
roger-zhangg 708686f
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg 8c9db32
remove deleted const
roger-zhangg f02319f
fix Literal import in 3.7
roger-zhangg 5f55aa7
change to use data-classes
roger-zhangg 2566f62
fix mypy
roger-zhangg 636e9d1
fix typo, make asdict a function
roger-zhangg d3114c4
Merge branch 'develop' into kinesis
leandrodamascena 3fa7b2d
Merge branch 'develop' into kinesis
leandrodamascena 370e156
address Troy/Leandro suggestions
roger-zhangg 99837fc
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg 029a55c
Merge branch 'develop' into kinesis
leandrodamascena af1abfe
remove 6MB comment
roger-zhangg 2032146
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg 4168e21
Merge branch 'develop' into kinesis
leandrodamascena 312830b
fix comments
roger-zhangg a6c05a5
Merge branch 'kinesis' of https://github.com/roger-zhangg/aws-lambda-…
roger-zhangg a3ed9f9
address Heitor's suggestion
roger-zhangg bfbee60
data class default optimization
roger-zhangg 4016446
remove slot for static check
roger-zhangg 5dbb3ff
fix doc, example
roger-zhangg 5b8a9c6
Merge branch 'develop' into kinesis
leandrodamascena 95b3958
Merge branch 'develop' into kinesis
leandrodamascena e4d75d7
rename r->record
roger-zhangg f5ca27d
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg 46fbe98
Merge branch 'develop' into kinesis
leandrodamascena ddd49d2
Merge branch 'develop' into kinesis
leandrodamascena ce6ed61
Addressing Heitor's feedback
leandrodamascena d8be53e
Addressing Heitor's feedback
leandrodamascena 3a11563
Addressing Heitor's feedback
leandrodamascena 17c6763
add result warning, add asdict test, metadata test
roger-zhangg 00051a8
Merge branch 'develop' into kinesis
roger-zhangg 455402a
Merge branch 'develop' into kinesis
leandrodamascena cad6c09
refactor: initial refactoring
heitorlessa 6809e0e
chore: branding
heitorlessa 42a0de9
refactor: use classvar and tuple for perf
heitorlessa d05ca07
chore: fix rebase issue
heitorlessa b20137a
chore: fix mypy tuple exactness type
heitorlessa ef12496
remove Ok in example response,add failure example
roger-zhangg f2d5e63
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg 3e43a25
chore: clean up docs example
heitorlessa 4ed0e66
chore: lower cognitive overhead; add example docstring
heitorlessa d1fc1c5
add drop example
roger-zhangg eb39c03
Merge branch 'kinesis' of github.com:roger-zhangg/aws-lambda-powertoo…
roger-zhangg c039480
docs: give info upfront, name examples
heitorlessa e11c718
docs: improve transforming records example
heitorlessa 745492c
docs: improve dropping records example
heitorlessa 61ddee9
docs: improve exception example
heitorlessa 6f95e8f
Merge branch 'develop' into kinesis
leandrodamascena File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Internal shared functions. Do not rely on it besides internal usage.""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
"""Standalone functions to serialize/deserialize common data structures""" | ||
import base64 | ||
import json | ||
from typing import Any, Callable | ||
|
||
|
||
def base64_encode(data: str) -> str: | ||
"""Encode a string and returns Base64-encoded encoded value. | ||
|
||
Parameters | ||
---------- | ||
data: str | ||
The string to encode. | ||
|
||
Returns | ||
------- | ||
str | ||
The Base64-encoded encoded value. | ||
""" | ||
return base64.b64encode(data.encode()).decode("utf-8") | ||
|
||
|
||
def base64_decode(data: str) -> str: | ||
"""Decodes a Base64-encoded string and returns the decoded value. | ||
|
||
Parameters | ||
---------- | ||
data: str | ||
The Base64-encoded string to decode. | ||
|
||
Returns | ||
------- | ||
str | ||
The decoded string value. | ||
""" | ||
return base64.b64decode(data).decode("utf-8") | ||
|
||
|
||
def base64_from_str(data: str) -> str: | ||
"""Encode str as base64 string""" | ||
return base64.b64encode(data.encode()).decode("utf-8") | ||
|
||
|
||
def base64_from_json(data: Any, json_serializer: Callable[..., str] = json.dumps) -> str: | ||
"""Encode JSON serializable data as base64 string | ||
|
||
Parameters | ||
---------- | ||
data: Any | ||
JSON serializable (dict, list, boolean, etc.) | ||
json_serializer: Callable | ||
function to serialize `obj` to a JSON formatted `str`, by default json.dumps | ||
|
||
Returns | ||
------- | ||
str: | ||
JSON string as base64 string | ||
""" | ||
return base64_from_str(data=json_serializer(data)) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
26 changes: 13 additions & 13 deletions
26
examples/event_sources/src/kinesis_firehose_delivery_stream.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,28 @@ | ||
import base64 | ||
import json | ||
|
||
from aws_lambda_powertools.utilities.data_classes import ( | ||
KinesisFirehoseDataTransformationResponse, | ||
KinesisFirehoseEvent, | ||
event_source, | ||
) | ||
from aws_lambda_powertools.utilities.serialization import base64_from_json | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
|
||
@event_source(data_class=KinesisFirehoseEvent) | ||
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext): | ||
result = [] | ||
result = KinesisFirehoseDataTransformationResponse() | ||
|
||
for record in event.records: | ||
# if data was delivered as json; caches loaded value | ||
data = record.data_as_json | ||
# get original data using data_as_text property | ||
data = record.data_as_text # (1)! | ||
|
||
## generate data to return | ||
transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data} | ||
|
||
processed_record = { | ||
"recordId": record.record_id, | ||
"data": base64.b64encode(json.dumps(data).encode("utf-8")), | ||
"result": "Ok", | ||
} | ||
processed_record = record.build_data_transformation_response( | ||
data=base64_from_json(transformed_data), # (2)! | ||
) | ||
|
||
result.append(processed_record) | ||
result.add_record(processed_record) | ||
|
||
# return transformed records | ||
return {"records": result} | ||
return result.asdict() |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.