Skip to content

Commit 5046a35

Browse files
feat(idempotency): support methods with the same name (ABCs) by including fully qualified name in v2 (#1535)
Co-authored-by: Rúben Fonseca <[email protected]>
1 parent 4729fc6 commit 5046a35

File tree

16 files changed

+292
-18
lines changed

16 files changed

+292
-18
lines changed

aws_lambda_powertools/utilities/idempotency/base.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ def __init__(
7676
self.fn_kwargs = function_kwargs
7777
self.config = config
7878

79-
persistence_store.configure(config, self.function.__name__)
79+
persistence_store.configure(config, f"{self.function.__module__}.{self.function.__qualname__}")
8080
self.persistence_store = persistence_store
8181

8282
def handle(self) -> Any:

docs/upgrade.md

+12
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Changes at a glance:
1212

1313
* The API for **event handler's `Response`** has minor changes to support multi value headers and cookies.
1414
* The **legacy SQS batch processor** was removed.
15+
* The **Idempotency key** format changed slightly, invalidating all the existing cached results.
1516

1617
???+ important
1718
Powertools for Python v2 drops suport for Python 3.6, following the Python 3.6 End-Of-Life (EOL) reached on December 23, 2021.
@@ -142,3 +143,14 @@ You can migrate to the [native batch processing](https://aws.amazon.com/about-aw
142143
143144
return processor.response()
144145
```
146+
147+
## Idempotency key format
148+
149+
The format of the Idempotency key was changed. This is used store the invocation results on a persistent store like DynamoDB.
150+
151+
No changes are necessary in your code, but remember that existing Idempotency records will be ignored when you upgrade, as new executions generate keys with the new format.
152+
153+
Prior to this change, the Idempotency key was generated using only the caller function name (e.g: `lambda_handler#282e83393862a613b612c00283fef4c8`).
154+
After this change, the key is generated using the `module name` + `qualified function name` + `idempotency key` (e.g: `app.classExample.function#app.handler#282e83393862a613b612c00283fef4c8`).
155+
156+
Using qualified names prevents distinct functions with the same name to contend for the same Idempotency key.

docs/utilities/idempotency.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ If you're not [changing the default configuration for the DynamoDB persistence l
4242
| TTL attribute name | `expiration` | This can only be configured after your table is created if you're using AWS Console |
4343

4444
???+ tip "Tip: You can share a single state table for all functions"
45-
You can reuse the same DynamoDB table to store idempotency state. We add your `function_name` in addition to the idempotency key as a hash key.
45+
You can reuse the same DynamoDB table to store idempotency state. We add `module_name` and [qualified name for classes and functions](https://peps.python.org/pep-3155/) in addition to the idempotency key as a hash key.
4646

4747
```yaml hl_lines="5-13 21-23" title="AWS Serverless Application Model (SAM) example"
4848
Resources:

tests/e2e/idempotency/__init__.py

Whitespace-only changes.

tests/e2e/idempotency/conftest.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import pytest
2+
3+
from tests.e2e.idempotency.infrastructure import IdempotencyDynamoDBStack
4+
5+
6+
@pytest.fixture(autouse=True, scope="module")
7+
def infrastructure(tmp_path_factory, worker_id):
8+
"""Setup and teardown logic for E2E test infrastructure
9+
10+
Yields
11+
------
12+
Dict[str, str]
13+
CloudFormation Outputs from deployed infrastructure
14+
"""
15+
stack = IdempotencyDynamoDBStack()
16+
try:
17+
yield stack.deploy()
18+
finally:
19+
stack.delete()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import time
2+
3+
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, idempotent
4+
5+
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
6+
7+
8+
@idempotent(persistence_store=persistence_layer)
9+
def lambda_handler(event, context):
10+
11+
time.sleep(10)
12+
13+
return event
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import time
2+
3+
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent
4+
5+
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
6+
config = IdempotencyConfig(expires_after_seconds=20)
7+
8+
9+
@idempotent(config=config, persistence_store=persistence_layer)
10+
def lambda_handler(event, context):
11+
12+
time_now = time.time()
13+
14+
return {"time": str(time_now)}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import time
2+
3+
from aws_lambda_powertools.utilities.idempotency import DynamoDBPersistenceLayer, IdempotencyConfig, idempotent
4+
5+
persistence_layer = DynamoDBPersistenceLayer(table_name="IdempotencyTable")
6+
config = IdempotencyConfig(expires_after_seconds=1)
7+
8+
9+
@idempotent(config=config, persistence_store=persistence_layer)
10+
def lambda_handler(event, context):
11+
12+
sleep_time: int = event.get("sleep") or 0
13+
time.sleep(sleep_time)
14+
15+
return event
+29
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
from typing import Any
2+
3+
from aws_cdk import CfnOutput, RemovalPolicy
4+
from aws_cdk import aws_dynamodb as dynamodb
5+
6+
from tests.e2e.utils.infrastructure import BaseInfrastructure
7+
8+
9+
class IdempotencyDynamoDBStack(BaseInfrastructure):
10+
def create_resources(self):
11+
functions = self.create_lambda_functions()
12+
self._create_dynamodb_table(function=functions)
13+
14+
def _create_dynamodb_table(self, function: Any):
15+
table = dynamodb.Table(
16+
self.stack,
17+
"Idempotency",
18+
table_name="IdempotencyTable",
19+
removal_policy=RemovalPolicy.DESTROY,
20+
partition_key=dynamodb.Attribute(name="id", type=dynamodb.AttributeType.STRING),
21+
time_to_live_attribute="expiration",
22+
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
23+
)
24+
25+
table.grant_read_write_data(function["TtlCacheExpirationHandler"])
26+
table.grant_read_write_data(function["TtlCacheTimeoutHandler"])
27+
table.grant_read_write_data(function["ParallelExecutionHandler"])
28+
29+
CfnOutput(self.stack, "DynamoDBTable", value=table.table_name)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import json
2+
from time import sleep
3+
4+
import pytest
5+
6+
from tests.e2e.utils import data_fetcher
7+
from tests.e2e.utils.functions import execute_lambdas_in_parallel
8+
9+
10+
@pytest.fixture
11+
def ttl_cache_expiration_handler_fn_arn(infrastructure: dict) -> str:
12+
return infrastructure.get("TtlCacheExpirationHandlerArn", "")
13+
14+
15+
@pytest.fixture
16+
def ttl_cache_timeout_handler_fn_arn(infrastructure: dict) -> str:
17+
return infrastructure.get("TtlCacheTimeoutHandlerArn", "")
18+
19+
20+
@pytest.fixture
21+
def parallel_execution_handler_fn_arn(infrastructure: dict) -> str:
22+
return infrastructure.get("ParallelExecutionHandlerArn", "")
23+
24+
25+
@pytest.fixture
26+
def idempotency_table_name(infrastructure: dict) -> str:
27+
return infrastructure.get("DynamoDBTable", "")
28+
29+
30+
def test_ttl_caching_expiration_idempotency(ttl_cache_expiration_handler_fn_arn: str):
31+
# GIVEN
32+
payload = json.dumps({"message": "Lambda Powertools - TTL 20s"})
33+
34+
# WHEN
35+
# first execution
36+
first_execution, _ = data_fetcher.get_lambda_response(
37+
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
38+
)
39+
first_execution_response = first_execution["Payload"].read().decode("utf-8")
40+
41+
# the second execution should return the same response as the first execution
42+
second_execution, _ = data_fetcher.get_lambda_response(
43+
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
44+
)
45+
second_execution_response = second_execution["Payload"].read().decode("utf-8")
46+
47+
# wait 20s to expire ttl and execute again, this should return a new response value
48+
sleep(20)
49+
third_execution, _ = data_fetcher.get_lambda_response(
50+
lambda_arn=ttl_cache_expiration_handler_fn_arn, payload=payload
51+
)
52+
third_execution_response = third_execution["Payload"].read().decode("utf-8")
53+
54+
# THEN
55+
assert first_execution_response == second_execution_response
56+
assert third_execution_response != second_execution_response
57+
58+
59+
def test_ttl_caching_timeout_idempotency(ttl_cache_timeout_handler_fn_arn: str):
60+
# GIVEN
61+
payload_timeout_execution = json.dumps({"sleep": 10, "message": "Lambda Powertools - TTL 1s"})
62+
payload_working_execution = json.dumps({"sleep": 0, "message": "Lambda Powertools - TTL 1s"})
63+
64+
# WHEN
65+
# first call should fail due to timeout
66+
execution_with_timeout, _ = data_fetcher.get_lambda_response(
67+
lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_timeout_execution
68+
)
69+
execution_with_timeout_response = execution_with_timeout["Payload"].read().decode("utf-8")
70+
71+
# the second call should work and return the payload
72+
execution_working, _ = data_fetcher.get_lambda_response(
73+
lambda_arn=ttl_cache_timeout_handler_fn_arn, payload=payload_working_execution
74+
)
75+
execution_working_response = execution_working["Payload"].read().decode("utf-8")
76+
77+
# THEN
78+
assert "Task timed out after" in execution_with_timeout_response
79+
assert payload_working_execution == execution_working_response
80+
81+
82+
def test_parallel_execution_idempotency(parallel_execution_handler_fn_arn: str):
83+
# GIVEN
84+
arguments = json.dumps({"message": "Lambda Powertools - Parallel execution"})
85+
86+
# WHEN
87+
# executing Lambdas in parallel
88+
lambdas_arn = [parallel_execution_handler_fn_arn, parallel_execution_handler_fn_arn]
89+
execution_result_list = execute_lambdas_in_parallel("data_fetcher.get_lambda_response", lambdas_arn, arguments)
90+
91+
timeout_execution_response = execution_result_list[0][0]["Payload"].read().decode("utf-8")
92+
error_idempotency_execution_response = execution_result_list[1][0]["Payload"].read().decode("utf-8")
93+
94+
# THEN
95+
assert "Execution already in progress with idempotency key" in error_idempotency_execution_response
96+
assert "Task timed out after" in timeout_execution_response
+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from tests.e2e.utils.data_fetcher.common import get_http_response, get_lambda_response
2+
from tests.e2e.utils.data_fetcher.idempotency import get_ddb_idempotency_record
23
from tests.e2e.utils.data_fetcher.logs import get_logs
34
from tests.e2e.utils.data_fetcher.metrics import get_metrics
45
from tests.e2e.utils.data_fetcher.traces import get_traces
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import boto3
2+
from retry import retry
3+
4+
5+
@retry(ValueError, delay=2, jitter=1.5, tries=10)
6+
def get_ddb_idempotency_record(
7+
function_name: str,
8+
table_name: str,
9+
) -> int:
10+
"""_summary_
11+
12+
Parameters
13+
----------
14+
function_name : str
15+
Name of Lambda function to fetch dynamodb record
16+
table_name : str
17+
Name of DynamoDB table
18+
19+
Returns
20+
-------
21+
int
22+
Count of records found
23+
24+
Raises
25+
------
26+
ValueError
27+
When no record is found within retry window
28+
"""
29+
ddb_client = boto3.resource("dynamodb")
30+
table = ddb_client.Table(table_name)
31+
ret = table.scan(
32+
FilterExpression="contains (id, :functionName)",
33+
ExpressionAttributeValues={":functionName": f"{function_name}#"},
34+
)
35+
36+
if not ret["Items"]:
37+
raise ValueError("Empty response from DynamoDB Repeating...")
38+
39+
return ret["Count"]

tests/e2e/utils/functions.py

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from concurrent.futures import ThreadPoolExecutor
2+
3+
from tests.e2e.utils import data_fetcher # noqa F401
4+
5+
6+
def execute_lambdas_in_parallel(function_name: str, lambdas_arn: list, arguments: str):
7+
result_list = []
8+
with ThreadPoolExecutor() as executor:
9+
running_tasks = executor.map(lambda exec: eval(function_name)(*exec), [(arn, arguments) for arn in lambdas_arn])
10+
executor.shutdown(wait=True)
11+
for running_task in running_tasks:
12+
result_list.append(running_task)
13+
14+
return result_list

tests/functional/idempotency/conftest.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -172,18 +172,24 @@ def expected_params_put_item_with_validation(hashed_idempotency_key, hashed_vali
172172

173173

174174
@pytest.fixture
175-
def hashed_idempotency_key(lambda_apigw_event, default_jmespath, lambda_context):
175+
def hashed_idempotency_key(request, lambda_apigw_event, default_jmespath, lambda_context):
176176
compiled_jmespath = jmespath.compile(default_jmespath)
177177
data = compiled_jmespath.search(lambda_apigw_event)
178-
return "test-func.lambda_handler#" + hash_idempotency_key(data)
178+
return (
179+
f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#"
180+
+ hash_idempotency_key(data)
181+
)
179182

180183

181184
@pytest.fixture
182-
def hashed_idempotency_key_with_envelope(lambda_apigw_event):
185+
def hashed_idempotency_key_with_envelope(request, lambda_apigw_event):
183186
event = extract_data_from_envelope(
184187
data=lambda_apigw_event, envelope=envelopes.API_GATEWAY_HTTP, jmespath_options={}
185188
)
186-
return "test-func.lambda_handler#" + hash_idempotency_key(event)
189+
return (
190+
f"test-func.{request.function.__module__}.{request.function.__qualname__}.<locals>.lambda_handler#"
191+
+ hash_idempotency_key(event)
192+
)
187193

188194

189195
@pytest.fixture

0 commit comments

Comments
 (0)