Skip to content

Commit 643d1b3

Browse files
Add batch processing tests, SNS batch processing not possible
1 parent 472e659 commit 643d1b3

File tree

2 files changed

+228
-67
lines changed

2 files changed

+228
-67
lines changed

datadog_lambda/tracing.py

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -68,28 +68,32 @@
6868
LOWER_64_BITS = "LOWER_64_BITS"
6969

7070

71-
def _dsm_set_context_sqs_or_sns_event(event, event_type):
71+
def _dsm_set_context_sqs_event(event):
7272
for record in event.get("Records", [])[1:]:
73-
if arn := _get_source_arn_sqs_or_sns(record, event_type):
74-
context = _extract_context_from_sqs_or_sns_record(record)
75-
_dsm_set_checkpoint(context, event_type, arn)
73+
if arn := record.get("eventSourceARN"):
74+
try:
75+
context = _extract_context_from_sqs_or_sns_record(record)
76+
except Exception as e:
77+
logger.debug(
78+
f"DSM: Failed to extract context with error {e}, will still set checkpoint"
79+
)
80+
context = None
81+
_dsm_set_checkpoint(context, "sqs", arn)
7682

7783

7884
def _dsm_set_context_kinesis_event(event):
7985
for record in event.get("Records", [])[1:]:
8086
if (arn := record.get("eventSourceARN")) and (kinesis := record.get("kinesis")):
81-
context = _extract_context_from_kinesis_record(kinesis)
87+
try:
88+
context = _extract_context_from_kinesis_record(kinesis)
89+
except Exception as e:
90+
logger.debug(
91+
f"DSM: Failed to extract context with error {e}, will still set checkpoint"
92+
)
93+
context = None
8294
_dsm_set_checkpoint(context, "kinesis", arn)
8395

8496

