diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py new file mode 100644 index 00000000000..7c81678f305 --- /dev/null +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py @@ -0,0 +1,479 @@ +import re +from typing import Any, Dict, List, Optional + +from aws_lambda_powertools.utilities.data_classes.common import ( + BaseRequestContext, + BaseRequestContextV2, + DictWrapper, + get_header_value, +) + + +class APIGatewayRouteArn: + """A parsed route arn""" + + def __init__( + self, + region: str, + aws_account_id: str, + api_id: str, + stage: str, + http_method: str, + resource: str, + ): + self.partition = "aws" + self.region = region + self.aws_account_id = aws_account_id + self.api_id = api_id + self.stage = stage + self.http_method = http_method + self.resource = resource + + @property + def arn(self) -> str: + """Build an arn from it's parts + eg: arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request""" + return ( + f"arn:{self.partition}:execute-api:{self.region}:{self.aws_account_id}:{self.api_id}/{self.stage}/" + f"{self.http_method}/{self.resource}" + ) + + +def parse_api_gateway_arn(arn: str) -> APIGatewayRouteArn: + """Parses a gateway route arn as a APIGatewayRouteArn class + + Parameters + ---------- + arn : str + ARN string for a methodArn or a routeArn + Returns + ------- + APIGatewayRouteArn + """ + arn_parts = arn.split(":") + api_gateway_arn_parts = arn_parts[5].split("/") + return APIGatewayRouteArn( + region=arn_parts[3], + aws_account_id=arn_parts[4], + api_id=api_gateway_arn_parts[0], + stage=api_gateway_arn_parts[1], + http_method=api_gateway_arn_parts[2], + resource=api_gateway_arn_parts[3] if len(api_gateway_arn_parts) == 4 else "", + ) + + +class APIGatewayAuthorizerTokenEvent(DictWrapper): + """API Gateway Authorizer Token Event Format 1.0 + + Documentation: + ------------- + - https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-use-lambda-authorizer.html + """ + + @property + def get_type(self) -> str: + return self["type"] + + @property + def authorization_token(self) -> str: + return self["authorizationToken"] + + @property + def method_arn(self) -> str: + """ARN of the incoming method request and is populated by API Gateway in accordance with the Lambda authorizer + configuration""" + return self["methodArn"] + + @property + def parsed_arn(self) -> APIGatewayRouteArn: + """Convenient property to return a parsed api gateway method arn""" + return parse_api_gateway_arn(self.method_arn) + + +class APIGatewayAuthorizerRequestEvent(DictWrapper): + """API Gateway Authorizer Request Event Format 1.0 + + Documentation: + ------------- + - https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-use-lambda-authorizer.html + - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html + """ + + @property + def version(self) -> str: + return self["version"] + + @property + def get_type(self) -> str: + return self["type"] + + @property + def method_arn(self) -> str: + return self["methodArn"] + + @property + def parsed_arn(self) -> APIGatewayRouteArn: + return parse_api_gateway_arn(self.method_arn) + + @property + def identity_source(self) -> str: + return self["identitySource"] + + @property + def authorization_token(self) -> str: + return self["authorizationToken"] + + @property + def resource(self) -> str: + return self["resource"] + + @property + def path(self) -> str: + return self["path"] + + @property + def http_method(self) -> str: + return self["httpMethod"] + + @property + def headers(self) -> Dict[str, str]: + return self["headers"] + + @property + def query_string_parameters(self) -> Dict[str, str]: + return self["queryStringParameters"] + + @property + def path_parameters(self) -> Dict[str, str]: + return self["pathParameters"] + + @property + def stage_variables(self) -> Dict[str, str]: + return self["stageVariables"] + + @property + def request_context(self) -> BaseRequestContext: + return BaseRequestContext(self._data) + + def get_header_value( + self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False + ) -> Optional[str]: + """Get header value by name + + Parameters + ---------- + name: str + Header name + default_value: str, optional + Default value if no value was found by name + case_sensitive: bool + Whether to use a case sensitive look up + Returns + ------- + str, optional + Header value + """ + return get_header_value(self.headers, name, default_value, case_sensitive) + + +class APIGatewayAuthorizerEventV2(DictWrapper): + """API Gateway Authorizer Event Format 2.0 + + Documentation: + ------------- + - https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html + - https://aws.amazon.com/blogs/compute/introducing-iam-and-lambda-authorizers-for-amazon-api-gateway-http-apis/ + """ + + @property + def version(self) -> str: + """Event payload version should always be 2.0""" + return self["version"] + + @property + def get_type(self) -> str: + """Event type should always be request""" + return self["type"] + + @property + def route_arn(self) -> str: + """ARN of the route being called + + eg: arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request""" + return self["routeArn"] + + @property + def parsed_arn(self) -> APIGatewayRouteArn: + """Convenient property to return a parsed api gateway route arn""" + return parse_api_gateway_arn(self.route_arn) + + @property + def identity_source(self) -> Optional[List[str]]: + """The identity source for which authorization is requested. + + For a REQUEST authorizer, this is optional. The value is a set of one or more mapping expressions of the + specified request parameters. The identity source can be headers, query string parameters, stage variables, + and context parameters. + """ + return self.get("identitySource") + + @property + def route_key(self) -> str: + """The route key for the route. For HTTP APIs, the route key can be either $default, + or a combination of an HTTP method and resource path, for example, GET /pets.""" + return self["routeKey"] + + @property + def raw_path(self) -> str: + return self["rawPath"] + + @property + def raw_query_string(self) -> str: + return self["rawQueryString"] + + @property + def cookies(self) -> List[str]: + return self["cookies"] + + @property + def headers(self) -> Dict[str, str]: + return self["headers"] + + @property + def query_string_parameters(self) -> Dict[str, str]: + return self["queryStringParameters"] + + @property + def request_context(self) -> BaseRequestContextV2: + return BaseRequestContextV2(self._data) + + @property + def path_parameters(self) -> Optional[Dict[str, str]]: + return self.get("pathParameters") + + @property + def stage_variables(self) -> Optional[Dict[str, str]]: + return self.get("stageVariables") + + def get_header_value( + self, name: str, default_value: Optional[str] = None, case_sensitive: Optional[bool] = False + ) -> Optional[str]: + """Get header value by name + + Parameters + ---------- + name: str + Header name + default_value: str, optional + Default value if no value was found by name + case_sensitive: bool + Whether to use a case sensitive look up + Returns + ------- + str, optional + Header value + """ + return get_header_value(self.headers, name, default_value, case_sensitive) + + +class APIGatewayAuthorizerResponseV2: + """Api Gateway HTTP API V2 payload authorizer simple response helper + + Parameters + ---------- + authorize: bool + authorize is a boolean value indicating if the value in authorizationToken + is authorized to make calls to the GraphQL API. If this value is + true, execution of the GraphQL API continues. If this value is false, + an UnauthorizedException is raised + context: Dict[str, Any], optional + A JSON object visible as `event.requestContext.authorizer` lambda event + + The context object only supports key-value pairs. Nested keys are not supported. + + Warning: The total size of this JSON object must not exceed 5MB. + """ + + def __init__( + self, + authorize: bool = False, + context: Optional[Dict[str, Any]] = None, + ): + self.authorize = authorize + self.context = context + + def asdict(self) -> dict: + """Return the response as a dict""" + response: Dict = {"isAuthorized": self.authorize} + + if self.context: + response["context"] = self.context + + return response + + +class HttpVerb: + GET = "GET" + POST = "POST" + PUT = "PUT" + PATCH = "PATCH" + HEAD = "HEAD" + DELETE = "DELETE" + OPTIONS = "OPTIONS" + ALL = "*" + + +class APIGatewayAuthorizerResponse: + """Api Gateway HTTP API V1 payload or Rest api authorizer response helper + + Based on: - https://github.com/awslabs/aws-apigateway-lambda-authorizer-blueprints/blob/\ + master/blueprints/python/api-gateway-authorizer-python.py + """ + + version = "2012-10-17" + """The policy version used for the evaluation. This should always be '2012-10-17'""" + + path_regex = r"^[/.a-zA-Z0-9-\*]+$" + """The regular expression used to validate resource paths for the policy""" + + def __init__( + self, + principal_id: str, + region: str, + aws_account_id: str, + api_id: str, + stage: str, + context: Optional[Dict] = None, + ): + """ + Parameters + ---------- + principal_id : str + The principal used for the policy, this should be a unique identifier for the end user + region : str + AWS Regions. Beware of using '*' since it will not simply mean any region, because stars will greedily + expand over '/' or other separators. + See https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_resource.html for more + details. + aws_account_id : str + The AWS account id the policy will be generated for. This is used to create the method ARNs. + api_id : str + The API Gateway API id to be used in the policy. + Beware of using '*' since it will not simply mean any API Gateway API id, because stars will greedily + expand over '/' or other separators. + See https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_resource.html for more + details. + stage : str + The default stage to be used in the policy. + Beware of using '*' since it will not simply mean any stage, because stars will + greedily expand over '/' or other separators. + See https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements_resource.html for more + details. + context : Dict, optional + Optional, context. + Note: only names of type string and values of type int, string or boolean are supported + """ + self.principal_id = principal_id + self.region = region + self.aws_account_id = aws_account_id + self.api_id = api_id + self.stage = stage + self.context = context + self._allow_routes: List[Dict] = [] + self._deny_routes: List[Dict] = [] + + def _add_route(self, effect: str, verb: str, resource: str, conditions: List[Dict]): + """Adds a route to the internal lists of allowed or denied routes. Each object in + the internal list contains a resource ARN and a condition statement. The condition + statement can be null.""" + if verb != "*" and not hasattr(HttpVerb, verb): + raise ValueError(f"Invalid HTTP verb {verb}. Allowed verbs in HttpVerb class") + + resource_pattern = re.compile(self.path_regex) + if not resource_pattern.match(resource): + raise ValueError(f"Invalid resource path: {resource}. Path should match {self.path_regex}") + + if resource[:1] == "/": + resource = resource[1:] + + resource_arn = APIGatewayRouteArn(self.region, self.aws_account_id, self.api_id, self.stage, verb, resource).arn + + route = {"resourceArn": resource_arn, "conditions": conditions} + + if effect.lower() == "allow": + self._allow_routes.append(route) + else: # deny + self._deny_routes.append(route) + + @staticmethod + def _get_empty_statement(effect: str) -> Dict[str, Any]: + """Returns an empty statement object prepopulated with the correct action and the desired effect.""" + return {"Action": "execute-api:Invoke", "Effect": effect.capitalize(), "Resource": []} + + def _get_statement_for_effect(self, effect: str, methods: List) -> List: + """This function loops over an array of objects containing a resourceArn and + conditions statement and generates the array of statements for the policy.""" + if len(methods) == 0: + return [] + + statements = [] + + statement = self._get_empty_statement(effect) + for method in methods: + if method["conditions"] is None or len(method["conditions"]) == 0: + statement["Resource"].append(method["resourceArn"]) + else: + conditional_statement = self._get_empty_statement(effect) + conditional_statement["Resource"].append(method["resourceArn"]) + conditional_statement["Condition"] = method["conditions"] + statements.append(conditional_statement) + + if len(statement["Resource"]) > 0: + statements.append(statement) + + return statements + + def allow_all_routes(self): + """Adds a '*' allow to the policy to authorize access to all methods of an API""" + self._add_route("Allow", HttpVerb.ALL, "*", []) + + def deny_all_routes(self): + """Adds a '*' allow to the policy to deny access to all methods of an API""" + self._add_route("Deny", HttpVerb.ALL, "*", []) + + def allow_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): + """Adds an API Gateway method (Http verb + Resource path) to the list of allowed + methods for the policy. + + Optionally includes a condition for the policy statement. More on AWS policy + conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition""" + self._add_route("Allow", http_method, resource, conditions or []) + + def deny_route(self, http_method: str, resource: str, conditions: Optional[List[Dict]] = None): + """Adds an API Gateway method (Http verb + Resource path) to the list of denied + methods for the policy. + + Optionally includes a condition for the policy statement. More on AWS policy + conditions here: https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_policies_elements.html#Condition""" + self._add_route("Deny", http_method, resource, conditions or []) + + def asdict(self) -> Dict[str, Any]: + """Generates the policy document based on the internal lists of allowed and denied + conditions. This will generate a policy with two main statements for the effect: + one statement for Allow and one statement for Deny. + Methods that includes conditions will have their own statement in the policy.""" + if len(self._allow_routes) == 0 and len(self._deny_routes) == 0: + raise ValueError("No statements defined for the policy") + + response: Dict[str, Any] = { + "principalId": self.principal_id, + "policyDocument": {"Version": self.version, "Statement": []}, + } + + response["policyDocument"]["Statement"].extend(self._get_statement_for_effect("Allow", self._allow_routes)) + response["policyDocument"]["Statement"].extend(self._get_statement_for_effect("Deny", self._deny_routes)) + + if self.context: + response["context"] = self.context + + return response diff --git a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py index 66908b98ec1..34ac8d83993 100644 --- a/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py +++ b/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py @@ -1,82 +1,11 @@ from typing import Any, Dict, List, Optional -from aws_lambda_powertools.utilities.data_classes.common import BaseProxyEvent, DictWrapper - - -class APIGatewayEventIdentity(DictWrapper): - @property - def access_key(self) -> Optional[str]: - return self["requestContext"]["identity"].get("accessKey") - - @property - def account_id(self) -> Optional[str]: - """The AWS account ID associated with the request.""" - return self["requestContext"]["identity"].get("accountId") - - @property - def api_key(self) -> Optional[str]: - """For API methods that require an API key, this variable is the API key associated with the method request. - For methods that don't require an API key, this variable is null.""" - return self["requestContext"]["identity"].get("apiKey") - - @property - def api_key_id(self) -> Optional[str]: - """The API key ID associated with an API request that requires an API key.""" - return self["requestContext"]["identity"].get("apiKeyId") - - @property - def caller(self) -> Optional[str]: - """The principal identifier of the caller making the request.""" - return self["requestContext"]["identity"].get("caller") - - @property - def cognito_authentication_provider(self) -> Optional[str]: - """A comma-separated list of the Amazon Cognito authentication providers used by the caller - making the request. Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoAuthenticationProvider") - - @property - def cognito_authentication_type(self) -> Optional[str]: - """The Amazon Cognito authentication type of the caller making the request. - Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoAuthenticationType") - - @property - def cognito_identity_id(self) -> Optional[str]: - """The Amazon Cognito identity ID of the caller making the request. - Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoIdentityId") - - @property - def cognito_identity_pool_id(self) -> Optional[str]: - """The Amazon Cognito identity pool ID of the caller making the request. - Available only if the request was signed with Amazon Cognito credentials.""" - return self["requestContext"]["identity"].get("cognitoIdentityPoolId") - - @property - def principal_org_id(self) -> Optional[str]: - """The AWS organization ID.""" - return self["requestContext"]["identity"].get("principalOrgId") - - @property - def source_ip(self) -> str: - """The source IP address of the TCP connection making the request to API Gateway.""" - return self["requestContext"]["identity"]["sourceIp"] - - @property - def user(self) -> Optional[str]: - """The principal identifier of the user making the request.""" - return self["requestContext"]["identity"].get("user") - - @property - def user_agent(self) -> Optional[str]: - """The User Agent of the API caller.""" - return self["requestContext"]["identity"].get("userAgent") - - @property - def user_arn(self) -> Optional[str]: - """The Amazon Resource Name (ARN) of the effective user identified after authentication.""" - return self["requestContext"]["identity"].get("userArn") +from aws_lambda_powertools.utilities.data_classes.common import ( + BaseProxyEvent, + BaseRequestContext, + BaseRequestContextV2, + DictWrapper, +) class APIGatewayEventAuthorizer(DictWrapper): @@ -89,21 +18,7 @@ def scopes(self) -> Optional[List[str]]: return self["requestContext"]["authorizer"].get("scopes") -class APIGatewayEventRequestContext(DictWrapper): - @property - def account_id(self) -> str: - """The AWS account ID associated with the request.""" - return self["requestContext"]["accountId"] - - @property - def api_id(self) -> str: - """The identifier API Gateway assigns to your API.""" - return self["requestContext"]["apiId"] - - @property - def authorizer(self) -> APIGatewayEventAuthorizer: - return APIGatewayEventAuthorizer(self._data) - +class APIGatewayEventRequestContext(BaseRequestContext): @property def connected_at(self) -> Optional[int]: """The Epoch-formatted connection time. (WebSocket API)""" @@ -114,40 +29,11 @@ def connection_id(self) -> Optional[str]: """A unique ID for the connection that can be used to make a callback to the client. (WebSocket API)""" return self["requestContext"].get("connectionId") - @property - def domain_name(self) -> Optional[str]: - """A domain name""" - return self["requestContext"].get("domainName") - - @property - def domain_prefix(self) -> Optional[str]: - return self["requestContext"].get("domainPrefix") - @property def event_type(self) -> Optional[str]: """The event type: `CONNECT`, `MESSAGE`, or `DISCONNECT`. (WebSocket API)""" return self["requestContext"].get("eventType") - @property - def extended_request_id(self) -> Optional[str]: - """An automatically generated ID for the API call, which contains more useful information - for debugging/troubleshooting.""" - return self["requestContext"].get("extendedRequestId") - - @property - def protocol(self) -> str: - """The request protocol, for example, HTTP/1.1.""" - return self["requestContext"]["protocol"] - - @property - def http_method(self) -> str: - """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT.""" - return self["requestContext"]["httpMethod"] - - @property - def identity(self) -> APIGatewayEventIdentity: - return APIGatewayEventIdentity(self._data) - @property def message_direction(self) -> Optional[str]: """Message direction (WebSocket API)""" @@ -159,36 +45,9 @@ def message_id(self) -> Optional[str]: return self["requestContext"].get("messageId") @property - def path(self) -> str: - return self["requestContext"]["path"] - - @property - def stage(self) -> str: - """The deployment stage of the API request""" - return self["requestContext"]["stage"] - - @property - def request_id(self) -> str: - """The ID that API Gateway assigns to the API request.""" - return self["requestContext"]["requestId"] - - @property - def request_time(self) -> Optional[str]: - """The CLF-formatted request time (dd/MMM/yyyy:HH:mm:ss +-hhmm)""" - return self["requestContext"].get("requestTime") - - @property - def request_time_epoch(self) -> int: - """The Epoch-formatted request time.""" - return self["requestContext"]["requestTimeEpoch"] - - @property - def resource_id(self) -> str: - return self["requestContext"]["resourceId"] - - @property - def resource_path(self) -> str: - return self["requestContext"]["resourcePath"] + def operation_name(self) -> Optional[str]: + """The name of the operation being performed""" + return self["requestContext"].get("operationName") @property def route_key(self) -> Optional[str]: @@ -196,9 +55,8 @@ def route_key(self) -> Optional[str]: return self["requestContext"].get("routeKey") @property - def operation_name(self) -> Optional[str]: - """The name of the operation being performed""" - return self["requestContext"].get("operationName") + def authorizer(self) -> APIGatewayEventAuthorizer: + return APIGatewayEventAuthorizer(self._data) class APIGatewayProxyEvent(BaseProxyEvent): @@ -238,31 +96,6 @@ def stage_variables(self) -> Optional[Dict[str, str]]: return self.get("stageVariables") -class RequestContextV2Http(DictWrapper): - @property - def method(self) -> str: - return self["requestContext"]["http"]["method"] - - @property - def path(self) -> str: - return self["requestContext"]["http"]["path"] - - @property - def protocol(self) -> str: - """The request protocol, for example, HTTP/1.1.""" - return self["requestContext"]["http"]["protocol"] - - @property - def source_ip(self) -> str: - """The source IP address of the TCP connection making the request to API Gateway.""" - return self["requestContext"]["http"]["sourceIp"] - - @property - def user_agent(self) -> str: - """The User Agent of the API caller.""" - return self["requestContext"]["http"]["userAgent"] - - class RequestContextV2AuthorizerIam(DictWrapper): @property def access_key(self) -> Optional[str]: @@ -334,60 +167,12 @@ def iam(self) -> Optional[RequestContextV2AuthorizerIam]: return None if iam is None else RequestContextV2AuthorizerIam(iam) -class RequestContextV2(DictWrapper): - @property - def account_id(self) -> str: - """The AWS account ID associated with the request.""" - return self["requestContext"]["accountId"] - - @property - def api_id(self) -> str: - """The identifier API Gateway assigns to your API.""" - return self["requestContext"]["apiId"] - +class RequestContextV2(BaseRequestContextV2): @property def authorizer(self) -> Optional[RequestContextV2Authorizer]: authorizer = self["requestContext"].get("authorizer") return None if authorizer is None else RequestContextV2Authorizer(authorizer) - @property - def domain_name(self) -> str: - """A domain name""" - return self["requestContext"]["domainName"] - - @property - def domain_prefix(self) -> str: - return self["requestContext"]["domainPrefix"] - - @property - def http(self) -> RequestContextV2Http: - return RequestContextV2Http(self._data) - - @property - def request_id(self) -> str: - """The ID that API Gateway assigns to the API request.""" - return self["requestContext"]["requestId"] - - @property - def route_key(self) -> str: - """The selected route key.""" - return self["requestContext"]["routeKey"] - - @property - def stage(self) -> str: - """The deployment stage of the API request""" - return self["requestContext"]["stage"] - - @property - def time(self) -> str: - """The CLF-formatted request time (dd/MMM/yyyy:HH:mm:ss +-hhmm).""" - return self["requestContext"]["time"] - - @property - def time_epoch(self) -> int: - """The Epoch-formatted request time.""" - return self["requestContext"]["timeEpoch"] - class APIGatewayProxyEventV2(BaseProxyEvent): """AWS Lambda proxy V2 event diff --git a/aws_lambda_powertools/utilities/data_classes/common.py b/aws_lambda_powertools/utilities/data_classes/common.py index fbf0502125e..566e1c56259 100644 --- a/aws_lambda_powertools/utilities/data_classes/common.py +++ b/aws_lambda_powertools/utilities/data_classes/common.py @@ -120,3 +120,274 @@ def get_header_value( Header value """ return get_header_value(self.headers, name, default_value, case_sensitive) + + +class RequestContextClientCert(DictWrapper): + @property + def client_cert_pem(self) -> str: + """Client certificate pem""" + return self["clientCertPem"] + + @property + def issuer_dn(self) -> str: + """Issuer Distinguished Name""" + return self["issuerDN"] + + @property + def serial_number(self) -> str: + """Unique serial number for client cert""" + return self["serialNumber"] + + @property + def subject_dn(self) -> str: + """Subject Distinguished Name""" + return self["subjectDN"] + + @property + def validity_not_after(self) -> str: + """Date when the cert is no longer valid + + eg: Aug 5 00:28:21 2120 GMT""" + return self["validity"]["notAfter"] + + @property + def validity_not_before(self) -> str: + """Cert is not valid before this date + + eg: Aug 29 00:28:21 2020 GMT""" + return self["validity"]["notBefore"] + + +class APIGatewayEventIdentity(DictWrapper): + @property + def access_key(self) -> Optional[str]: + return self["requestContext"]["identity"].get("accessKey") + + @property + def account_id(self) -> Optional[str]: + """The AWS account ID associated with the request.""" + return self["requestContext"]["identity"].get("accountId") + + @property + def api_key(self) -> Optional[str]: + """For API methods that require an API key, this variable is the API key associated with the method request. + For methods that don't require an API key, this variable is null.""" + return self["requestContext"]["identity"].get("apiKey") + + @property + def api_key_id(self) -> Optional[str]: + """The API key ID associated with an API request that requires an API key.""" + return self["requestContext"]["identity"].get("apiKeyId") + + @property + def caller(self) -> Optional[str]: + """The principal identifier of the caller making the request.""" + return self["requestContext"]["identity"].get("caller") + + @property + def cognito_authentication_provider(self) -> Optional[str]: + """A comma-separated list of the Amazon Cognito authentication providers used by the caller + making the request. Available only if the request was signed with Amazon Cognito credentials.""" + return self["requestContext"]["identity"].get("cognitoAuthenticationProvider") + + @property + def cognito_authentication_type(self) -> Optional[str]: + """The Amazon Cognito authentication type of the caller making the request. + Available only if the request was signed with Amazon Cognito credentials.""" + return self["requestContext"]["identity"].get("cognitoAuthenticationType") + + @property + def cognito_identity_id(self) -> Optional[str]: + """The Amazon Cognito identity ID of the caller making the request. + Available only if the request was signed with Amazon Cognito credentials.""" + return self["requestContext"]["identity"].get("cognitoIdentityId") + + @property + def cognito_identity_pool_id(self) -> Optional[str]: + """The Amazon Cognito identity pool ID of the caller making the request. + Available only if the request was signed with Amazon Cognito credentials.""" + return self["requestContext"]["identity"].get("cognitoIdentityPoolId") + + @property + def principal_org_id(self) -> Optional[str]: + """The AWS organization ID.""" + return self["requestContext"]["identity"].get("principalOrgId") + + @property + def source_ip(self) -> str: + """The source IP address of the TCP connection making the request to API Gateway.""" + return self["requestContext"]["identity"]["sourceIp"] + + @property + def user(self) -> Optional[str]: + """The principal identifier of the user making the request.""" + return self["requestContext"]["identity"].get("user") + + @property + def user_agent(self) -> Optional[str]: + """The User Agent of the API caller.""" + return self["requestContext"]["identity"].get("userAgent") + + @property + def user_arn(self) -> Optional[str]: + """The Amazon Resource Name (ARN) of the effective user identified after authentication.""" + return self["requestContext"]["identity"].get("userArn") + + @property + def client_cert(self) -> Optional[RequestContextClientCert]: + client_cert = self["requestContext"]["identity"].get("clientCert") + return None if client_cert is None else RequestContextClientCert(client_cert) + + +class BaseRequestContext(DictWrapper): + @property + def account_id(self) -> str: + """The AWS account ID associated with the request.""" + return self["requestContext"]["accountId"] + + @property + def api_id(self) -> str: + """The identifier API Gateway assigns to your API.""" + return self["requestContext"]["apiId"] + + @property + def domain_name(self) -> Optional[str]: + """A domain name""" + return self["requestContext"].get("domainName") + + @property + def domain_prefix(self) -> Optional[str]: + return self["requestContext"].get("domainPrefix") + + @property + def extended_request_id(self) -> Optional[str]: + """An automatically generated ID for the API call, which contains more useful information + for debugging/troubleshooting.""" + return self["requestContext"].get("extendedRequestId") + + @property + def protocol(self) -> str: + """The request protocol, for example, HTTP/1.1.""" + return self["requestContext"]["protocol"] + + @property + def http_method(self) -> str: + """The HTTP method used. Valid values include: DELETE, GET, HEAD, OPTIONS, PATCH, POST, and PUT.""" + return self["requestContext"]["httpMethod"] + + @property + def identity(self) -> APIGatewayEventIdentity: + return APIGatewayEventIdentity(self._data) + + @property + def path(self) -> str: + return self["requestContext"]["path"] + + @property + def stage(self) -> str: + """The deployment stage of the API request""" + return self["requestContext"]["stage"] + + @property + def request_id(self) -> str: + """The ID that API Gateway assigns to the API request.""" + return self["requestContext"]["requestId"] + + @property + def request_time(self) -> Optional[str]: + """The CLF-formatted request time (dd/MMM/yyyy:HH:mm:ss +-hhmm)""" + return self["requestContext"].get("requestTime") + + @property + def request_time_epoch(self) -> int: + """The Epoch-formatted request time.""" + return self["requestContext"]["requestTimeEpoch"] + + @property + def resource_id(self) -> str: + return self["requestContext"]["resourceId"] + + @property + def resource_path(self) -> str: + return self["requestContext"]["resourcePath"] + + +class RequestContextV2Http(DictWrapper): + @property + def method(self) -> str: + return self["requestContext"]["http"]["method"] + + @property + def path(self) -> str: + return self["requestContext"]["http"]["path"] + + @property + def protocol(self) -> str: + """The request protocol, for example, HTTP/1.1.""" + return self["requestContext"]["http"]["protocol"] + + @property + def source_ip(self) -> str: + """The source IP address of the TCP connection making the request to API Gateway.""" + return self["requestContext"]["http"]["sourceIp"] + + @property + def user_agent(self) -> str: + """The User Agent of the API caller.""" + return self["requestContext"]["http"]["userAgent"] + + +class BaseRequestContextV2(DictWrapper): + @property + def account_id(self) -> str: + """The AWS account ID associated with the request.""" + return self["requestContext"]["accountId"] + + @property + def api_id(self) -> str: + """The identifier API Gateway assigns to your API.""" + return self["requestContext"]["apiId"] + + @property + def domain_name(self) -> str: + """A domain name""" + return self["requestContext"]["domainName"] + + @property + def domain_prefix(self) -> str: + return self["requestContext"]["domainPrefix"] + + @property + def http(self) -> RequestContextV2Http: + return RequestContextV2Http(self._data) + + @property + def request_id(self) -> str: + """The ID that API Gateway assigns to the API request.""" + return self["requestContext"]["requestId"] + + @property + def route_key(self) -> str: + """The selected route key.""" + return self["requestContext"]["routeKey"] + + @property + def stage(self) -> str: + """The deployment stage of the API request""" + return self["requestContext"]["stage"] + + @property + def time(self) -> str: + """The CLF-formatted request time (dd/MMM/yyyy:HH:mm:ss +-hhmm).""" + return self["requestContext"]["time"] + + @property + def time_epoch(self) -> int: + """The Epoch-formatted request time.""" + return self["requestContext"]["timeEpoch"] + + @property + def authentication(self) -> Optional[RequestContextClientCert]: + """Optional when using mutual TLS authentication""" + client_cert = self["requestContext"].get("authentication", {}).get("clientCert") + return None if client_cert is None else RequestContextClientCert(client_cert) diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md index 7591a26288e..c3a8b5a415f 100644 --- a/docs/utilities/data_classes.md +++ b/docs/utilities/data_classes.md @@ -59,11 +59,13 @@ Same example as above, but using the `event_source` decorator Event Source | Data_class ------------------------------------------------- | --------------------------------------------------------------------------------- +[API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` +[API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` [API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` [Application Load Balancer](#application-load-balancer) | `ALBEvent` -[AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` [AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` +[AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` [CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` [CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` [Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` @@ -81,6 +83,129 @@ Event Source | Data_class The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of documentation inherently (via autocompletion, types and docstrings). +### API Gateway Authorizer + +> New in 1.20.0 + +It is used for API Gateway Rest API lambda authorizer payload. See docs on +[Use API Gateway Lambda authorizers](https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-use-lambda-authorizer.html){target="_blank"} +for more details. Use `APIGatewayAuthorizerRequestEvent` for type "REQUEST" and `APIGatewayAuthorizerTokenEvent` for +type "TOKEN". + +Below is 2 examples of a Rest API lambda authorizer. One looking up user details by `Authorization` header and using +`APIGatewayAuthorizerResponse` to return the declined response when user is not found or authorized and include +the user details in the request context and full access for admin users. And another using +`APIGatewayAuthorizerTokenEvent` to get the `authorization_token`. + +=== "app_type_request.py" + + ```python + from aws_lambda_powertools.utilities.data_classes import event_source + from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + APIGatewayAuthorizerRequestEvent, + APIGatewayAuthorizerResponse, + HttpVerb, + ) + from secrets import compare_digest + + + def get_user_by_token(token): + if compare_digest(token, "admin-foo"): + return {"isAdmin": True, "name": "Admin"} + elif compare_digest(token, "regular-foo"): + return {"name": "Joe"} + else: + return None + + + @event_source(data_class=APIGatewayAuthorizerRequestEvent) + def handler(event: APIGatewayAuthorizerRequestEvent, context): + user = get_user_by_token(event.get_header_value("Authorization")) + + # parse the `methodArn` as an `APIGatewayRouteArn` + arn = event.parsed_arn + # Create the response builder from parts of the `methodArn` + builder = APIGatewayAuthorizerResponse("user", arn.region, arn.aws_account_id, arn.api_id, arn.stage) + + if user is None: + # No user was found, so we return not authorized + builder.deny_all_routes() + return builder.asdict() + + # Found the user and setting the details in the context + builder.context = user + + # Conditional IAM Policy + if user.get("isAdmin", False): + builder.allow_all_routes() + else: + builder.allow_route(HttpVerb.GET, "/user-profile") + + return builder.asdict() + ``` +=== "app_type_token.py" + + ```python + from aws_lambda_powertools.utilities.data_classes import event_source + from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + APIGatewayAuthorizerTokenEvent, + APIGatewayAuthorizerResponse, + ) + + + @event_source(data_class=APIGatewayAuthorizerTokenEvent) + def handler(event: APIGatewayAuthorizerTokenEvent, context): + arn = event.parsed_arn + builder = APIGatewayAuthorizerResponse("user", arn.region, arn.aws_account_id, arn.api_id, arn.stage) + if event.authorization_token == "42": + builder.allow_all_methods() + else: + builder.deny_all_methods() + return builder.asdict() + ``` + +### API Gateway Authorizer V2 + +> New in 1.20.0 + +It is used for API Gateway HTTP API lambda authorizer payload version 2. See blog post +[Introducing IAM and Lambda authorizers for Amazon API Gateway HTTP APIs](https://aws.amazon.com/blogs/compute/introducing-iam-and-lambda-authorizers-for-amazon-api-gateway-http-apis/){target="_blank"} +or [Working with AWS Lambda authorizers for HTTP APIs](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html){target="_blank"} +for more details + +Below is a simple example of an HTTP API lambda authorizer looking up user details by `x-token` header and using +`APIGatewayAuthorizerResponseV2` to return the declined response when user is not found or authorized and include +the user details in the request context. + +=== "app.py" + + ```python + from aws_lambda_powertools.utilities.data_classes import event_source + from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + APIGatewayAuthorizerEventV2, + APIGatewayAuthorizerResponseV2, + ) + from secrets import compare_digest + + + def get_user_by_token(token): + if compare_digest(token, "Foo"): + return {"name": "Foo"} + return None + + + @event_source(data_class=APIGatewayAuthorizerEventV2) + def handler(event: APIGatewayAuthorizerEventV2, context): + user = get_user_by_token(event.get_header_value("x-token")) + + if user is None: + # No user was found, so we return not authorized + return APIGatewayAuthorizerResponseV2().asdict() + + # Found the user and setting the details in the context + return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict() + ``` + ### API Gateway Proxy It is used for either API Gateway REST API or HTTP API using v1 proxy event. @@ -129,7 +254,7 @@ Is it used for Application load balancer event. do_something_with(event.json_body, event.query_string_parameters) ``` -## AppSync Authorizer +### AppSync Authorizer > New in 1.20.0 diff --git a/tests/events/apiGatewayAuthorizerRequestEvent.json b/tests/events/apiGatewayAuthorizerRequestEvent.json new file mode 100644 index 00000000000..d8dfe3fecf9 --- /dev/null +++ b/tests/events/apiGatewayAuthorizerRequestEvent.json @@ -0,0 +1,69 @@ +{ + "version": "1.0", + "type": "REQUEST", + "methodArn": "arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request", + "identitySource": "user1,123", + "authorizationToken": "user1,123", + "resource": "/request", + "path": "/request", + "httpMethod": "GET", + "headers": { + "X-AMZ-Date": "20170718T062915Z", + "Accept": "*/*", + "HeaderAuth1": "headerValue1", + "CloudFront-Viewer-Country": "US", + "CloudFront-Forwarded-Proto": "https", + "CloudFront-Is-Tablet-Viewer": "false", + "CloudFront-Is-Mobile-Viewer": "false", + "User-Agent": "..." + }, + "queryStringParameters": { + "QueryString1": "queryValue1" + }, + "pathParameters": {}, + "stageVariables": { + "StageVar1": "stageValue1" + }, + "requestContext": { + "accountId": "123456789012", + "apiId": "abcdef123", + "domainName": "3npb9j1tlk.execute-api.us-west-1.amazonaws.com", + "domainPrefix": "3npb9j1tlk", + "extendedRequestId": "EXqgWgXxSK4EJug=", + "httpMethod": "GET", + "identity": { + "accessKey": null, + "accountId": null, + "caller": null, + "cognitoAmr": null, + "cognitoAuthenticationProvider": null, + "cognitoAuthenticationType": null, + "cognitoIdentityId": null, + "cognitoIdentityPoolId": null, + "principalOrgId": null, + "apiKey": "...", + "sourceIp": "...", + "user": null, + "userAgent": "PostmanRuntime/7.28.3", + "userArn": null, + "clientCert": { + "clientCertPem": "CERT_CONTENT", + "subjectDN": "www.example.com", + "issuerDN": "Example issuer", + "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1", + "validity": { + "notBefore": "May 28 12:30:02 2019 GMT", + "notAfter": "Aug 5 09:36:04 2021 GMT" + } + } + }, + "path": "/request", + "protocol": "HTTP/1.1", + "requestId": "EXqgWgXxSK4EJug=", + "requestTime": "20/Aug/2021:14:36:50 +0000", + "requestTimeEpoch": 1629470210043, + "resourceId": "ANY /request", + "resourcePath": "/request", + "stage": "test" + } +} diff --git a/tests/events/apiGatewayAuthorizerTokenEvent.json b/tests/events/apiGatewayAuthorizerTokenEvent.json new file mode 100644 index 00000000000..f30f360f6d8 --- /dev/null +++ b/tests/events/apiGatewayAuthorizerTokenEvent.json @@ -0,0 +1,5 @@ +{ + "type": "TOKEN", + "authorizationToken": "allow", + "methodArn": "arn:aws:execute-api:us-west-2:123456789012:ymy8tbxw7b/*/GET/" +} diff --git a/tests/events/apiGatewayAuthorizerV2Event.json b/tests/events/apiGatewayAuthorizerV2Event.json new file mode 100644 index 00000000000..f0528080c90 --- /dev/null +++ b/tests/events/apiGatewayAuthorizerV2Event.json @@ -0,0 +1,52 @@ +{ + "version": "2.0", + "type": "REQUEST", + "routeArn": "arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request", + "identitySource": ["user1", "123"], + "routeKey": "GET /merchants", + "rawPath": "/merchants", + "rawQueryString": "parameter1=value1¶meter1=value2¶meter2=value", + "cookies": ["cookie1", "cookie2"], + "headers": { + "x-amzn-trace-id": "Root=1-611cc4a7-0746ebee281cfd967db97b64", + "Header1": "value1", + "Header2": "value2", + "Authorization": "value" + }, + "queryStringParameters": { + "parameter1": "value1,value2", + "parameter2": "value" + }, + "requestContext": { + "accountId": "123456789012", + "apiId": "api-id", + "authentication": { + "clientCert": { + "clientCertPem": "CERT_CONTENT", + "subjectDN": "www.example.com", + "issuerDN": "Example issuer", + "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1", + "validity": { + "notBefore": "May 28 12:30:02 2019 GMT", + "notAfter": "Aug 5 09:36:04 2021 GMT" + } + } + }, + "domainName": "id.execute-api.us-east-1.amazonaws.com", + "domainPrefix": "id", + "http": { + "method": "POST", + "path": "/merchants", + "protocol": "HTTP/1.1", + "sourceIp": "IP", + "userAgent": "agent" + }, + "requestId": "id", + "routeKey": "GET /merchants", + "stage": "$default", + "time": "12/Mar/2020:19:03:58 +0000", + "timeEpoch": 1583348638390 + }, + "pathParameters": { "parameter1": "value1" }, + "stageVariables": { "stageVariable1": "value1", "stageVariable2": "value2" } +} diff --git a/tests/events/apiGatewayProxyV2Event.json b/tests/events/apiGatewayProxyV2Event.json index 5e001934fee..9de632b8e3d 100644 --- a/tests/events/apiGatewayProxyV2Event.json +++ b/tests/events/apiGatewayProxyV2Event.json @@ -18,6 +18,18 @@ "requestContext": { "accountId": "123456789012", "apiId": "api-id", + "authentication": { + "clientCert": { + "clientCertPem": "CERT_CONTENT", + "subjectDN": "www.example.com", + "issuerDN": "Example issuer", + "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1", + "validity": { + "notBefore": "May 28 12:30:02 2019 GMT", + "notAfter": "Aug 5 09:36:04 2021 GMT" + } + } + }, "authorizer": { "jwt": { "claims": { @@ -54,4 +66,4 @@ "stageVariable1": "value1", "stageVariable2": "value2" } -} \ No newline at end of file +} diff --git a/tests/functional/data_classes/__init__.py b/tests/functional/data_classes/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/functional/data_classes/test_api_gateway_authorizer.py b/tests/functional/data_classes/test_api_gateway_authorizer.py new file mode 100644 index 00000000000..5310b28e634 --- /dev/null +++ b/tests/functional/data_classes/test_api_gateway_authorizer.py @@ -0,0 +1,159 @@ +import pytest + +from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + APIGatewayAuthorizerResponse, + HttpVerb, +) + + +@pytest.fixture +def builder(): + return APIGatewayAuthorizerResponse("foo", "us-west-1", "123456789", "fantom", "dev") + + +def test_authorizer_response_no_statement(builder: APIGatewayAuthorizerResponse): + # GIVEN a builder with no statements + with pytest.raises(ValueError) as ex: + # WHEN calling build + builder.asdict() + + # THEN raise a name error for not statements + assert str(ex.value) == "No statements defined for the policy" + + +def test_authorizer_response_invalid_verb(builder: APIGatewayAuthorizerResponse): + with pytest.raises(ValueError) as ex: + # GIVEN a invalid http_method + # WHEN calling deny_method + builder.deny_route("INVALID", "foo") + + # THEN raise a name error for invalid http verb + assert str(ex.value) == "Invalid HTTP verb INVALID. Allowed verbs in HttpVerb class" + + +def test_authorizer_response_invalid_resource(builder: APIGatewayAuthorizerResponse): + with pytest.raises(ValueError) as ex: + # GIVEN a invalid resource path "$" + # WHEN calling deny_method + builder.deny_route(HttpVerb.GET, "$") + + # THEN raise a name error for invalid resource + assert "Invalid resource path: $" in str(ex.value) + + +def test_authorizer_response_allow_all_routes_with_context(): + builder = APIGatewayAuthorizerResponse("foo", "us-west-1", "123456789", "fantom", "dev", {"name": "Foo"}) + builder.allow_all_routes() + assert builder.asdict() == { + "principalId": "foo", + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Allow", + "Resource": ["arn:aws:execute-api:us-west-1:123456789:fantom/dev/*/*"], + } + ], + }, + "context": {"name": "Foo"}, + } + + +def test_authorizer_response_deny_all_routes(builder: APIGatewayAuthorizerResponse): + builder.deny_all_routes() + assert builder.asdict() == { + "principalId": "foo", + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Deny", + "Resource": ["arn:aws:execute-api:us-west-1:123456789:fantom/dev/*/*"], + } + ], + }, + } + + +def test_authorizer_response_allow_route(builder: APIGatewayAuthorizerResponse): + builder.allow_route(HttpVerb.GET, "/foo") + assert builder.asdict() == { + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Allow", + "Resource": ["arn:aws:execute-api:us-west-1:123456789:fantom/dev/GET/foo"], + } + ], + }, + "principalId": "foo", + } + + +def test_authorizer_response_deny_route(builder: APIGatewayAuthorizerResponse): + builder.deny_route(HttpVerb.PUT, "foo") + assert builder.asdict() == { + "principalId": "foo", + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Deny", + "Resource": ["arn:aws:execute-api:us-west-1:123456789:fantom/dev/PUT/foo"], + } + ], + }, + } + + +def test_authorizer_response_allow_route_with_conditions(builder: APIGatewayAuthorizerResponse): + builder.allow_route( + HttpVerb.POST, + "/foo", + [ + {"StringEquals": {"method.request.header.Content-Type": "text/html"}}, + ], + ) + assert builder.asdict() == { + "principalId": "foo", + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Allow", + "Resource": ["arn:aws:execute-api:us-west-1:123456789:fantom/dev/POST/foo"], + "Condition": [{"StringEquals": {"method.request.header.Content-Type": "text/html"}}], + } + ], + }, + } + + +def test_authorizer_response_deny_route_with_conditions(builder: APIGatewayAuthorizerResponse): + builder.deny_route( + HttpVerb.POST, + "/foo", + [ + {"StringEquals": {"method.request.header.Content-Type": "application/json"}}, + ], + ) + assert builder.asdict() == { + "principalId": "foo", + "policyDocument": { + "Version": "2012-10-17", + "Statement": [ + { + "Action": "execute-api:Invoke", + "Effect": "Deny", + "Resource": ["arn:aws:execute-api:us-west-1:123456789:fantom/dev/POST/foo"], + "Condition": [{"StringEquals": {"method.request.header.Content-Type": "application/json"}}], + } + ], + }, + } diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py index c9e6f592439..5514a888e7d 100644 --- a/tests/functional/test_data_classes.py +++ b/tests/functional/test_data_classes.py @@ -22,6 +22,13 @@ SNSEvent, SQSEvent, ) +from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import ( + APIGatewayAuthorizerEventV2, + APIGatewayAuthorizerRequestEvent, + APIGatewayAuthorizerResponseV2, + APIGatewayAuthorizerTokenEvent, + parse_api_gateway_arn, +) from aws_lambda_powertools.utilities.data_classes.appsync.scalar_types_utils import ( _formatted_time, aws_date, @@ -785,6 +792,7 @@ def test_default_api_gateway_proxy_event(): assert identity.user == event["requestContext"]["identity"]["user"] assert identity.user_agent == event["requestContext"]["identity"]["userAgent"] assert identity.user_arn == event["requestContext"]["identity"]["userArn"] + assert identity.client_cert.subject_dn == "www.example.com" assert request_context.path == event["requestContext"]["path"] assert request_context.protocol == event["requestContext"]["protocol"] @@ -851,6 +859,7 @@ def test_api_gateway_proxy_event(): assert identity.user == event["requestContext"]["identity"]["user"] assert identity.user_agent == event["requestContext"]["identity"]["userAgent"] assert identity.user_arn == event["requestContext"]["identity"]["userArn"] + assert identity.client_cert.subject_dn == "www.example.com" assert request_context.path == event["requestContext"]["path"] assert request_context.protocol == event["requestContext"]["protocol"] @@ -875,6 +884,7 @@ def test_api_gateway_proxy_event(): assert request_context.operation_name is None assert identity.api_key is None assert identity.api_key_id is None + assert request_context.identity.client_cert.subject_dn == "www.example.com" def test_api_gateway_proxy_v2_event(): @@ -910,6 +920,7 @@ def test_api_gateway_proxy_v2_event(): assert request_context.stage == event["requestContext"]["stage"] assert request_context.time == event["requestContext"]["time"] assert request_context.time_epoch == event["requestContext"]["timeEpoch"] + assert request_context.authentication.subject_dn == "www.example.com" assert event.body == event["body"] assert event.path_parameters == event["pathParameters"] @@ -1455,3 +1466,151 @@ def test_appsync_authorizer_response(): assert {"isAuthorized": False} == AppSyncAuthorizerResponse(resolver_context={}).asdict() assert {"isAuthorized": True} == AppSyncAuthorizerResponse(authorize=True).asdict() assert {"isAuthorized": False, "ttlOverride": 0} == AppSyncAuthorizerResponse(max_age=0).asdict() + + +def test_api_gateway_authorizer_v2(): + """Check api gateway authorize event format v2.0""" + event = APIGatewayAuthorizerEventV2(load_event("apiGatewayAuthorizerV2Event.json")) + + assert event["version"] == event.version + assert event["version"] == "2.0" + assert event["type"] == event.get_type + assert event["routeArn"] == event.route_arn + assert event.parsed_arn.arn == event.route_arn + assert event["identitySource"] == event.identity_source + assert event["routeKey"] == event.route_key + assert event["rawPath"] == event.raw_path + assert event["rawQueryString"] == event.raw_query_string + assert event["cookies"] == event.cookies + assert event["headers"] == event.headers + assert event["queryStringParameters"] == event.query_string_parameters + assert event["requestContext"]["accountId"] == event.request_context.account_id + assert event["requestContext"]["apiId"] == event.request_context.api_id + expected_client_cert = event["requestContext"]["authentication"]["clientCert"] + assert expected_client_cert["clientCertPem"] == event.request_context.authentication.client_cert_pem + assert expected_client_cert["subjectDN"] == event.request_context.authentication.subject_dn + assert expected_client_cert["issuerDN"] == event.request_context.authentication.issuer_dn + assert expected_client_cert["serialNumber"] == event.request_context.authentication.serial_number + assert expected_client_cert["validity"]["notAfter"] == event.request_context.authentication.validity_not_after + assert expected_client_cert["validity"]["notBefore"] == event.request_context.authentication.validity_not_before + assert event["requestContext"]["domainName"] == event.request_context.domain_name + assert event["requestContext"]["domainPrefix"] == event.request_context.domain_prefix + expected_http = event["requestContext"]["http"] + assert expected_http["method"] == event.request_context.http.method + assert expected_http["path"] == event.request_context.http.path + assert expected_http["protocol"] == event.request_context.http.protocol + assert expected_http["sourceIp"] == event.request_context.http.source_ip + assert expected_http["userAgent"] == event.request_context.http.user_agent + assert event["requestContext"]["requestId"] == event.request_context.request_id + assert event["requestContext"]["routeKey"] == event.request_context.route_key + assert event["requestContext"]["stage"] == event.request_context.stage + assert event["requestContext"]["time"] == event.request_context.time + assert event["requestContext"]["timeEpoch"] == event.request_context.time_epoch + assert event["pathParameters"] == event.path_parameters + assert event["stageVariables"] == event.stage_variables + + assert event.get_header_value("Authorization") == "value" + assert event.get_header_value("authorization") == "value" + assert event.get_header_value("missing") is None + + # Check for optionals + event_optionals = APIGatewayAuthorizerEventV2({"requestContext": {}}) + assert event_optionals.identity_source is None + assert event_optionals.request_context.authentication is None + assert event_optionals.path_parameters is None + assert event_optionals.stage_variables is None + + +def test_api_gateway_authorizer_token_event(): + """Check API Gateway authorizer token event""" + event = APIGatewayAuthorizerTokenEvent(load_event("apiGatewayAuthorizerTokenEvent.json")) + + assert event.authorization_token == event["authorizationToken"] + assert event.method_arn == event["methodArn"] + assert event.parsed_arn.arn == event.method_arn + assert event.get_type == event["type"] + + +def test_api_gateway_authorizer_request_event(): + """Check API Gateway authorizer token event""" + event = APIGatewayAuthorizerRequestEvent(load_event("apiGatewayAuthorizerRequestEvent.json")) + + assert event.version == event["version"] + assert event.get_type == event["type"] + assert event.method_arn == event["methodArn"] + assert event.parsed_arn.arn == event.method_arn + assert event.identity_source == event["identitySource"] + assert event.authorization_token == event["authorizationToken"] + assert event.resource == event["resource"] + assert event.path == event["path"] + assert event.http_method == event["httpMethod"] + assert event.headers == event["headers"] + assert event.get_header_value("accept") == "*/*" + assert event.query_string_parameters == event["queryStringParameters"] + assert event.path_parameters == event["pathParameters"] + assert event.stage_variables == event["stageVariables"] + + assert event.request_context is not None + request_context = event.request_context + assert request_context.account_id == event["requestContext"]["accountId"] + assert request_context.api_id == event["requestContext"]["apiId"] + + assert request_context.domain_name == event["requestContext"]["domainName"] + assert request_context.domain_prefix == event["requestContext"]["domainPrefix"] + assert request_context.extended_request_id == event["requestContext"]["extendedRequestId"] + assert request_context.http_method == event["requestContext"]["httpMethod"] + + identity = request_context.identity + assert identity.access_key == event["requestContext"]["identity"]["accessKey"] + assert identity.account_id == event["requestContext"]["identity"]["accountId"] + assert identity.caller == event["requestContext"]["identity"]["caller"] + assert ( + identity.cognito_authentication_provider == event["requestContext"]["identity"]["cognitoAuthenticationProvider"] + ) + assert identity.cognito_authentication_type == event["requestContext"]["identity"]["cognitoAuthenticationType"] + assert identity.cognito_identity_id == event["requestContext"]["identity"]["cognitoIdentityId"] + assert identity.cognito_identity_pool_id == event["requestContext"]["identity"]["cognitoIdentityPoolId"] + assert identity.principal_org_id == event["requestContext"]["identity"]["principalOrgId"] + assert identity.source_ip == event["requestContext"]["identity"]["sourceIp"] + assert identity.user == event["requestContext"]["identity"]["user"] + assert identity.user_agent == event["requestContext"]["identity"]["userAgent"] + assert identity.user_arn == event["requestContext"]["identity"]["userArn"] + assert identity.client_cert.subject_dn == "www.example.com" + + assert request_context.path == event["requestContext"]["path"] + assert request_context.protocol == event["requestContext"]["protocol"] + assert request_context.request_id == event["requestContext"]["requestId"] + assert request_context.request_time == event["requestContext"]["requestTime"] + assert request_context.request_time_epoch == event["requestContext"]["requestTimeEpoch"] + assert request_context.resource_id == event["requestContext"]["resourceId"] + assert request_context.resource_path == event["requestContext"]["resourcePath"] + assert request_context.stage == event["requestContext"]["stage"] + + +def test_api_gateway_authorizer_simple_response(): + """Check building API Gateway authorizer simple resource""" + assert {"isAuthorized": False} == APIGatewayAuthorizerResponseV2().asdict() + expected_context = {"foo": "value"} + assert {"isAuthorized": True, "context": expected_context} == APIGatewayAuthorizerResponseV2( + authorize=True, + context=expected_context, + ).asdict() + + +def test_api_gateway_route_arn_parser(): + """Check api gateway route or method arn parsing""" + arn = "arn:aws:execute-api:us-east-1:123456789012:abcdef123/test/GET/request" + details = parse_api_gateway_arn(arn) + + assert details.arn == arn + assert details.region == "us-east-1" + assert details.aws_account_id == "123456789012" + assert details.api_id == "abcdef123" + assert details.stage == "test" + assert details.http_method == "GET" + assert details.resource == "request" + + arn = "arn:aws:execute-api:us-west-2:123456789012:ymy8tbxw7b/*/GET" + details = parse_api_gateway_arn(arn) + assert details.resource == "" + assert details.arn == arn + "/"