fix(dsm): set dsm checkpoint for all records in event #643


Draft · wants to merge 11 commits into main

Conversation

@michael-zhao459 (Collaborator) commented Jul 30, 2025

What does this PR do?

This PR sets a DSM checkpoint for every record in an event. It does this by extracting a helper that gets the Datadog context per record; if DSM is enabled, it continues looping from the second record onward, setting DSM checkpoints (the first record is always used for APM).
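A minimal sketch of that shape (names like `extract_dd_context`, `process_event`, and the `set_dsm_checkpoint` callback are illustrative stand-ins, not the actual helpers in this PR):

```python
def extract_dd_context(record):
    # Illustrative helper: pull the Datadog context out of a single record.
    # The real helper parses message attributes / the record body.
    return record.get("dd_context")

def process_event(records, dsm_enabled, set_dsm_checkpoint):
    # The first record always drives APM; with DSM enabled, every record
    # additionally gets its own checkpoint.
    apm_context = extract_dd_context(records[0]) if records else None
    if dsm_enabled:
        for record in records:
            set_dsm_checkpoint(extract_dd_context(record))
    return apm_context
```

With DSM off this degenerates to the old behavior: only the first record's context is extracted.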

Motivation

[Screenshot taken 2025-07-30 at 2:19 PM: incoming vs. outgoing throughput]

Please note the discrepancy between the msg/s of incoming produce and the msg/s outgoing from the downstream queue. During batch processing, if the consuming service does not set a checkpoint for each message coming from upstream, we lose track of those messages, causing the noticeable drop in throughput.

Testing Guidelines

All of the tests in the table from #622 are maintained. Ensured no regressions by verifying that all of test_tracing.py continues to pass. Tested on a sandbox AWS account for all queue types to confirm context propagation and matching throughput. Added tests showing that DSM can now handle multiple records in an event.

Additional Notes

Types of Changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)

@michael-zhao459 changed the title from "set dsm checkpoint for all records in array" to "fix(dsm): set dsm checkpoint for all records in array" on Jul 30, 2025
@piochelepiotr left a comment

The big question, I guess, is do we:

  1. Couple APM & DSM a bit more, but avoid deserializing the Datadog context twice for the first record, or
  2. Couple APM & DSM less, at the cost of deserializing the Datadog context twice for the first record.

not event_source.equals(EventTypes.KINESIS)
and not event_source.equals(EventTypes.SNS)
and not event_source.equals(EventTypes.SQS)
):


do we expect this to happen? If not, let's add a debug log here.

Contributor

I agree the double deserialization introduces some performance cost. But we only need one extra deserialization for tracing purposes, and one json.loads of ~50 bytes costs on the order of nanoseconds, so I feel comfortable not worrying about it at all. I'd say decoupling and better code structure are worth more. Also, for lambdas, cold-start costs are more critical than this. With that being said, ...

  1. If in the future we need to deserialize all of the trace contexts (for span links, for example), that would bring the cost to the ms level per invocation, and we might want to refactor the code to do it only once then.
  2. Might be too late to mention, but I'm starting to think datadog-lambda-js may be a better starting place for a refactor like this, because there we already use a bunch of extractors, which could make it easier.
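To put a rough number on the json.loads cost discussed above, a quick timeit sketch (the payload string is a made-up ~50-byte example, and the exact figure depends on the runtime and hardware):

```python
import json
import timeit

# A ~50-byte JSON payload, similar in size to a propagated Datadog context.
payload = '{"x-datadog-trace-id":"1234567890","k":"v"}'

# Average one parse over many iterations to smooth out timer noise.
n = 100_000
seconds_per_parse = timeit.timeit(lambda: json.loads(payload), number=n) / n
print(f"~{seconds_per_parse * 1e9:.0f} ns per json.loads")
```

On typical hardware this lands in the sub-microsecond range, which supports treating one extra deserialization per invocation as negligible next to cold-start cost.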

@michael-zhao459 michael-zhao459 force-pushed the michael.zhao/dsm-ckpt-all-records branch from 69bbcd8 to c0906a0 Compare August 4, 2025 20:05
@michael-zhao459 changed the title from "fix(dsm): set dsm checkpoint for all records in array" to "fix(dsm): set dsm checkpoint for all records in event" on Aug 4, 2025
)
# Handle case where trace context is injected into attributes.AWSTraceHeader
# example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")


Here it's accessing records[0] again. I would put this whole section in an elif idx == 0 (line 315).


Suggested change:
- attrs = event.get("Records")[0].get("attributes")
+ attrs = record.get("attributes")

)
if idx == 0:
    context = propagator.extract(dd_data)
    dsm_data = dd_data
else:


in this else, dsm_data is not set. Is that an issue?

@michael-zhao459 (Collaborator, Author) replied on Aug 5, 2025:

Not an issue. I checked dd-trace-py, and DSM never injects context into attributes.AWSTraceHeader, so we can just set a checkpoint with None.

)
context = None
for idx, record in enumerate(records):
    dsm_data = None


This is not specific to DSM; it's dd_ctx.

if idx == 0
else context
)
_dsm_set_checkpoint(None, "kinesis", source_arn)


We are setting a "kinesis" checkpoint in the not-Kinesis branch. I think the name kinesis is wrong here; this code is a bit confusing.

@michael-zhao459 (Collaborator, Author) replied on Aug 5, 2025:

Hmm. This is deep enough inside the extract function that, from the parsing beforehand, we already believe the event source is Kinesis. However, all AWS documentation says a Kinesis Lambda event should have this field. To my understanding, this check is for Lambda synchronous invocations with records that match the Kinesis shape but don't actually come from a Kinesis stream. @DataDog/apm-serverless, can you help confirm why this check is here in the first place?

for idx, record in enumerate(records):
    try:
        source_arn = record.get("eventSourceARN", "")
        dsm_data = None


Same as below: dsm_data is not specific to Data Streams here. I would name it the same as below, dd_ctx, or something like that.

if dd_json_data_type == "Binary":
import base64
context = None
records = (


Maybe let's have records always be event.get("Records", []); however, in the loop, we can break early if Data Streams is disabled.
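That suggestion could look roughly like this (a hedged sketch; `dsm_enabled` and the `handle_record` callback stand in for the real config flag and per-record logic):

```python
def process_records(event, dsm_enabled, handle_record):
    # Always read Records with a default so missing/empty events are handled
    # uniformly, then break early when DSM is off: APM only needs record 0.
    results = []
    for idx, record in enumerate(event.get("Records", [])):
        results.append(handle_record(idx, record))
        if not dsm_enabled:
            break
    return results
```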


if index == 0:
    do apm stuff
    if data streams is disabled:
        break

@michael-zhao459 michael-zhao459 force-pushed the michael.zhao/dsm-ckpt-all-records branch from f38673b to 60ca41b Compare August 6, 2025 15:06
@michael-zhao459 michael-zhao459 force-pushed the michael.zhao/dsm-ckpt-all-records branch from 60ca41b to 2f8dfaa Compare August 6, 2025 15:11
@DataDog DataDog deleted a comment from piochelepiotr Aug 6, 2025
)
# Handle case where trace context is injected into attributes.AWSTraceHeader
# example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")


Suggested change:
- attrs = event.get("Records")[0].get("attributes")
+ attrs = record.get("attributes")

"Failed to extract Step Functions context from SQS/SNS event."
)
context = propagator.extract(dd_data)
if not config.data_streams_enabled:


This break is too hidden. Is it inside the if dd_payload block?

I suggest this high level approach:

apm_context = None
for record in records:
    context = extract_context(record)
    if apm_context is None:
        apm_context = context
    if data_streams_enabled:
        set_checkpoint()
    else:
        # APM only looks at the first record.
        break

You can break the code down into helper functions to make that structure very clear. Basically, can you avoid some of the nested conditions, like if not config.data_streams_enabled here?
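The high-level structure suggested above, as a runnable sketch (the `extract_context` and `set_checkpoint` callbacks are placeholders for the real helpers, not this PR's actual functions):

```python
def extract_event_context(records, data_streams_enabled, extract_context, set_checkpoint):
    # APM keeps only the first record's context; DSM checkpoints every record.
    apm_context = None
    for record in records:
        context = extract_context(record)
        if apm_context is None:
            apm_context = context
        if data_streams_enabled:
            set_checkpoint(context)
        else:
            # APM only looks at the first record.
            break
    return apm_context
```

This keeps the per-record work flat: one extraction call, one optional checkpoint, one early exit, with no nesting inside the loop body.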

# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")
if attrs:
    x_ray_header = attrs.get("AWSTraceHeader")


Maybe put this logic in the extract_context I suggested above. extract_context could take an argument: extract_from_xray?

(extract_context is probably not a great name; I'll let you find a better one.)
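One way that argument could work (purely illustrative; the function and the `dd_context` key are hypothetical, while the attribute name AWSTraceHeader and the header format follow the example earlier in the thread):

```python
def extract_record_context(record, extract_from_xray=False):
    # Hypothetical per-record extractor: prefer the Datadog payload, and
    # optionally fall back to the X-Ray trace header in record attributes,
    # e.g. "Root=1-654321ab-...;Parent=0123456789abcdef;Sampled=1".
    dd_data = record.get("dd_context")
    if dd_data is not None:
        return dd_data
    if extract_from_xray:
        attrs = record.get("attributes") or {}
        return attrs.get("AWSTraceHeader")
    return None
```

The flag keeps the X-Ray fallback opt-in, so call sites that never carry AWSTraceHeader skip that branch entirely.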