85-
def _get_source_arn_sqs_or_sns(record, event_type):
86-
return (
87-
record.get("Sns", {}).get("TopicArn")
88-
if event_type == "sns"
89-
else record.get("eventSourceARN")
90-
)
91-
92-
9397
def _dsm_set_checkpoint(context_json, event_type, arn):
9498
if not config.data_streams_enabled:
9599
return
@@ -271,7 +275,11 @@ def extract_context_from_sqs_or_sns_event_or_context(
271275

272276
try:
273277
first_record = event.get("Records")[0]
274-
source_arn = _get_source_arn_sqs_or_sns(first_record, event_type)
278+
source_arn = (
279+
first_record.get("Sns", {}).get("TopicArn")
280+
if event_type == "sns"
281+
else first_record.get("eventSourceARN")
282+
)
275283
dd_data = _extract_context_from_sqs_or_sns_record(first_record)
276284
if dd_data:
277285
if is_step_function_event(dd_data):
@@ -283,8 +291,9 @@ def extract_context_from_sqs_or_sns_event_or_context(
283291
)
284292
context = propagator.extract(dd_data)
285293
_dsm_set_checkpoint(dd_data, event_type, source_arn)
286-
if config.data_streams_enabled:
287-
_dsm_set_context_sqs_or_sns_event(event, event_type)
294+
# Batch Processing does not occur for SNS events
295+
if config.data_streams_enabled and event_type == "sqs":
296+
_dsm_set_context_sqs_event(event)
288297
return context
289298
else:
290299
# Handle case where trace context is injected into attributes.AWSTraceHeader
@@ -310,15 +319,17 @@ def extract_context_from_sqs_or_sns_event_or_context(
310319
)
311320
# Still want to set a DSM checkpoint even if DSM context not propagated
312321
_dsm_set_checkpoint(None, event_type, source_arn)
313-
if config.data_streams_enabled:
314-
_dsm_set_context_sqs_or_sns_event(event, event_type)
322+
# Batch Processing does not occur for SNS events
323+
if config.data_streams_enabled and event_type == "sqs":
324+
_dsm_set_context_sqs_event(event)
315325
return extract_context_from_lambda_context(lambda_context)
316326
except Exception as e:
317327
logger.debug("The trace extractor returned with error %s", e)
318328
# Still want to set a DSM checkpoint even if DSM context not propagated
319329
_dsm_set_checkpoint(None, event_type, source_arn)
320-
if config.data_streams_enabled:
321-
_dsm_set_context_sqs_or_sns_event(event, event_type)
330+
# Batch Processing does not occur for SNS events
331+
if config.data_streams_enabled and event_type == "sqs":
332+
_dsm_set_context_sqs_event(event)
322333
return extract_context_from_lambda_context(lambda_context)
323334

324335

tests/test_tracing.py

Lines changed: 197 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -2743,6 +2743,71 @@ def test_sqs_batch_processing(self):
27432743
carrier_get_2 = args_2[2]
27442744
self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2")
27452745

2746+
def test_sqs_batch_processing_with_invalid_record(self):
2747+
dd_data_1 = {"dd-pathway-ctx-base64": "record1"}
2748+
dd_data_3 = {"dd-pathway-ctx-base64": "record3"}
2749+
dd_json_data_1 = json.dumps(dd_data_1)
2750+
dd_json_data_3 = json.dumps(dd_data_3)
2751+
2752+
event = {
2753+
"Records": [
2754+
{
2755+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
2756+
"messageAttributes": {
2757+
"_datadog": {
2758+
"dataType": "String",
2759+
"stringValue": dd_json_data_1,
2760+
}
2761+
},
2762+
"eventSource": "aws:sqs",
2763+
},
2764+
{
2765+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
2766+
"messageAttributes": {
2767+
"_datadog": {
2768+
"dataType": "String",
2769+
"stringValue": "invalid json", # This will cause extraction to fail
2770+
}
2771+
},
2772+
"eventSource": "aws:sqs",
2773+
},
2774+
{
2775+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
2776+
"messageAttributes": {
2777+
"_datadog": {
2778+
"dataType": "String",
2779+
"stringValue": dd_json_data_3,
2780+
}
2781+
},
2782+
"eventSource": "aws:sqs",
2783+
},
2784+
]
2785+
}
2786+
2787+
extract_context_from_sqs_or_sns_event_or_context(
2788+
event, self.lambda_context, parse_event_source(event)
2789+
)
2790+
2791+
self.assertEqual(self.mock_checkpoint.call_count, 3)
2792+
2793+
args_1, _ = self.mock_checkpoint.call_args_list[0]
2794+
self.assertEqual(args_1[0], "sqs")
2795+
self.assertEqual(args_1[1], "arn:aws:sqs:us-east-1:123456789012:test-queue")
2796+
carrier_get_1 = args_1[2]
2797+
self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1")
2798+
2799+
args_2, _ = self.mock_checkpoint.call_args_list[1]
2800+
self.assertEqual(args_2[0], "sqs")
2801+
self.assertEqual(args_2[1], "arn:aws:sqs:us-east-1:123456789012:test-queue")
2802+
carrier_get_2 = args_2[2]
2803+
self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), None)
2804+
2805+
args_3, _ = self.mock_checkpoint.call_args_list[2]
2806+
self.assertEqual(args_3[0], "sqs")
2807+
self.assertEqual(args_3[1], "arn:aws:sqs:us-east-1:123456789012:test-queue")
2808+
carrier_get_3 = args_3[2]
2809+
self.assertEqual(carrier_get_3("dd-pathway-ctx-base64"), "record3")
2810+
27462811
def test_sqs_source_arn_not_found(self):
27472812
event = {
27482813
"Records": [
@@ -3019,53 +3084,6 @@ def test_sns_data_streams_disabled(self):
30193084

30203085
self.mock_checkpoint.assert_not_called()
30213086

3022-
def test_sns_batch_processing(self):
3023-
dd_data_1 = {"dd-pathway-ctx-base64": "record1"}
3024-
dd_data_2 = {"dd-pathway-ctx-base64": "record2"}
3025-
dd_json_data_1 = json.dumps(dd_data_1)
3026-
dd_json_data_2 = json.dumps(dd_data_2)
3027-
3028-
event = {
3029-
"Records": [
3030-
{
3031-
"Sns": {
3032-
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
3033-
"MessageAttributes": {
3034-
"_datadog": {"Type": "String", "Value": dd_json_data_1}
3035-
},
3036-
},
3037-
"eventSource": "aws:sns",
3038-
},
3039-
{
3040-
"Sns": {
3041-
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
3042-
"MessageAttributes": {
3043-
"_datadog": {"Type": "String", "Value": dd_json_data_2}
3044-
},
3045-
},
3046-
"eventSource": "aws:sns",
3047-
},
3048-
]
3049-
}
3050-
3051-
extract_context_from_sqs_or_sns_event_or_context(
3052-
event, self.lambda_context, parse_event_source(event)
3053-
)
3054-
3055-
self.assertEqual(self.mock_checkpoint.call_count, 2)
3056-
3057-
args_1, _ = self.mock_checkpoint.call_args_list[0]
3058-
self.assertEqual(args_1[0], "sns")
3059-
self.assertEqual(args_1[1], "arn:aws:sns:us-east-1:123456789012:test-topic")
3060-
carrier_get_1 = args_1[2]
3061-
self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1")
3062-
3063-
args_2, _ = self.mock_checkpoint.call_args_list[1]
3064-
self.assertEqual(args_2[0], "sns")
3065-
self.assertEqual(args_2[1], "arn:aws:sns:us-east-1:123456789012:test-topic")
3066-
carrier_get_2 = args_2[2]
3067-
self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2")
3068-
30693087
# SNS -> SQS TESTS
30703088

30713089
def test_sns_to_sqs_context_propagated_string_value(self):
@@ -3362,6 +3380,81 @@ def test_sns_to_sqs_batch_processing(self):
33623380
carrier_get_2 = args_2[2]
33633381
self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2")
33643382

3383+
def test_sns_to_sqs_batch_processing_with_invalid_record(self):
3384+
dd_data_1 = {"dd-pathway-ctx-base64": "record1"}
3385+
dd_data_3 = {"dd-pathway-ctx-base64": "record3"}
3386+
dd_json_data_1 = json.dumps(dd_data_1)
3387+
dd_json_data_3 = json.dumps(dd_data_3)
3388+
3389+
sns_message_1 = {
3390+
"Type": "Notification",
3391+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
3392+
"MessageAttributes": {
3393+
"_datadog": {"Type": "String", "Value": dd_json_data_1}
3394+
},
3395+
}
3396+
sns_message_2 = {
3397+
"Type": "Notification",
3398+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
3399+
"MessageAttributes": {
3400+
"_datadog": {
3401+
"Type": "String",
3402+
"Value": "invalid json",
3403+
} # This will cause extraction to fail
3404+
},
3405+
}
3406+
sns_message_3 = {
3407+
"Type": "Notification",
3408+
"TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic",
3409+
"MessageAttributes": {
3410+
"_datadog": {"Type": "String", "Value": dd_json_data_3}
3411+
},
3412+
}
3413+
3414+
event = {
3415+
"Records": [
3416+
{
3417+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
3418+
"body": json.dumps(sns_message_1),
3419+
"eventSource": "aws:sqs",
3420+
},
3421+
{
3422+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
3423+
"body": json.dumps(sns_message_2),
3424+
"eventSource": "aws:sqs",
3425+
},
3426+
{
3427+
"eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue",
3428+
"body": json.dumps(sns_message_3),
3429+
"eventSource": "aws:sqs",
3430+
},
3431+
]
3432+
}
3433+
3434+
extract_context_from_sqs_or_sns_event_or_context(
3435+
event, self.lambda_context, parse_event_source(event)
3436+
)
3437+
3438+
self.assertEqual(self.mock_checkpoint.call_count, 3)
3439+
3440+
args_1, _ = self.mock_checkpoint.call_args_list[0]
3441+
self.assertEqual(args_1[0], "sqs")
3442+
self.assertEqual(args_1[1], "arn:aws:sqs:us-east-1:123456789012:test-queue")
3443+
carrier_get_1 = args_1[2]
3444+
self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1")
3445+
3446+
args_2, _ = self.mock_checkpoint.call_args_list[1]
3447+
self.assertEqual(args_2[0], "sqs")
3448+
self.assertEqual(args_2[1], "arn:aws:sqs:us-east-1:123456789012:test-queue")
3449+
carrier_get_2 = args_2[2]
3450+
self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), None)
3451+
3452+
args_3, _ = self.mock_checkpoint.call_args_list[2]
3453+
self.assertEqual(args_3[0], "sqs")
3454+
self.assertEqual(args_3[1], "arn:aws:sqs:us-east-1:123456789012:test-queue")
3455+
carrier_get_3 = args_3[2]
3456+
self.assertEqual(carrier_get_3("dd-pathway-ctx-base64"), "record3")
3457+
33653458
def test_sns_to_sqs_source_arn_not_found(self):
33663459
sns_notification = {
33673460
"Type": "Notification",
@@ -3574,6 +3667,63 @@ def test_kinesis_batch_processing(self):
35743667
carrier_get_2 = args_2[2]
35753668
self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2")
35763669

3670+
def test_kinesis_batch_processing_with_invalid_record(self):
3671+
dd_data_1 = {"dd-pathway-ctx-base64": "record1"}
3672+
dd_data_3 = {"dd-pathway-ctx-base64": "record3"}
3673+
3674+
kinesis_data_1 = {"_datadog": dd_data_1, "message": "test1"}
3675+
kinesis_data_2 = {"invalid": "data"}
3676+
kinesis_data_3 = {"_datadog": dd_data_3, "message": "test3"}
3677+
3678+
encoded_data_1 = base64.b64encode(json.dumps(kinesis_data_1).encode()).decode()
3679+
encoded_data_2 = base64.b64encode(json.dumps(kinesis_data_2).encode()).decode()
3680+
encoded_data_3 = base64.b64encode(json.dumps(kinesis_data_3).encode()).decode()
3681+
3682+
event = {
3683+
"Records": [
3684+
{
3685+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
3686+
"kinesis": {"data": encoded_data_1},
3687+
},
3688+
{
3689+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
3690+
"kinesis": {"data": encoded_data_2},
3691+
},
3692+
{
3693+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream",
3694+
"kinesis": {"data": encoded_data_3},
3695+
},
3696+
]
3697+
}
3698+
3699+
extract_context_from_kinesis_event(event, self.lambda_context)
3700+
3701+
self.assertEqual(self.mock_checkpoint.call_count, 3)
3702+
3703+
args_1, _ = self.mock_checkpoint.call_args_list[0]
3704+
self.assertEqual(args_1[0], "kinesis")
3705+
self.assertEqual(
3706+
args_1[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3707+
)
3708+
carrier_get_1 = args_1[2]
3709+
self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1")
3710+
3711+
args_2, _ = self.mock_checkpoint.call_args_list[1]
3712+
self.assertEqual(args_2[0], "kinesis")
3713+
self.assertEqual(
3714+
args_2[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3715+
)
3716+
carrier_get_2 = args_2[2]
3717+
self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), None)
3718+
3719+
args_3, _ = self.mock_checkpoint.call_args_list[2]
3720+
self.assertEqual(args_3[0], "kinesis")
3721+
self.assertEqual(
3722+
args_3[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream"
3723+
)
3724+
carrier_get_3 = args_3[2]
3725+
self.assertEqual(carrier_get_3("dd-pathway-ctx-base64"), "record3")
3726+
35773727
def test_kinesis_source_arn_not_found(self):
35783728
kinesis_data = {"message": "test"}
35793729
kinesis_data_str = json.dumps(kinesis_data)

0 commit comments

Comments
 (0)