From c0906a00df624c4791fb95a20b24b5560c48cd5c Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 4 Aug 2025 16:04:54 -0400 Subject: [PATCH 01/16] set checkpoint for all messages one loop approach --- datadog_lambda/tracing.py | 235 +++++++++++++++++++++----------------- tests/test_tracing.py | 195 +++++++++++++++++++++++++++++++ 2 files changed, 326 insertions(+), 104 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 51157f6a..943e1ac0 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -248,91 +248,104 @@ def extract_context_from_sqs_or_sns_event_or_context( except Exception: logger.debug("Failed extracting context as EventBridge to SQS.") - try: - first_record = event.get("Records")[0] - source_arn = first_record.get("eventSourceARN", "") - - # logic to deal with SNS => SQS event - if "body" in first_record: - body_str = first_record.get("body") - try: - body = json.loads(body_str) - if body.get("Type", "") == "Notification" and "TopicArn" in body: - logger.debug("Found SNS message inside SQS event") - first_record = get_first_record(create_sns_event(body)) - except Exception: - pass - - msg_attributes = first_record.get("messageAttributes") - if msg_attributes is None: - sns_record = first_record.get("Sns") or {} - # SNS->SQS event would extract SNS arn without this check - if event_source.equals(EventTypes.SNS): - source_arn = sns_record.get("TopicArn", "") - msg_attributes = sns_record.get("MessageAttributes") or {} - dd_payload = msg_attributes.get("_datadog") - if dd_payload: - # SQS uses dataType and binaryValue/stringValue - # SNS uses Type and Value - dd_json_data = None - dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") - if dd_json_data_type == "Binary": - import base64 + context = None + records = ( + event.get("Records", []) + if config.data_streams_enabled + else [event.get("Records")[0]] + ) + is_first_record = True + for record in records: + try: + source_arn = record.get("eventSourceARN", "") + dsm_data = None + + # logic to deal with SNS => SQS event + if "body" in record: + body_str = record.get("body") + try: + body = json.loads(body_str) + if body.get("Type", "") == "Notification" and "TopicArn" in body: + logger.debug("Found SNS message inside SQS event") + record = get_first_record(create_sns_event(body)) + except Exception: + pass + + msg_attributes = record.get("messageAttributes") + if msg_attributes is None: + sns_record = record.get("Sns") or {} + # SNS->SQS event would extract SNS arn without this check + if event_source.equals(EventTypes.SNS): + source_arn = sns_record.get("TopicArn", "") + msg_attributes = sns_record.get("MessageAttributes") or {} + dd_payload = msg_attributes.get("_datadog") + if dd_payload: + # SQS uses dataType and binaryValue/stringValue + # SNS uses Type and Value + dd_json_data = None + dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") + if dd_json_data_type == "Binary": + import base64 + + dd_json_data = dd_payload.get("binaryValue") or dd_payload.get( + "Value" + ) + if dd_json_data: + dd_json_data = base64.b64decode(dd_json_data) + elif dd_json_data_type == "String": + dd_json_data = dd_payload.get("stringValue") or dd_payload.get( + "Value" + ) + else: + logger.debug( + "Datadog Lambda Python only supports extracting trace" + "context from String or Binary SQS/SNS message attributes" + ) - dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value") if dd_json_data: - dd_json_data = base64.b64decode(dd_json_data) - elif dd_json_data_type == "String": - dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value") + dd_data = json.loads(dd_json_data) + + if is_step_function_event(dd_data): + try: + return extract_context_from_step_functions(dd_data, None) + except Exception: + logger.debug( + "Failed to extract Step Functions context from SQS/SNS event." + ) + if is_first_record: + context = propagator.extract(dd_data) + dsm_data = dd_data else: - logger.debug( - "Datadog Lambda Python only supports extracting trace" - "context from String or Binary SQS/SNS message attributes" - ) + # Handle case where trace context is injected into attributes.AWSTraceHeader + # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 + attrs = event.get("Records")[0].get("attributes") + if attrs: + x_ray_header = attrs.get("AWSTraceHeader") + if x_ray_header: + x_ray_context = parse_xray_header(x_ray_header) + trace_id_parts = x_ray_context.get("trace_id", "").split("-") + if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( + DD_TRACE_JAVA_TRACE_ID_PADDING + ): + # If it starts with eight 0's padding, + # then this AWSTraceHeader contains Datadog injected trace context + logger.debug( + "Found dd-trace injected trace context from AWSTraceHeader" + ) + if is_first_record: + context = Context( + trace_id=int(trace_id_parts[2][8:], 16), + span_id=int(x_ray_context["parent_id"], 16), + sampling_priority=float(x_ray_context["sampled"]), + ) + except Exception as e: + logger.debug("The trace extractor returned with error %s", e) - if dd_json_data: - dd_data = json.loads(dd_json_data) + # Set DSM checkpoint once per record + _dsm_set_checkpoint(dsm_data, event_type, source_arn) + is_first_record = False - if is_step_function_event(dd_data): - try: - return extract_context_from_step_functions(dd_data, None) - except Exception: - logger.debug( - "Failed to extract Step Functions context from SQS/SNS event." - ) - context = propagator.extract(dd_data) - _dsm_set_checkpoint(dd_data, event_type, source_arn) - return context - else: - # Handle case where trace context is injected into attributes.AWSTraceHeader - # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 - attrs = event.get("Records")[0].get("attributes") - if attrs: - x_ray_header = attrs.get("AWSTraceHeader") - if x_ray_header: - x_ray_context = parse_xray_header(x_ray_header) - trace_id_parts = x_ray_context.get("trace_id", "").split("-") - if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( - DD_TRACE_JAVA_TRACE_ID_PADDING - ): - # If it starts with eight 0's padding, - # then this AWSTraceHeader contains Datadog injected trace context - logger.debug( - "Found dd-trace injected trace context from AWSTraceHeader" - ) - return Context( - trace_id=int(trace_id_parts[2][8:], 16), - span_id=int(x_ray_context["parent_id"], 16), - sampling_priority=float(x_ray_context["sampled"]), - ) - # Still want to set a DSM checkpoint even if DSM context not propagated - _dsm_set_checkpoint(None, event_type, source_arn) - return extract_context_from_lambda_context(lambda_context) - except Exception as e: - logger.debug("The trace extractor returned with error %s", e) - # Still want to set a DSM checkpoint even if DSM context not propagated - _dsm_set_checkpoint(None, event_type, source_arn) - return extract_context_from_lambda_context(lambda_context) + return context if context else extract_context_from_lambda_context(lambda_context) def _extract_context_from_eventbridge_sqs_event(event): @@ -393,30 +406,44 @@ def extract_context_from_kinesis_event(event, lambda_context): Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported. """ source_arn = "" - try: - record = get_first_record(event) - source_arn = record.get("eventSourceARN", "") - kinesis = record.get("kinesis") - if not kinesis: - return extract_context_from_lambda_context(lambda_context) - data = kinesis.get("data") - if data: - import base64 - - b64_bytes = data.encode("ascii") - str_bytes = base64.b64decode(b64_bytes) - data_str = str_bytes.decode("ascii") - data_obj = json.loads(data_str) - dd_ctx = data_obj.get("_datadog") - if dd_ctx: - context = propagator.extract(dd_ctx) - _dsm_set_checkpoint(dd_ctx, "kinesis", source_arn) - return context - except Exception as e: - logger.debug("The trace extractor returned with error %s", e) - # Still want to set a DSM checkpoint even if DSM context not propagated - _dsm_set_checkpoint(None, "kinesis", source_arn) - return extract_context_from_lambda_context(lambda_context) + records = ( + [get_first_record(event)] + if not config.data_streams_enabled + else event.get("Records") + ) + context = None + is_first_record = True + for record in records: + dsm_data = None + try: + source_arn = record.get("eventSourceARN", "") + kinesis = record.get("kinesis") + if not kinesis: + context = ( + extract_context_from_lambda_context(lambda_context) + if is_first_record + else context + ) + is_first_record = False + continue + data = kinesis.get("data") + if data: + import base64 + + b64_bytes = data.encode("ascii") + str_bytes = base64.b64decode(b64_bytes) + data_str = str_bytes.decode("ascii") + data_obj = json.loads(data_str) + dd_ctx = data_obj.get("_datadog") + if dd_ctx: + if is_first_record: + context = propagator.extract(dd_ctx) + dsm_data = dd_ctx + except Exception as e: + logger.debug("The trace extractor returned with error %s", e) + _dsm_set_checkpoint(dsm_data, "kinesis", source_arn) + is_first_record = False + return context if context else extract_context_from_lambda_context(lambda_context) def _deterministic_sha256_hash(s: str, part: str) -> int: diff --git a/tests/test_tracing.py b/tests/test_tracing.py index c87a0971..6eb463a6 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -2694,6 +2694,55 @@ def test_sqs_invalid_datadog_message_attribute(self, mock_logger): # None indicates no DSM context propagation self.assertEqual(carrier_get("dd-pathway-ctx-base64"), None) + def test_sqs_batch_processing(self): + dd_data_1 = {"dd-pathway-ctx-base64": "record1"} + dd_data_2 = {"dd-pathway-ctx-base64": "record2"} + dd_json_data_1 = json.dumps(dd_data_1) + dd_json_data_2 = json.dumps(dd_data_2) + + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "messageAttributes": { + "_datadog": { + "dataType": "String", + "stringValue": dd_json_data_1, + } + }, + "eventSource": "aws:sqs", + }, + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "messageAttributes": { + "_datadog": { + "dataType": "String", + "stringValue": dd_json_data_2, + } + }, + "eventSource": "aws:sqs", + }, + ] + } + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + self.assertEqual(self.mock_checkpoint.call_count, 2) + + args_1, _ = self.mock_checkpoint.call_args_list[0] + self.assertEqual(args_1[0], "sqs") + self.assertEqual(args_1[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_1 = args_1[2] + self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1") + + args_2, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual(args_2[0], "sqs") + self.assertEqual(args_2[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_2 = args_2[2] + self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") + def test_sqs_source_arn_not_found(self): event = { "Records": [ @@ -2970,6 +3019,53 @@ def test_sns_data_streams_disabled(self): self.mock_checkpoint.assert_not_called() + def test_sns_batch_processing(self): + dd_data_1 = {"dd-pathway-ctx-base64": "record1"} + dd_data_2 = {"dd-pathway-ctx-base64": "record2"} + dd_json_data_1 = json.dumps(dd_data_1) + dd_json_data_2 = json.dumps(dd_data_2) + + event = { + "Records": [ + { + "Sns": { + "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", + "MessageAttributes": { + "_datadog": {"Type": "String", "Value": dd_json_data_1} + }, + }, + "eventSource": "aws:sns", + }, + { + "Sns": { + "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", + "MessageAttributes": { + "_datadog": {"Type": "String", "Value": dd_json_data_2} + }, + }, + "eventSource": "aws:sns", + }, + ] + } + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + self.assertEqual(self.mock_checkpoint.call_count, 2) + + args_1, _ = self.mock_checkpoint.call_args_list[0] + self.assertEqual(args_1[0], "sns") + self.assertEqual(args_1[1], "arn:aws:sns:us-east-1:123456789012:test-topic") + carrier_get_1 = args_1[2] + self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1") + + args_2, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual(args_2[0], "sns") + self.assertEqual(args_2[1], "arn:aws:sns:us-east-1:123456789012:test-topic") + carrier_get_2 = args_2[2] + self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") + # SNS -> SQS TESTS def test_sns_to_sqs_context_propagated_string_value(self): @@ -3212,6 +3308,60 @@ def test_sns_to_sqs_invalid_datadog_message_attribute(self, mock_logger): # None indicates no DSM context propagation self.assertEqual(carrier_get("dd-pathway-ctx-base64"), None) + def test_sns_to_sqs_batch_processing(self): + dd_data_1 = {"dd-pathway-ctx-base64": "record1"} + dd_data_2 = {"dd-pathway-ctx-base64": "record2"} + dd_json_data_1 = json.dumps(dd_data_1) + dd_json_data_2 = json.dumps(dd_data_2) + + sns_message_1 = { + "Type": "Notification", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", + "MessageAttributes": { + "_datadog": {"Type": "String", "Value": dd_json_data_1} + }, + } + sns_message_2 = { + "Type": "Notification", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", + "MessageAttributes": { + "_datadog": {"Type": "String", "Value": dd_json_data_2} + }, + } + + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "body": json.dumps(sns_message_1), + "eventSource": "aws:sqs", + }, + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "body": json.dumps(sns_message_2), + "eventSource": "aws:sqs", + }, + ] + } + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + self.assertEqual(self.mock_checkpoint.call_count, 2) + + args_1, _ = self.mock_checkpoint.call_args_list[0] + self.assertEqual(args_1[0], "sqs") + self.assertEqual(args_1[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_1 = args_1[2] + self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1") + + args_2, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual(args_2[0], "sqs") + self.assertEqual(args_2[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_2 = args_2[2] + self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") + def test_sns_to_sqs_source_arn_not_found(self): sns_notification = { "Type": "Notification", @@ -3379,6 +3529,51 @@ def test_kinesis_invalid_datadog_message_attribute(self, mock_logger): # None indicates no DSM context propagation self.assertEqual(carrier_get("dd-pathway-ctx-base64"), None) + def test_kinesis_batch_processing(self): + dd_data_1 = {"dd-pathway-ctx-base64": "record1"} + dd_data_2 = {"dd-pathway-ctx-base64": "record2"} + + kinesis_data_1 = {"_datadog": dd_data_1, "message": "test1"} + kinesis_data_2 = {"_datadog": dd_data_2, "message": "test2"} + + encoded_data_1 = base64.b64encode(json.dumps(kinesis_data_1).encode()).decode() + encoded_data_2 = base64.b64encode(json.dumps(kinesis_data_2).encode()).decode() + + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", + "kinesis": {"data": encoded_data_1}, + }, + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", + "kinesis": {"data": encoded_data_2}, + }, + ] + } + + extract_context_from_kinesis_event(event, self.lambda_context) + + self.assertEqual(self.mock_checkpoint.call_count, 2) + + # Verify first record call + args_1, _ = self.mock_checkpoint.call_args_list[0] + self.assertEqual(args_1[0], "kinesis") + self.assertEqual( + args_1[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" + ) + carrier_get_1 = args_1[2] + self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1") + + # Verify second record call + args_2, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual(args_2[0], "kinesis") + self.assertEqual( + args_2[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" + ) + carrier_get_2 = args_2[2] + self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") + def test_kinesis_source_arn_not_found(self): kinesis_data = {"message": "test"} kinesis_data_str = json.dumps(kinesis_data) From 8230b078240fb667b21358024b1131bb48f55b70 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 4 Aug 2025 16:07:20 -0400 Subject: [PATCH 02/16] fix lint --- datadog_lambda/tracing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 943e1ac0..0cc3ec5e 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -317,7 +317,7 @@ def extract_context_from_sqs_or_sns_event_or_context( dsm_data = dd_data else: # Handle case where trace context is injected into attributes.AWSTraceHeader - # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 + # example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 attrs = event.get("Records")[0].get("attributes") if attrs: x_ray_header = attrs.get("AWSTraceHeader") From e2cd3c5c7b9acfd61f614452ba4fbb26beee8c09 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 4 Aug 2025 16:31:25 -0400 Subject: [PATCH 03/16] formatting stuff --- datadog_lambda/tracing.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 0cc3ec5e..9a3da341 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -282,20 +282,18 @@ def extract_context_from_sqs_or_sns_event_or_context( if dd_payload: # SQS uses dataType and binaryValue/stringValue # SNS uses Type and Value + # fmt: off dd_json_data = None dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") if dd_json_data_type == "Binary": import base64 - dd_json_data = dd_payload.get("binaryValue") or dd_payload.get( - "Value" - ) + dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value") if dd_json_data: dd_json_data = base64.b64decode(dd_json_data) elif dd_json_data_type == "String": - dd_json_data = dd_payload.get("stringValue") or dd_payload.get( - "Value" - ) + dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value") + # fmt: on else: logger.debug( "Datadog Lambda Python only supports extracting trace" From 5cf2059cb4f6ac5550616a68be17b1298ef975cf Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 4 Aug 2025 16:52:12 -0400 Subject: [PATCH 04/16] better approach with indexes --- datadog_lambda/tracing.py | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 9a3da341..da814740 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -254,8 +254,7 @@ def extract_context_from_sqs_or_sns_event_or_context( if config.data_streams_enabled else [event.get("Records")[0]] ) - is_first_record = True - for record in records: + for idx, record in enumerate(records): try: source_arn = record.get("eventSourceARN", "") dsm_data = None @@ -310,7 +309,7 @@ def extract_context_from_sqs_or_sns_event_or_context( logger.debug( "Failed to extract Step Functions context from SQS/SNS event." ) - if is_first_record: + if idx == 0: context = propagator.extract(dd_data) dsm_data = dd_data else: @@ -330,7 +329,7 @@ def extract_context_from_sqs_or_sns_event_or_context( logger.debug( "Found dd-trace injected trace context from AWSTraceHeader" ) - if is_first_record: + if idx == 0: context = Context( trace_id=int(trace_id_parts[2][8:], 16), span_id=int(x_ray_context["parent_id"], 16), @@ -341,7 +340,6 @@ def extract_context_from_sqs_or_sns_event_or_context( # Set DSM checkpoint once per record _dsm_set_checkpoint(dsm_data, event_type, source_arn) - is_first_record = False return context if context else extract_context_from_lambda_context(lambda_context) @@ -410,8 +408,7 @@ def extract_context_from_kinesis_event(event, lambda_context): else event.get("Records") ) context = None - is_first_record = True - for record in records: + for idx, record in enumerate(records): dsm_data = None try: source_arn = record.get("eventSourceARN", "") @@ -419,10 +416,10 @@ def extract_context_from_kinesis_event(event, lambda_context): if not kinesis: context = ( extract_context_from_lambda_context(lambda_context) - if is_first_record + if idx == 0 else context ) - is_first_record = False + _dsm_set_checkpoint(None, "kinesis", source_arn) continue data = kinesis.get("data") if data: @@ -434,13 +431,12 @@ def extract_context_from_kinesis_event(event, lambda_context): data_obj = json.loads(data_str) dd_ctx = data_obj.get("_datadog") if dd_ctx: - if is_first_record: + if idx == 0: context = propagator.extract(dd_ctx) dsm_data = dd_ctx except Exception as e: logger.debug("The trace extractor returned with error %s", e) _dsm_set_checkpoint(dsm_data, "kinesis", source_arn) - is_first_record = False return context if context else extract_context_from_lambda_context(lambda_context) From 90480bbe3c798123835e91fa8cf86bd1086cdcbe Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 5 Aug 2025 16:04:31 -0400 Subject: [PATCH 05/16] changed approach returning early, rename to dd_ctx --- datadog_lambda/tracing.py | 53 ++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 32 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index da814740..72339b47 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -249,15 +249,10 @@ def extract_context_from_sqs_or_sns_event_or_context( logger.debug("Failed extracting context as EventBridge to SQS.") context = None - records = ( - event.get("Records", []) - if config.data_streams_enabled - else [event.get("Records")[0]] - ) - for idx, record in enumerate(records): + for idx, record in enumerate(event.get("Records", [])): try: source_arn = record.get("eventSourceARN", "") - dsm_data = None + dd_ctx = None # logic to deal with SNS => SQS event if "body" in record: @@ -311,8 +306,10 @@ def extract_context_from_sqs_or_sns_event_or_context( ) if idx == 0: context = propagator.extract(dd_data) - dsm_data = dd_data - else: + if not config.data_streams_enabled: + break + dd_ctx = dd_data + elif idx == 0: # Handle case where trace context is injected into attributes.AWSTraceHeader # example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 attrs = event.get("Records")[0].get("attributes") @@ -329,17 +326,18 @@ def extract_context_from_sqs_or_sns_event_or_context( logger.debug( "Found dd-trace injected trace context from AWSTraceHeader" ) - if idx == 0: - context = Context( - trace_id=int(trace_id_parts[2][8:], 16), - span_id=int(x_ray_context["parent_id"], 16), - sampling_priority=float(x_ray_context["sampled"]), - ) + context = Context( + trace_id=int(trace_id_parts[2][8:], 16), + span_id=int(x_ray_context["parent_id"], 16), + sampling_priority=float(x_ray_context["sampled"]), + ) + if not config.data_streams_enabled: + break except Exception as e: logger.debug("The trace extractor returned with error %s", e) # Set DSM checkpoint once per record - _dsm_set_checkpoint(dsm_data, event_type, source_arn) + _dsm_set_checkpoint(dd_ctx, event_type, source_arn) return context if context else extract_context_from_lambda_context(lambda_context) @@ -402,25 +400,15 @@ def extract_context_from_kinesis_event(event, lambda_context): Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported. """ source_arn = "" - records = ( - [get_first_record(event)] - if not config.data_streams_enabled - else event.get("Records") - ) + context = None - for idx, record in enumerate(records): - dsm_data = None + for idx, record in enumerate(event.get("Records", [])): + dd_ctx = None try: source_arn = record.get("eventSourceARN", "") kinesis = record.get("kinesis") if not kinesis: - context = ( - extract_context_from_lambda_context(lambda_context) - if idx == 0 - else context - ) - _dsm_set_checkpoint(None, "kinesis", source_arn) - continue + return extract_context_from_lambda_context(lambda_context) data = kinesis.get("data") if data: import base64 @@ -433,10 +421,11 @@ def extract_context_from_kinesis_event(event, lambda_context): if dd_ctx: if idx == 0: context = propagator.extract(dd_ctx) - dsm_data = dd_ctx + if not config.data_streams_enabled: + break except Exception as e: logger.debug("The trace extractor returned with error %s", e) - _dsm_set_checkpoint(dsm_data, "kinesis", source_arn) + _dsm_set_checkpoint(dd_ctx, "kinesis", source_arn) return context if context else extract_context_from_lambda_context(lambda_context) From 6936bba239d284a42bbeb27b849d7a4a8c559739 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 5 Aug 2025 18:16:50 -0400 Subject: [PATCH 06/16] keep APM record[0] implementation --- datadog_lambda/tracing.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 72339b47..27c7d6d9 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -408,7 +408,9 @@ def extract_context_from_kinesis_event(event, lambda_context): source_arn = record.get("eventSourceARN", "") kinesis = record.get("kinesis") if not kinesis: - return extract_context_from_lambda_context(lambda_context) + if idx == 0: + return extract_context_from_lambda_context(lambda_context) + continue data = kinesis.get("data") if data: import base64 From 268c1e5d6d6888e0b16fc5b0b98a86210535b505 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 5 Aug 2025 18:30:49 -0400 Subject: [PATCH 07/16] Keep APM records[0] approach --- datadog_lambda/tracing.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 27c7d6d9..033f75f3 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -297,14 +297,16 @@ def extract_context_from_sqs_or_sns_event_or_context( if dd_json_data: dd_data = json.loads(dd_json_data) - if is_step_function_event(dd_data): - try: - return extract_context_from_step_functions(dd_data, None) - except Exception: - logger.debug( - "Failed to extract Step Functions context from SQS/SNS event." - ) if idx == 0: + if is_step_function_event(dd_data): + try: + return extract_context_from_step_functions( + dd_data, None + ) + except Exception: + logger.debug( + "Failed to extract Step Functions context from SQS/SNS event." + ) context = propagator.extract(dd_data) if not config.data_streams_enabled: break From 2f8dfaa668db763e877a4de497517db22a164ea6 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 6 Aug 2025 11:03:04 -0400 Subject: [PATCH 08/16] add more batch processing tests --- datadog_lambda/tracing.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 033f75f3..ed68fcbe 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -328,13 +328,11 @@ def extract_context_from_sqs_or_sns_event_or_context( logger.debug( "Found dd-trace injected trace context from AWSTraceHeader" ) - context = Context( + return Context( trace_id=int(trace_id_parts[2][8:], 16), span_id=int(x_ray_context["parent_id"], 16), sampling_priority=float(x_ray_context["sampled"]), ) - if not config.data_streams_enabled: - break except Exception as e: logger.debug("The trace extractor returned with error %s", e) From 63e41cb940e24fbf36ae84723e74e57c21b4575d Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 6 Aug 2025 11:40:08 -0400 Subject: [PATCH 09/16] want to keep looping --- datadog_lambda/tracing.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index ed68fcbe..033f75f3 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -328,11 +328,13 @@ def extract_context_from_sqs_or_sns_event_or_context( logger.debug( "Found dd-trace injected trace context from AWSTraceHeader" ) - return Context( + context = Context( trace_id=int(trace_id_parts[2][8:], 16), span_id=int(x_ray_context["parent_id"], 16), sampling_priority=float(x_ray_context["sampled"]), ) + if not config.data_streams_enabled: + break except Exception as e: logger.debug("The trace extractor returned with error %s", e) From e5a0903263313daf3a49590275b687aed9f7f598 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 6 Aug 2025 13:20:31 -0400 Subject: [PATCH 10/16] clean up if statement --- datadog_lambda/tracing.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 033f75f3..2845fe7d 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -422,11 +422,10 @@ def extract_context_from_kinesis_event(event, lambda_context): data_str = str_bytes.decode("ascii") data_obj = json.loads(data_str) dd_ctx = data_obj.get("_datadog") - if dd_ctx: - if idx == 0: - context = propagator.extract(dd_ctx) - if not config.data_streams_enabled: - break + if dd_ctx and idx == 0: + context = propagator.extract(dd_ctx) + if not config.data_streams_enabled: + break except Exception as e: logger.debug("The trace extractor returned with error %s", e) _dsm_set_checkpoint(dd_ctx, "kinesis", source_arn) From 103ffa16c52045bf97029dba507b7a17ce6cc8af Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 6 Aug 2025 16:19:53 -0400 Subject: [PATCH 11/16] structure code based on comment --- datadog_lambda/tracing.py | 182 ++++++++++++++++++++------------------ 1 file changed, 94 insertions(+), 88 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 2845fe7d..2f26104d 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -237,7 +237,6 @@ def extract_context_from_sqs_or_sns_event_or_context( Falls back to lambda context if no trace data is found in the SQS message attributes. Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported. """ - source_arn = "" event_type = "sqs" if event_source.equals(EventTypes.SQS) else "sns" # EventBridge => SQS @@ -248,100 +247,107 @@ def extract_context_from_sqs_or_sns_event_or_context( except Exception: logger.debug("Failed extracting context as EventBridge to SQS.") - context = None + apm_context = None for idx, record in enumerate(event.get("Records", [])): + source_arn = ( + record.get("eventSourceARN") + if event_type == "sqs" + else record.get("Sns", {}).get("TopicArn") + ) + dd_data = None try: - source_arn = record.get("eventSourceARN", "") - dd_ctx = None - - # logic to deal with SNS => SQS event - if "body" in record: - body_str = record.get("body") - try: - body = json.loads(body_str) - if body.get("Type", "") == "Notification" and "TopicArn" in body: - logger.debug("Found SNS message inside SQS event") - record = get_first_record(create_sns_event(body)) - except Exception: - pass - - msg_attributes = record.get("messageAttributes") - if msg_attributes is None: - sns_record = record.get("Sns") or {} - # SNS->SQS event would extract SNS arn without this check - if event_source.equals(EventTypes.SNS): - source_arn = sns_record.get("TopicArn", "") - msg_attributes = sns_record.get("MessageAttributes") or {} - dd_payload = msg_attributes.get("_datadog") - if dd_payload: - # SQS uses dataType and binaryValue/stringValue - # SNS uses Type and Value - # fmt: off - dd_json_data = None - dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") - if dd_json_data_type == "Binary": - import base64 - - dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value") - if dd_json_data: - dd_json_data = base64.b64decode(dd_json_data) - elif dd_json_data_type == "String": - dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value") - # fmt: on + dd_data = _extract_context_from_sqs_or_sns_record(record) + if idx == 0: + if dd_data and is_step_function_event(dd_data): + try: + return extract_context_from_step_functions(dd_data, None) + except Exception: + logger.debug( + "Failed to extract Step Functions context from SQS/SNS event." + ) + elif not dd_data: + apm_context = _extract_context_from_xray(record) else: - logger.debug( - "Datadog Lambda Python only supports extracting trace" - "context from String or Binary SQS/SNS message attributes" - ) - - if dd_json_data: - dd_data = json.loads(dd_json_data) - - if idx == 0: - if is_step_function_event(dd_data): - try: - return extract_context_from_step_functions( - dd_data, None - ) - except Exception: - logger.debug( - "Failed to extract Step Functions context from SQS/SNS event." - ) - context = propagator.extract(dd_data) - if not config.data_streams_enabled: - break - dd_ctx = dd_data - elif idx == 0: - # Handle case where trace context is injected into attributes.AWSTraceHeader - # example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 - attrs = event.get("Records")[0].get("attributes") - if attrs: - x_ray_header = attrs.get("AWSTraceHeader") - if x_ray_header: - x_ray_context = parse_xray_header(x_ray_header) - trace_id_parts = x_ray_context.get("trace_id", "").split("-") - if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( - DD_TRACE_JAVA_TRACE_ID_PADDING - ): - # If it starts with eight 0's padding, - # then this AWSTraceHeader contains Datadog injected trace context - logger.debug( - "Found dd-trace injected trace context from AWSTraceHeader" - ) - context = Context( - trace_id=int(trace_id_parts[2][8:], 16), - span_id=int(x_ray_context["parent_id"], 16), - sampling_priority=float(x_ray_context["sampled"]), - ) - if not config.data_streams_enabled: - break + apm_context = propagator.extract(dd_data) except Exception as e: logger.debug("The trace extractor returned with error %s", e) + if config.data_streams_enabled: + _dsm_set_checkpoint(dd_data, event_type, source_arn) + if not config.data_streams_enabled: + break + + return ( + apm_context + if apm_context + else extract_context_from_lambda_context(lambda_context) + ) - # Set DSM checkpoint once per record - _dsm_set_checkpoint(dd_ctx, event_type, source_arn) - return context if context else extract_context_from_lambda_context(lambda_context) +def _extract_context_from_sqs_or_sns_record(record): + # logic to deal with SNS => SQS event + if "body" in record: + body_str = record.get("body") + try: + body = json.loads(body_str) + if body.get("Type", "") == "Notification" and "TopicArn" in body: + logger.debug("Found SNS message inside SQS event") + record = get_first_record(create_sns_event(body)) + except Exception: + pass + + msg_attributes = record.get("messageAttributes") + if msg_attributes is None: + sns_record = record.get("Sns") or {} + msg_attributes = sns_record.get("MessageAttributes") or {} + dd_payload = msg_attributes.get("_datadog") + if dd_payload: + # SQS uses dataType and binaryValue/stringValue + # SNS uses Type and Value + # fmt: off + dd_json_data = None + dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") + if dd_json_data_type == "Binary": + import base64 + + dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value") + if dd_json_data: + dd_json_data = base64.b64decode(dd_json_data) + elif dd_json_data_type == "String": + dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value") + # fmt: on + else: + logger.debug( + "Datadog Lambda Python only supports extracting trace" + "context from String or Binary SQS/SNS message attributes" + ) + + if dd_json_data: + dd_data = json.loads(dd_json_data) + return dd_data + return None + + +def _extract_context_from_xray(record): + attrs = record.get("attributes") + if attrs: + x_ray_header = attrs.get("AWSTraceHeader") + if x_ray_header: + x_ray_context = parse_xray_header(x_ray_header) + trace_id_parts = x_ray_context.get("trace_id", "").split("-") + if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( + DD_TRACE_JAVA_TRACE_ID_PADDING + ): + # If it starts with eight 0's padding, + # then this AWSTraceHeader contains Datadog injected trace context + logger.debug( + "Found dd-trace injected trace context from AWSTraceHeader" + ) + return Context( + trace_id=int(trace_id_parts[2][8:], 16), + span_id=int(x_ray_context["parent_id"], 16), + sampling_priority=float(x_ray_context["sampled"]), + ) + return None def _extract_context_from_eventbridge_sqs_event(event): From 6c2fbbd9cf45476c315c8706eaa4a62170d50449 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Thu, 7 Aug 2025 11:40:00 -0400 Subject: [PATCH 12/16] Suggested loop structure --- datadog_lambda/tracing.py | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 2f26104d..8a5d6e85 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -247,32 +247,33 @@ def extract_context_from_sqs_or_sns_event_or_context( except Exception: logger.debug("Failed extracting context as EventBridge to SQS.") - apm_context = None - for idx, record in enumerate(event.get("Records", [])): + apm_context: Context = None + for record in event.get("Records", []): source_arn = ( record.get("eventSourceARN") if event_type == "sqs" else record.get("Sns", {}).get("TopicArn") ) - dd_data = None + dd_ctx = None try: - dd_data = _extract_context_from_sqs_or_sns_record(record) - if idx == 0: - if dd_data and is_step_function_event(dd_data): + dd_ctx = _extract_context_from_sqs_or_sns_record(record) + if apm_context is None: + if not dd_ctx: + apm_context = _extract_context_from_xray(record) + + elif dd_ctx and is_step_function_event(dd_ctx): try: - return extract_context_from_step_functions(dd_data, None) + return extract_context_from_step_functions(dd_ctx, None) except Exception: logger.debug( "Failed to extract Step Functions context from SQS/SNS event." ) - elif not dd_data: - apm_context = _extract_context_from_xray(record) else: - apm_context = propagator.extract(dd_data) + apm_context = propagator.extract(dd_ctx) except Exception as e: logger.debug("The trace extractor returned with error %s", e) if config.data_streams_enabled: - _dsm_set_checkpoint(dd_data, event_type, source_arn) + _dsm_set_checkpoint(dd_ctx, event_type, source_arn) if not config.data_streams_enabled: break @@ -303,7 +304,6 @@ def _extract_context_from_sqs_or_sns_record(record): if dd_payload: # SQS uses dataType and binaryValue/stringValue # SNS uses Type and Value - # fmt: off dd_json_data = None dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType") if dd_json_data_type == "Binary": @@ -314,7 +314,6 @@ def _extract_context_from_sqs_or_sns_record(record): dd_json_data = base64.b64decode(dd_json_data) elif dd_json_data_type == "String": dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value") - # fmt: on else: logger.debug( "Datadog Lambda Python only supports extracting trace" From 20673632828cd3214c07754fa9b49eba958508c5 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Thu, 7 Aug 2025 11:49:36 -0400 Subject: [PATCH 13/16] same structure with kinesis --- datadog_lambda/tracing.py | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 8a5d6e85..cc73c7d0 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -406,18 +406,15 @@ def extract_context_from_kinesis_event(event, lambda_context): Extract datadog trace context from a Kinesis Stream's base64 encoded data string Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported. """ - source_arn = "" - context = None - for idx, record in enumerate(event.get("Records", [])): + apm_context: Context = None + for record in event.get("Records", []): dd_ctx = None try: source_arn = record.get("eventSourceARN", "") kinesis = record.get("kinesis") if not kinesis: - if idx == 0: - return extract_context_from_lambda_context(lambda_context) - continue + return extract_context_from_lambda_context(lambda_context) data = kinesis.get("data") if data: import base64 @@ -427,14 +424,19 @@ def extract_context_from_kinesis_event(event, lambda_context): data_str = str_bytes.decode("ascii") data_obj = json.loads(data_str) dd_ctx = data_obj.get("_datadog") - if dd_ctx and idx == 0: - context = propagator.extract(dd_ctx) - if not config.data_streams_enabled: - break + if dd_ctx and apm_context is None: + apm_context = propagator.extract(dd_ctx) except Exception as e: logger.debug("The trace extractor returned with error %s", e) - _dsm_set_checkpoint(dd_ctx, "kinesis", source_arn) - return context if context else extract_context_from_lambda_context(lambda_context) + if config.data_streams_enabled: + _dsm_set_checkpoint(dd_ctx, "kinesis", source_arn) + if not config.data_streams_enabled: + break + return ( + apm_context + if apm_context + else extract_context_from_lambda_context(lambda_context) + ) def _deterministic_sha256_hash(s: str, part: str) -> int: From f2b2dc3478012a5e43f4e84351eb82700097754d Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Thu, 7 Aug 2025 12:33:06 -0400 Subject: [PATCH 14/16] return early --- datadog_lambda/tracing.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index cc73c7d0..0c97eb28 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -258,16 +258,15 @@ def extract_context_from_sqs_or_sns_event_or_context( try: dd_ctx = _extract_context_from_sqs_or_sns_record(record) if apm_context is None: - if not dd_ctx: - apm_context = _extract_context_from_xray(record) - - elif dd_ctx and is_step_function_event(dd_ctx): + if dd_ctx and is_step_function_event(dd_ctx): try: return extract_context_from_step_functions(dd_ctx, None) except Exception: logger.debug( "Failed to extract Step Functions context from SQS/SNS event." ) + elif not dd_ctx: + apm_context = _extract_context_from_xray(record) else: apm_context = propagator.extract(dd_ctx) except Exception as e: From 1a1a1d9316024ed2d87371c5203df6d48931a01f Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Fri, 8 Aug 2025 10:15:42 -0400 Subject: [PATCH 15/16] suggested refactoring --- datadog_lambda/tracing.py | 61 ++++++++++++++++++++------------------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/datadog_lambda/tracing.py b/datadog_lambda/tracing.py index 0c97eb28..aa94a20e 100644 --- a/datadog_lambda/tracing.py +++ b/datadog_lambda/tracing.py @@ -258,17 +258,7 @@ def extract_context_from_sqs_or_sns_event_or_context( try: dd_ctx = _extract_context_from_sqs_or_sns_record(record) if apm_context is None: - if dd_ctx and is_step_function_event(dd_ctx): - try: - return extract_context_from_step_functions(dd_ctx, None) - except Exception: - logger.debug( - "Failed to extract Step Functions context from SQS/SNS event." - ) - elif not dd_ctx: - apm_context = _extract_context_from_xray(record) - else: - apm_context = propagator.extract(dd_ctx) + apm_context = _extract_apm_context(dd_ctx, record) except Exception as e: logger.debug("The trace extractor returned with error %s", e) if config.data_streams_enabled: @@ -325,26 +315,39 @@ def _extract_context_from_sqs_or_sns_record(record): return None -def _extract_context_from_xray(record): - attrs = record.get("attributes") - if attrs: - x_ray_header = attrs.get("AWSTraceHeader") - if x_ray_header: - x_ray_context = parse_xray_header(x_ray_header) - trace_id_parts = x_ray_context.get("trace_id", "").split("-") - if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( - DD_TRACE_JAVA_TRACE_ID_PADDING - ): - # If it starts with eight 0's padding, - # then this AWSTraceHeader contains Datadog injected trace context +def _extract_apm_context(dd_ctx, record): + if dd_ctx: + if is_step_function_event(dd_ctx): + try: + return extract_context_from_step_functions(dd_ctx, None) + except Exception: logger.debug( - "Found dd-trace injected trace context from AWSTraceHeader" - ) - return Context( - trace_id=int(trace_id_parts[2][8:], 16), - span_id=int(x_ray_context["parent_id"], 16), - sampling_priority=float(x_ray_context["sampled"]), + "Failed to extract Step Functions context from SQS/SNS event." ) + else: + return propagator.extract(dd_ctx) + else: + # Handle case where trace context is injected into attributes.AWSTraceHeader + # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1 + attrs = record.get("attributes") + if attrs: + x_ray_header = attrs.get("AWSTraceHeader") + if x_ray_header: + x_ray_context = parse_xray_header(x_ray_header) + trace_id_parts = x_ray_context.get("trace_id", "").split("-") + if len(trace_id_parts) > 2 and trace_id_parts[2].startswith( + DD_TRACE_JAVA_TRACE_ID_PADDING + ): + # If it starts with eight 0's padding, + # then this AWSTraceHeader contains Datadog injected trace context + logger.debug( + "Found dd-trace injected trace context from AWSTraceHeader" + ) + return Context( + trace_id=int(trace_id_parts[2][8:], 16), + span_id=int(x_ray_context["parent_id"], 16), + sampling_priority=float(x_ray_context["sampled"]), + ) return None From 6bcbd0a13b5c5a28e232b800ab2f2dde5a82b0ff Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Fri, 8 Aug 2025 10:28:48 -0400 Subject: [PATCH 16/16] more tests --- tests/test_tracing.py | 249 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 202 insertions(+), 47 deletions(-) diff --git a/tests/test_tracing.py b/tests/test_tracing.py index 6eb463a6..a149c5ee 100644 --- a/tests/test_tracing.py +++ b/tests/test_tracing.py @@ -2743,6 +2743,73 @@ def test_sqs_batch_processing(self): carrier_get_2 = args_2[2] self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") + def test_sqs_batch_processing_with_invalid_records(self): + dd_data_1 = {"dd-pathway-ctx-base64": "valid_record"} + dd_json_data_1 = json.dumps(dd_data_1) + + dd_data_3 = {"dd-pathway-ctx-base64": "another_valid_record"} + dd_json_data_3 = json.dumps(dd_data_3) + + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "messageAttributes": { + "_datadog": { + "dataType": "String", + "stringValue": dd_json_data_1, + } + }, + "eventSource": "aws:sqs", + }, + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "messageAttributes": { + "_datadog": { + "dataType": "Binary", + # This will cause extraction to fail + "binaryValue": "invalid-base64-data", + } + }, + "eventSource": "aws:sqs", + }, + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "messageAttributes": { + "_datadog": { + "dataType": "String", + "stringValue": dd_json_data_3, + } + }, + "eventSource": "aws:sqs", + }, + ] + } + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + self.assertEqual(self.mock_checkpoint.call_count, 3) + + args_1, _ = self.mock_checkpoint.call_args_list[0] + self.assertEqual(args_1[0], "sqs") + self.assertEqual(args_1[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_1 = args_1[2] + self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "valid_record") + + args_2, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual(args_2[0], "sqs") + self.assertEqual(args_2[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_2 = args_2[2] + self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), None) + + args_3, _ = self.mock_checkpoint.call_args_list[2] + self.assertEqual(args_3[0], "sqs") + self.assertEqual(args_3[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_3 = args_3[2] + self.assertEqual(carrier_get_3("dd-pathway-ctx-base64"), "another_valid_record") + def test_sqs_source_arn_not_found(self): event = { "Records": [ @@ -3019,53 +3086,6 @@ def test_sns_data_streams_disabled(self): self.mock_checkpoint.assert_not_called() - def test_sns_batch_processing(self): - dd_data_1 = {"dd-pathway-ctx-base64": "record1"} - dd_data_2 = {"dd-pathway-ctx-base64": "record2"} - dd_json_data_1 = json.dumps(dd_data_1) - dd_json_data_2 = json.dumps(dd_data_2) - - event = { - "Records": [ - { - "Sns": { - "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", - "MessageAttributes": { - "_datadog": {"Type": "String", "Value": dd_json_data_1} - }, - }, - "eventSource": "aws:sns", - }, - { - "Sns": { - "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", - "MessageAttributes": { - "_datadog": {"Type": "String", "Value": dd_json_data_2} - }, - }, - "eventSource": "aws:sns", - }, - ] - } - - extract_context_from_sqs_or_sns_event_or_context( - event, self.lambda_context, parse_event_source(event) - ) - - self.assertEqual(self.mock_checkpoint.call_count, 2) - - args_1, _ = self.mock_checkpoint.call_args_list[0] - self.assertEqual(args_1[0], "sns") - self.assertEqual(args_1[1], "arn:aws:sns:us-east-1:123456789012:test-topic") - carrier_get_1 = args_1[2] - self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "record1") - - args_2, _ = self.mock_checkpoint.call_args_list[1] - self.assertEqual(args_2[0], "sns") - self.assertEqual(args_2[1], "arn:aws:sns:us-east-1:123456789012:test-topic") - carrier_get_2 = args_2[2] - self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") - # SNS -> SQS TESTS def test_sns_to_sqs_context_propagated_string_value(self): @@ -3362,6 +3382,83 @@ def test_sns_to_sqs_batch_processing(self): carrier_get_2 = args_2[2] self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") + def test_sns_to_sqs_batch_processing_with_invalid_records(self): + dd_data_1 = {"dd-pathway-ctx-base64": "valid_sns_record"} + dd_json_data_1 = json.dumps(dd_data_1) + + sns_message_1 = { + "Type": "Notification", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", + "MessageAttributes": { + "_datadog": {"Type": "String", "Value": dd_json_data_1} + }, + } + + sns_message_2 = { + "Type": "Notification", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", + "MessageAttributes": { + "_datadog": {"Type": "Binary", "Value": "invalid-base64-data"} + }, + } + + dd_data_3 = {"dd-pathway-ctx-base64": "another_valid_sns_record"} + dd_json_data_3 = json.dumps(dd_data_3) + + sns_message_3 = { + "Type": "Notification", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:test-topic", + "MessageAttributes": { + "_datadog": {"Type": "String", "Value": dd_json_data_3} + }, + } + + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "body": json.dumps(sns_message_1), + "eventSource": "aws:sqs", + }, + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "body": json.dumps(sns_message_2), + "eventSource": "aws:sqs", + }, + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:test-queue", + "body": json.dumps(sns_message_3), + "eventSource": "aws:sqs", + }, + ] + } + + extract_context_from_sqs_or_sns_event_or_context( + event, self.lambda_context, parse_event_source(event) + ) + + self.assertEqual(self.mock_checkpoint.call_count, 3) + + args_1, _ = self.mock_checkpoint.call_args_list[0] + self.assertEqual(args_1[0], "sqs") + self.assertEqual(args_1[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_1 = args_1[2] + self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "valid_sns_record") + + args_2, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual(args_2[0], "sqs") + self.assertEqual(args_2[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_2 = args_2[2] + self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), None) + + args_3, _ = self.mock_checkpoint.call_args_list[2] + self.assertEqual(args_3[0], "sqs") + self.assertEqual(args_3[1], "arn:aws:sqs:us-east-1:123456789012:test-queue") + carrier_get_3 = args_3[2] + self.assertEqual( + carrier_get_3("dd-pathway-ctx-base64"), "another_valid_sns_record" + ) + def test_sns_to_sqs_source_arn_not_found(self): sns_notification = { "Type": "Notification", @@ -3574,6 +3671,64 @@ def test_kinesis_batch_processing(self): carrier_get_2 = args_2[2] self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), "record2") + def test_kinesis_batch_processing_with_invalid_records(self): + dd_data_1 = {"dd-pathway-ctx-base64": "valid_kinesis_record"} + kinesis_data_1 = {"_datadog": dd_data_1, "message": "test1"} + encoded_data_1 = base64.b64encode(json.dumps(kinesis_data_1).encode()).decode() + + dd_data_3 = {"dd-pathway-ctx-base64": "another_valid_kinesis_record"} + kinesis_data_3 = {"_datadog": dd_data_3, "message": "test3"} + encoded_data_3 = base64.b64encode(json.dumps(kinesis_data_3).encode()).decode() + + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", + "kinesis": {"data": encoded_data_1}, + }, + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", + "kinesis": { + "data": "invalid-base64-data" + }, # This will cause extraction to fail + }, + { + "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream", + "kinesis": {"data": encoded_data_3}, + }, + ] + } + + extract_context_from_kinesis_event(event, self.lambda_context) + + self.assertEqual(self.mock_checkpoint.call_count, 3) + + args_1, _ = self.mock_checkpoint.call_args_list[0] + self.assertEqual(args_1[0], "kinesis") + self.assertEqual( + args_1[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" + ) + carrier_get_1 = args_1[2] + self.assertEqual(carrier_get_1("dd-pathway-ctx-base64"), "valid_kinesis_record") + + args_2, _ = self.mock_checkpoint.call_args_list[1] + self.assertEqual(args_2[0], "kinesis") + self.assertEqual( + args_2[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" + ) + carrier_get_2 = args_2[2] + self.assertEqual(carrier_get_2("dd-pathway-ctx-base64"), None) + + args_3, _ = self.mock_checkpoint.call_args_list[2] + self.assertEqual(args_3[0], "kinesis") + self.assertEqual( + args_3[1], "arn:aws:kinesis:us-east-1:123456789012:stream/test-stream" + ) + carrier_get_3 = args_3[2] + self.assertEqual( + carrier_get_3("dd-pathway-ctx-base64"), "another_valid_kinesis_record" + ) + def test_kinesis_source_arn_not_found(self): kinesis_data = {"message": "test"} kinesis_data_str = json.dumps(kinesis_data)