Skip to content

Commit c7a584f

Browse files
authored
Merge pull request #118 from risenberg-cyberark/pydantic
feat: Advanced parser utility (pydantic)
2 parents d08de0b + 10d0079 commit c7a584f

File tree

23 files changed

+770
-91
lines changed

23 files changed

+770
-91
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ target:
44

55
dev:
66
pip install --upgrade pip poetry pre-commit
7-
poetry install
7+
poetry install --extras "pydantic"
88
pre-commit install
99

1010
dev-docs:
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""Advanced parser utility
2+
"""
3+
from .envelopes import Envelope, InvalidEnvelopeError, parse_envelope
4+
from .parser import parser
5+
6+
__all__ = ["InvalidEnvelopeError", "Envelope", "parse_envelope", "parser"]
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .envelopes import Envelope, InvalidEnvelopeError, parse_envelope
2+
3+
__all__ = ["InvalidEnvelopeError", "Envelope", "parse_envelope"]
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import logging
2+
from abc import ABC, abstractmethod
3+
from typing import Any, Dict
4+
5+
from pydantic import BaseModel, ValidationError
6+
7+
logger = logging.getLogger(__name__)
8+
9+
10+
class BaseEnvelope(ABC):
    """Base class for envelope handlers.

    Concrete envelopes implement ``parse``; the helper methods below convert
    the extracted user payload (a dictionary or a string) into the user schema.
    """

    def _parse_user_dict_schema(self, user_event: Dict[str, Any], schema: BaseModel) -> Any:
        """Instantiate ``schema`` from a dictionary payload.

        Returns ``None`` when ``user_event`` is ``None``. A ``ValidationError``
        or ``TypeError`` raised while building the schema is logged and re-raised.
        """
        if user_event is not None:
            logger.debug("parsing user dictionary schema")
            try:
                parsed = schema(**user_event)
            except (ValidationError, TypeError):
                logger.exception("Validation exception while extracting user custom schema")
                raise
            return parsed
        return None

    def _parse_user_json_string_schema(self, user_event: str, schema: BaseModel) -> Any:
        """Turn a string payload into ``schema``, or hand it back untouched.

        Returns ``None`` when ``user_event`` is ``None``. When ``schema`` is
        ``str`` the payload is returned as-is; otherwise it is passed to
        ``schema.parse_raw``. A ``ValidationError`` or ``TypeError`` raised
        while parsing is logged and re-raised.
        """
        if user_event is None:
            return None

        # Some payloads are not dictionaries that can be parsed as a BaseModel
        # but plain strings (i.e. SQS has a plain string payload); in that case
        # the caller passes ``str`` as the schema and no parsing takes place.
        wants_plain_string = schema == str
        if wants_plain_string:
            logger.debug("input is string, returning")
            return user_event

        logger.debug("trying to parse as json encoded string")
        try:
            model = schema.parse_raw(user_event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception while extracting user custom schema")
            raise
        return model

    @abstractmethod
    def parse(self, event: Dict[str, Any], schema: BaseModel):
        """Extract the user payload from ``event`` and parse it with ``schema``.

        Must be overridden by every concrete envelope.
        """
        return NotImplemented
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import logging
2+
from typing import Any, Dict, List
3+
from typing_extensions import Literal
4+
5+
from pydantic import BaseModel, ValidationError
6+
7+
from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
8+
from aws_lambda_powertools.utilities.advanced_parser.schemas import DynamoDBSchema
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
# returns a List of dictionaries which each contains two keys, "NewImage" and "OldImage".
14+
# The values are the parsed schema models. The images' values can also be None.
15+
# Length of the list is the record's amount in the original event.
16+
class DynamoDBEnvelope(BaseEnvelope):
    """Envelope handler for DynamoDB stream events."""

    def parse(self, event: Dict[str, Any], schema: BaseModel) -> List[Dict[Literal["NewImage", "OldImage"], BaseModel]]:
        """Validate the stream event and parse both images of every record.

        Returns one dictionary per record, in record order, with the keys
        "NewImage" and "OldImage". Each value is the image parsed with
        ``schema``, or ``None`` when the record carries no such image.
        A ``ValidationError`` or ``TypeError`` raised while validating the
        envelope is logged and re-raised.
        """
        try:
            stream_event = DynamoDBSchema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input dynamodb stream event")
            raise

        return [
            {
                "NewImage": self._parse_user_dict_schema(entry.dynamodb.NewImage, schema),
                "OldImage": self._parse_user_dict_schema(entry.dynamodb.OldImage, schema),
            }
            for entry in stream_event.Records
        ]
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import logging
2+
from enum import Enum
3+
from typing import Any, Dict
4+
5+
from pydantic import BaseModel
6+
7+
from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
8+
from aws_lambda_powertools.utilities.advanced_parser.envelopes.dynamodb import DynamoDBEnvelope
9+
from aws_lambda_powertools.utilities.advanced_parser.envelopes.event_bridge import EventBridgeEnvelope
10+
from aws_lambda_powertools.utilities.advanced_parser.envelopes.sqs import SqsEnvelope
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
"""Built-in envelopes"""
16+
17+
18+
class Envelope(str, Enum):
    """Names of the built-in envelopes.

    Members are also ``str`` instances, so each one compares equal to its value.
    """

    SQS = "sqs"
    EVENTBRIDGE = "eventbridge"
    DYNAMODB_STREAM = "dynamodb_stream"
22+
23+
24+
class InvalidEnvelopeError(Exception):
    """Raised when the requested envelope is not one of the Envelope enum values."""
26+
27+
28+
# enum to BaseEnvelope handler class
29+
__ENVELOPE_MAPPING = {
    Envelope.SQS: SqsEnvelope,
    Envelope.DYNAMODB_STREAM: DynamoDBEnvelope,
    Envelope.EVENTBRIDGE: EventBridgeEnvelope,
}


def parse_envelope(event: Dict[str, Any], envelope: Envelope, schema: BaseModel):
    """Parse ``event`` with the handler registered for ``envelope``.

    Parameters
    ----------
    event: raw event dictionary
    envelope: which built-in envelope wraps the user payload
    schema: user schema handed to the envelope handler's ``parse`` method

    Returns
    -------
    Whatever the selected envelope handler's ``parse`` method returns.

    Raises
    ------
    InvalidEnvelopeError
        No handler is registered for ``envelope``.
    """
    envelope_handler = __ENVELOPE_MAPPING.get(envelope)
    if envelope_handler is None:
        # No exception is being handled here, so logger.exception would append
        # a meaningless "NoneType: None" traceback; log a plain error instead.
        logger.error("envelope must be an instance of Envelope enum")
        raise InvalidEnvelopeError("envelope must be an instance of Envelope enum")

    # Envelope mixes in str, so a plain string such as "sqs" hashes and compares
    # equal to the matching member and passes the lookup above. Normalise through
    # the enum before reading .value so that case does not raise AttributeError.
    envelope_name = Envelope(envelope).value
    logger.debug("Parsing and validating event schema, envelope=%s", envelope_name)
    return envelope_handler().parse(event=event, schema=schema)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import logging
2+
from typing import Any, Dict
3+
4+
from pydantic import BaseModel, ValidationError
5+
6+
from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
7+
from aws_lambda_powertools.utilities.advanced_parser.schemas import EventBridgeSchema
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
# returns a parsed BaseModel object according to schema type
13+
class EventBridgeEnvelope(BaseEnvelope):
    """Envelope handler for EventBridge events."""

    def parse(self, event: Dict[str, Any], schema: BaseModel) -> BaseModel:
        """Validate the EventBridge event and parse its ``detail`` with ``schema``.

        A ``ValidationError`` or ``TypeError`` raised while validating the
        envelope is logged and re-raised.
        """
        try:
            bridge_event = EventBridgeSchema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input eventbridge event")
            raise

        detail = bridge_event.detail
        return self._parse_user_dict_schema(detail, schema)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import logging
2+
from typing import Any, Dict, List, Union
3+
4+
from pydantic import BaseModel, ValidationError
5+
6+
from aws_lambda_powertools.utilities.advanced_parser.envelopes.base import BaseEnvelope
7+
from aws_lambda_powertools.utilities.advanced_parser.schemas import SqsSchema
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
# returns a list of parsed schemas of type BaseModel or plain string.
13+
# The record's body parameter is a string. However, it can also be a JSON encoded string which
14+
# can then be parsed into a BaseModel object.
15+
# Note that all records will be parsed the same way so if schema is str,
16+
# all the items in the list will be parsed as str and not as JSON (and vice versa).
17+
class SqsEnvelope(BaseEnvelope):
    """Envelope handler for SQS events."""

    def parse(self, event: Dict[str, Any], schema: Union[BaseModel, str]) -> List[Union[BaseModel, str]]:
        """Validate the SQS event and parse the body of every record.

        Returns one item per record, in record order. Every body is handled
        the same way: returned as a plain string when ``schema`` is ``str``,
        otherwise parsed as a JSON encoded string into ``schema``.
        A ``ValidationError`` or ``TypeError`` raised while validating the
        envelope is logged and re-raised.
        """
        try:
            sqs_event = SqsSchema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input sqs event")
            raise

        return [self._parse_user_json_string_schema(message.body, schema) for message in sqs_event.Records]
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import logging
2+
from typing import Any, Callable, Dict, Optional
3+
4+
from pydantic import BaseModel, ValidationError
5+
6+
from aws_lambda_powertools.middleware_factory import lambda_handler_decorator
7+
from aws_lambda_powertools.utilities.advanced_parser.envelopes import Envelope, parse_envelope
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
@lambda_handler_decorator
def parser(
    handler: Callable[[Dict, Any], Any],
    event: Dict[str, Any],
    context: Dict[str, Any],
    schema: BaseModel,
    envelope: Optional[Envelope] = None,
) -> Any:
    """Decorator to conduct advanced parsing & validation for lambda handlers events

    The incoming event is parsed and validated, and the parsed result replaces
    the raw event when the lambda handler is called.

    In case envelope is None, the complete event is parsed to match the schema
    parameter BaseModel definition and the handler receives a schema instance.
    In case envelope is not None, the event is first parsed as the envelope's
    schema definition, then the user message is extracted and parsed again as
    the schema parameter's definition. What the handler receives then depends on
    the envelope (for example, the SQS and DynamoDB stream envelopes return a list).

    Example
    -------
    **Lambda function using validation decorator**

        @parser(schema=MyBusiness, envelope=Envelope.EVENTBRIDGE)
        def handler(event: MyBusiness, context: LambdaContext):
            ...

    Parameters
    ----------
    handler: input for lambda_handler_decorator, wraps the handler lambda
    event: AWS event dictionary
    context: AWS lambda context
    schema: pydantic BaseModel class. This is the user data schema that will replace the event.
        event parameter will be parsed and a new schema object will be created from it.
    envelope: what envelope to extract the schema from, can be any AWS service that is currently
        supported in the envelopes module. Can be None.

    Raises
    ------
    TypeError
        In case event is None.
    pydantic.ValidationError
        The event fails validation, either against the envelope's schema or the user schema.
    InvalidEnvelopeError
        envelope is not one of the supported Envelope values.
    """
    if envelope is None:
        logger.debug("Parsing and validating event schema, no envelope is used")
        try:
            parsed_event = schema(**event)
        except (ValidationError, TypeError):
            logger.exception("Validation exception received from input event")
            raise
    else:
        parsed_event = parse_envelope(event, envelope, schema)

    # The name is only used for logging; fall back to repr() so a callable
    # without __name__ (e.g. functools.partial) does not fail here.
    logger.debug("Calling handler %s", getattr(handler, "__name__", repr(handler)))
    return handler(parsed_event, context)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from .dynamodb import DynamoDBSchema, DynamoRecordSchema, DynamoScheme
2+
from .event_bridge import EventBridgeSchema
3+
from .sqs import SqsRecordSchema, SqsSchema
4+
5+
__all__ = [
6+
"DynamoDBSchema",
7+
"EventBridgeSchema",
8+
"DynamoScheme",
9+
"DynamoRecordSchema",
10+
"SqsSchema",
11+
"SqsRecordSchema",
12+
]

0 commit comments

Comments
 (0)