Skip to content

Commit c0906a0

Browse files
set checkpoint for all messages one loop approach
1 parent 7bdc7d6 commit c0906a0

File tree

2 files changed

+326
-104
lines changed

2 files changed

+326
-104
lines changed

datadog_lambda/tracing.py

Lines changed: 131 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -248,91 +248,104 @@ def extract_context_from_sqs_or_sns_event_or_context(
248248
except Exception:
249249
logger.debug("Failed extracting context as EventBridge to SQS.")
250250

251-
try:
252-
first_record = event.get("Records")[0]
253-
source_arn = first_record.get("eventSourceARN", "")
254-
255-
# logic to deal with SNS => SQS event
256-
if "body" in first_record:
257-
body_str = first_record.get("body")
258-
try:
259-
body = json.loads(body_str)
260-
if body.get("Type", "") == "Notification" and "TopicArn" in body:
261-
logger.debug("Found SNS message inside SQS event")
262-
first_record = get_first_record(create_sns_event(body))
263-
except Exception:
264-
pass
265-
266-
msg_attributes = first_record.get("messageAttributes")
267-
if msg_attributes is None:
268-
sns_record = first_record.get("Sns") or {}
269-
# SNS->SQS event would extract SNS arn without this check
270-
if event_source.equals(EventTypes.SNS):
271-
source_arn = sns_record.get("TopicArn", "")
272-
msg_attributes = sns_record.get("MessageAttributes") or {}
273-
dd_payload = msg_attributes.get("_datadog")
274-
if dd_payload:
275-
# SQS uses dataType and binaryValue/stringValue
276-
# SNS uses Type and Value
277-
dd_json_data = None
278-
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
279-
if dd_json_data_type == "Binary":
280-
import base64
251+
context = None
252+
records = (
253+
event.get("Records", [])
254+
if config.data_streams_enabled
255+
else [event.get("Records")[0]]
256+
)
257+
is_first_record = True
258+
for record in records:
259+
try:
260+
source_arn = record.get("eventSourceARN", "")
261+
dsm_data = None
262+
263+
# logic to deal with SNS => SQS event
264+
if "body" in record:
265+
body_str = record.get("body")
266+
try:
267+
body = json.loads(body_str)
268+
if body.get("Type", "") == "Notification" and "TopicArn" in body:
269+
logger.debug("Found SNS message inside SQS event")
270+
record = get_first_record(create_sns_event(body))
271+
except Exception:
272+
pass
273+
274+
msg_attributes = record.get("messageAttributes")
275+
if msg_attributes is None:
276+
sns_record = record.get("Sns") or {}
277+
# SNS->SQS event would extract SNS arn without this check
278+
if event_source.equals(EventTypes.SNS):
279+
source_arn = sns_record.get("TopicArn", "")
280+
msg_attributes = sns_record.get("MessageAttributes") or {}
281+
dd_payload = msg_attributes.get("_datadog")
282+
if dd_payload:
283+
# SQS uses dataType and binaryValue/stringValue
284+
# SNS uses Type and Value
285+
dd_json_data = None
286+
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
287+
if dd_json_data_type == "Binary":
288+
import base64
289+
290+
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get(
291+
"Value"
292+
)
293+
if dd_json_data:
294+
dd_json_data = base64.b64decode(dd_json_data)
295+
elif dd_json_data_type == "String":
296+
dd_json_data = dd_payload.get("stringValue") or dd_payload.get(
297+
"Value"
298+
)
299+
else:
300+
logger.debug(
301+
"Datadog Lambda Python only supports extracting trace"
302+
"context from String or Binary SQS/SNS message attributes"
303+
)
281304

282-
dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
283305
if dd_json_data:
284-
dd_json_data = base64.b64decode(dd_json_data)
285-
elif dd_json_data_type == "String":
286-
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
306+
dd_data = json.loads(dd_json_data)
307+
308+
if is_step_function_event(dd_data):
309+
try:
310+
return extract_context_from_step_functions(dd_data, None)
311+
except Exception:
312+
logger.debug(
313+
"Failed to extract Step Functions context from SQS/SNS event."
314+
)
315+
if is_first_record:
316+
context = propagator.extract(dd_data)
317+
dsm_data = dd_data
287318
else:
288-
logger.debug(
289-
"Datadog Lambda Python only supports extracting trace"
290-
"context from String or Binary SQS/SNS message attributes"
291-
)
319+
# Handle case where trace context is injected into attributes.AWSTraceHeader
320+
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
321+
attrs = event.get("Records")[0].get("attributes")
322+
if attrs:
323+
x_ray_header = attrs.get("AWSTraceHeader")
324+
if x_ray_header:
325+
x_ray_context = parse_xray_header(x_ray_header)
326+
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
327+
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
328+
DD_TRACE_JAVA_TRACE_ID_PADDING
329+
):
330+
# If it starts with eight 0's padding,
331+
# then this AWSTraceHeader contains Datadog injected trace context
332+
logger.debug(
333+
"Found dd-trace injected trace context from AWSTraceHeader"
334+
)
335+
if is_first_record:
336+
context = Context(
337+
trace_id=int(trace_id_parts[2][8:], 16),
338+
span_id=int(x_ray_context["parent_id"], 16),
339+
sampling_priority=float(x_ray_context["sampled"]),
340+
)
341+
except Exception as e:
342+
logger.debug("The trace extractor returned with error %s", e)
292343

