Skip to content

Commit f38673b

Browse files
original for loop approach
1 parent 643d1b3 commit f38673b

File tree

1 file changed

+114
-152
lines changed

1 file changed

+114
-152
lines changed

datadog_lambda/tracing.py

Lines changed: 114 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -68,32 +68,6 @@
6868
LOWER_64_BITS = "LOWER_64_BITS"
6969

7070

71-
def _dsm_set_context_sqs_event(event):
72-
for record in event.get("Records", [])[1:]:
73-
if arn := record.get("eventSourceARN"):
74-
try:
75-
context = _extract_context_from_sqs_or_sns_record(record)
76-
except Exception as e:
77-
logger.debug(
78-
f"DSM: Failed to extract context with error {e}, will still set checkpoint"
79-
)
80-
context = None
81-
_dsm_set_checkpoint(context, "sqs", arn)
82-
83-
84-
def _dsm_set_context_kinesis_event(event):
85-
for record in event.get("Records", [])[1:]:
86-
if (arn := record.get("eventSourceARN")) and (kinesis := record.get("kinesis")):
87-
try:
88-
context = _extract_context_from_kinesis_record(kinesis)
89-
except Exception as e:
90-
logger.debug(
91-
f"DSM: Failed to extract context with error {e}, will still set checkpoint"
92-
)
93-
context = None
94-
_dsm_set_checkpoint(context, "kinesis", arn)
95-
96-
9771
def _dsm_set_checkpoint(context_json, event_type, arn):
9872
if not config.data_streams_enabled:
9973
return
@@ -263,6 +237,7 @@ def extract_context_from_sqs_or_sns_event_or_context(
263237
Falls back to lambda context if no trace data is found in the SQS message attributes.
264238
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
265239
"""
240+
source_arn = ""
266241
event_type = "sqs" if event_source.equals(EventTypes.SQS) else "sns"
267242

268243
# EventBridge => SQS
@@ -273,106 +248,98 @@ def extract_context_from_sqs_or_sns_event_or_context(
273248
except Exception:
274249
logger.debug("Failed extracting context as EventBridge to SQS.")
275250

276-
try:
277-
first_record = event.get("Records")[0]
278-
source_arn = (
279-
first_record.get("Sns", {}).get("TopicArn")
280-
if event_type == "sns"
281-
else first_record.get("eventSourceARN")
282-
)
283-
dd_data = _extract_context_from_sqs_or_sns_record(first_record)
284-
if dd_data:
285-
if is_step_function_event(dd_data):
251+
context = None
252+
for idx, record in enumerate(event.get("Records", [])):
253+
try:
254+
source_arn = record.get("eventSourceARN", "")
255+
dd_ctx = None
256+
257+
# logic to deal with SNS => SQS event
258+
if "body" in record:
259+
body_str = record.get("body")
286260
try:
287-
return extract_context_from_step_functions(dd_data, None)
261+
body = json.loads(body_str)
262+
if body.get("Type", "") == "Notification" and "TopicArn" in body:
263+
logger.debug("Found SNS message inside SQS event")
264+
record = get_first_record(create_sns_event(body))
288265
except Exception:
266+
pass
267+
268+
msg_attributes = record.get("messageAttributes")
269+
if msg_attributes is None:
270+
sns_record = record.get("Sns") or {}
271+
# SNS->SQS event would extract SNS arn without this check
272+
if event_source.equals(EventTypes.SNS):
273+
source_arn = sns_record.get("TopicArn", "")
274+
msg_attributes = sns_record.get("MessageAttributes") or {}
275+
dd_payload = msg_attributes.get("_datadog")
276+
if dd_payload:
277+
# SQS uses dataType and binaryValue/stringValue
278+
# SNS uses Type and Value
279+
# fmt: off
280+
dd_json_data = None
281+
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
282+
if dd_json_data_type == "Binary":
283+
import base64
284+
285+
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
286+
if dd_json_data:
287+
dd_json_data = base64.b64decode(dd_json_data)
288+
elif dd_json_data_type == "String":
289+
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
290+
# fmt: on
291+
else:
289292
logger.debug(
290-
"Failed to extract Step Functions context from SQS/SNS event."
293+
"Datadog Lambda Python only supports extracting trace"
294+
"context from String or Binary SQS/SNS message attributes"
291295
)
292-
context = propagator.extract(dd_data)
293-
_dsm_set_checkpoint(dd_data, event_type, source_arn)
294-
# Batch Processing does not occur for SNS events
295-
if config.data_streams_enabled and event_type == "sqs":
296-
_dsm_set_context_sqs_event(event)
297-
return context
298-
else:
299-
# Handle case where trace context is injected into attributes.AWSTraceHeader
300-
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
301-
attrs = event.get("Records")[0].get("attributes")
302-
if attrs:
303-
x_ray_header = attrs.get("AWSTraceHeader")
304-
if x_ray_header:
305-
x_ray_context = parse_xray_header(x_ray_header)
306-
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
307-
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
308-
DD_TRACE_JAVA_TRACE_ID_PADDING
309-
):
310-
# If it starts with eight 0's padding,
311-
# then this AWSTraceHeader contains Datadog injected trace context
312-
logger.debug(
313-
"Found dd-trace injected trace context from AWSTraceHeader"
314-
)
315-
return Context(
316-
trace_id=int(trace_id_parts[2][8:], 16),
317-
span_id=int(x_ray_context["parent_id"], 16),
318-
sampling_priority=float(x_ray_context["sampled"]),
319-
)
320-
# Still want to set a DSM checkpoint even if DSM context not propagated
321-
_dsm_set_checkpoint(None, event_type, source_arn)
322-
# Batch Processing does not occur for SNS events
323-
if config.data_streams_enabled and event_type == "sqs":
324-
_dsm_set_context_sqs_event(event)
325-
return extract_context_from_lambda_context(lambda_context)
326-
except Exception as e:
327-
logger.debug("The trace extractor returned with error %s", e)
328-
# Still want to set a DSM checkpoint even if DSM context not propagated
329-
_dsm_set_checkpoint(None, event_type, source_arn)
330-
# Batch Processing does not occur for SNS events
331-
if config.data_streams_enabled and event_type == "sqs":
332-
_dsm_set_context_sqs_event(event)
333-
return extract_context_from_lambda_context(lambda_context)
334296

297+
if dd_json_data:
298+
dd_data = json.loads(dd_json_data)
299+
300+
if idx == 0:
301+
if is_step_function_event(dd_data):
302+
try:
303+
return extract_context_from_step_functions(
304+
dd_data, None
305+
)
306+
except Exception:
307+
logger.debug(
308+
"Failed to extract Step Functions context from SQS/SNS event."
309+
)
310+
context = propagator.extract(dd_data)
311+
if not config.data_streams_enabled:
312+
break
313+
dd_ctx = dd_data
314+
elif idx == 0:
315+
# Handle case where trace context is injected into attributes.AWSTraceHeader
316+
# example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
317+
attrs = event.get("Records")[0].get("attributes")
318+
if attrs:
319+
x_ray_header = attrs.get("AWSTraceHeader")
320+
if x_ray_header:
321+
x_ray_context = parse_xray_header(x_ray_header)
322+
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
323+
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
324+
DD_TRACE_JAVA_TRACE_ID_PADDING
325+
):
326+
# If it starts with eight 0's padding,
327+
# then this AWSTraceHeader contains Datadog injected trace context
328+
logger.debug(
329+
"Found dd-trace injected trace context from AWSTraceHeader"
330+
)
331+
return Context(
332+
trace_id=int(trace_id_parts[2][8:], 16),
333+
span_id=int(x_ray_context["parent_id"], 16),
334+
sampling_priority=float(x_ray_context["sampled"]),
335+
)
336+
except Exception as e:
337+
logger.debug("The trace extractor returned with error %s", e)
335338

336-
def _extract_context_from_sqs_or_sns_record(record):
337-
# logic to deal with SNS => SQS event
338-
if "body" in record:
339-
body_str = record.get("body")
340-
try:
341-
body = json.loads(body_str)
342-
if body.get("Type", "") == "Notification" and "TopicArn" in body:
343-
logger.debug("Found SNS message inside SQS event")
344-
record = get_first_record(create_sns_event(body))
345-
except Exception:
346-
pass
347-
348-
msg_attributes = record.get("messageAttributes")
349-
if msg_attributes is None:
350-
sns_record = record.get("Sns") or {}
351-
msg_attributes = sns_record.get("MessageAttributes") or {}
352-
dd_payload = msg_attributes.get("_datadog")
353-
if dd_payload:
354-
# SQS uses dataType and binaryValue/stringValue
355-
# SNS uses Type and Value
356-
dd_json_data = None
357-
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
358-
if dd_json_data_type == "Binary":
359-
import base64
360-
361-
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
362-
if dd_json_data:
363-
dd_json_data = base64.b64decode(dd_json_data)
364-
elif dd_json_data_type == "String":
365-
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
366-
else:
367-
logger.debug(
368-
"Datadog Lambda Python only supports extracting trace"
369-
"context from String or Binary SQS/SNS message attributes"
370-
)
339+
# Set DSM checkpoint once per record
340+
_dsm_set_checkpoint(dd_ctx, event_type, source_arn)
371341

372-
if dd_json_data:
373-
dd_data = json.loads(dd_json_data)
374-
return dd_data
375-
return None
342+
return context if context else extract_context_from_lambda_context(lambda_context)
376343

377344

378345
def _extract_context_from_eventbridge_sqs_event(event):
@@ -433,40 +400,35 @@ def extract_context_from_kinesis_event(event, lambda_context):
433400
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
434401
"""
435402
source_arn = ""
436-
try:
437-
record = get_first_record(event)
438-
source_arn = record.get("eventSourceARN", "")
439-
kinesis = record.get("kinesis")
440-
if not kinesis:
441-
return extract_context_from_lambda_context(lambda_context)
442-
dd_ctx = _extract_context_from_kinesis_record(kinesis)
443-
if dd_ctx:
444-
context = propagator.extract(dd_ctx)
445-
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
446-
if config.data_streams_enabled:
447-
_dsm_set_context_kinesis_event(event)
448-
return context
449-
except Exception as e:
450-
logger.debug("The trace extractor returned with error %s", e)
451-
# Still want to set a DSM checkpoint even if DSM context not propagated
452-
_dsm_set_checkpoint(None, "kinesis", source_arn)
453-
if config.data_streams_enabled:
454-
_dsm_set_context_kinesis_event(event)
455-
return extract_context_from_lambda_context(lambda_context)
456-
457403

458-
def _extract_context_from_kinesis_record(record_kinesis_data):
459-
data = record_kinesis_data.get("data")
460-
if data:
461-
import base64
462-
463-
b64_bytes = data.encode("ascii")
464-
str_bytes = base64.b64decode(b64_bytes)
465-
data_str = str_bytes.decode("ascii")
466-
data_obj = json.loads(data_str)
467-
dd_ctx = data_obj.get("_datadog")
468-
return dd_ctx
469-
return None
404+
context = None
405+
for idx, record in enumerate(event.get("Records", [])):
406+
dd_ctx = None
407+
try:
408+
source_arn = record.get("eventSourceARN", "")
409+
kinesis = record.get("kinesis")
410+
if not kinesis:
411+
if idx == 0:
412+
return extract_context_from_lambda_context(lambda_context)
413+
continue
414+
data = kinesis.get("data")
415+
if data:
416+
import base64
417+
418+
b64_bytes = data.encode("ascii")
419+
str_bytes = base64.b64decode(b64_bytes)
420+
data_str = str_bytes.decode("ascii")
421+
data_obj = json.loads(data_str)
422+
dd_ctx = data_obj.get("_datadog")
423+
if dd_ctx:
424+
if idx == 0:
425+
context = propagator.extract(dd_ctx)
426+
if not config.data_streams_enabled:
427+
break
428+
except Exception as e:
429+
logger.debug("The trace extractor returned with error %s", e)
430+
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
431+
return context if context else extract_context_from_lambda_context(lambda_context)
470432

471433

472434
def _deterministic_sha256_hash(s: str, part: str) -> int:

0 commit comments

Comments
 (0)