293-
if dd_json_data:
294-
dd_data = json.loads(dd_json_data)
344+
# Set DSM checkpoint once per record
345+
_dsm_set_checkpoint(dsm_data, event_type, source_arn)
346+
is_first_record = False
295347

296-
if is_step_function_event(dd_data):
297-
try:
298-
return extract_context_from_step_functions(dd_data, None)
299-
except Exception:
300-
logger.debug(
301-
"Failed to extract Step Functions context from SQS/SNS event."
302-
)
303-
context = propagator.extract(dd_data)
304-
_dsm_set_checkpoint(dd_data, event_type, source_arn)
305-
return context
306-
else:
307-
# Handle case where trace context is injected into attributes.AWSTraceHeader
308-
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
309-
attrs = event.get("Records")[0].get("attributes")
310-
if attrs:
311-
x_ray_header = attrs.get("AWSTraceHeader")
312-
if x_ray_header:
313-
x_ray_context = parse_xray_header(x_ray_header)
314-
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
315-
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
316-
DD_TRACE_JAVA_TRACE_ID_PADDING
317-
):
318-
# If it starts with eight 0's padding,
319-
# then this AWSTraceHeader contains Datadog injected trace context
320-
logger.debug(
321-
"Found dd-trace injected trace context from AWSTraceHeader"
322-
)
323-
return Context(
324-
trace_id=int(trace_id_parts[2][8:], 16),
325-
span_id=int(x_ray_context["parent_id"], 16),
326-
sampling_priority=float(x_ray_context["sampled"]),
327-
)
328-
# Still want to set a DSM checkpoint even if DSM context not propagated
329-
_dsm_set_checkpoint(None, event_type, source_arn)
330-
return extract_context_from_lambda_context(lambda_context)
331-
except Exception as e:
332-
logger.debug("The trace extractor returned with error %s", e)
333-
# Still want to set a DSM checkpoint even if DSM context not propagated
334-
_dsm_set_checkpoint(None, event_type, source_arn)
335-
return extract_context_from_lambda_context(lambda_context)
348+
return context if context else extract_context_from_lambda_context(lambda_context)
336349

337350

338351
def _extract_context_from_eventbridge_sqs_event(event):
@@ -393,30 +406,44 @@ def extract_context_from_kinesis_event(event, lambda_context):
393406
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
394407
"""
395408
source_arn = ""
396-
try:
397-
record = get_first_record(event)
398-
source_arn = record.get("eventSourceARN", "")
399-
kinesis = record.get("kinesis")
400-
if not kinesis:
401-
return extract_context_from_lambda_context(lambda_context)
402-
data = kinesis.get("data")
403-
if data:
404-
import base64
405-
406-
b64_bytes = data.encode("ascii")
407-
str_bytes = base64.b64decode(b64_bytes)
408-
data_str = str_bytes.decode("ascii")
409-
data_obj = json.loads(data_str)
410-
dd_ctx = data_obj.get("_datadog")
411-
if dd_ctx:
412-
context = propagator.extract(dd_ctx)
413-
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
414-
return context
415-
except Exception as e:
416-
logger.debug("The trace extractor returned with error %s", e)
417-
# Still want to set a DSM checkpoint even if DSM context not propagated
418-
_dsm_set_checkpoint(None, "kinesis", source_arn)
419-
return extract_context_from_lambda_context(lambda_context)
409+
records = (
410+
[get_first_record(event)]
411+
if not config.data_streams_enabled
412+
else event.get("Records")
413+
)
414+
context = None
415+
is_first_record = True
416+
for record in records:
417+
dsm_data = None
418+
try:
419+
source_arn = record.get("eventSourceARN", "")
420+
kinesis = record.get("kinesis")
421+
if not kinesis:
422+
context = (
423+
extract_context_from_lambda_context(lambda_context)
424+
if is_first_record
425+
else context
426+
)
427+
is_first_record = False
428+
continue
429+
data = kinesis.get("data")
430+
if data:
431+
import base64
432+
433+
b64_bytes = data.encode("ascii")
434+
str_bytes = base64.b64decode(b64_bytes)
435+
data_str = str_bytes.decode("ascii")
436+
data_obj = json.loads(data_str)
437+
dd_ctx = data_obj.get("_datadog")
438+
if dd_ctx:
439+
if is_first_record:
440+
context = propagator.extract(dd_ctx)
441+
dsm_data = dd_ctx
442+
except Exception as e:
443+
logger.debug("The trace extractor returned with error %s", e)
444+
_dsm_set_checkpoint(dsm_data, "kinesis", source_arn)
445+
is_first_record = False
446+
return context if context else extract_context_from_lambda_context(lambda_context)
420447

421448

422449
def _deterministic_sha256_hash(s: str, part: str) -> int:

0 commit comments

Comments
 (0)