Skip to content

Take defensive copy of parent scope stack when closing nested coroutines #8749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public void restoreThreadContext(

@Override
public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineContext) {
final ScopeState oldScopeState = AgentTracer.get().newScopeState();
oldScopeState.fetchFromActive();
final ScopeState oldScopeState = AgentTracer.get().oldScopeState();

final Job coroutine = CoroutineContextHelper.getJob(coroutineContext);
final ScopeStateCoroutineContextItem contextItem = contextItemPerCoroutine.get(coroutine);
Expand All @@ -53,22 +52,20 @@ public ScopeState updateThreadContext(@NotNull final CoroutineContext coroutineC

/** If there's a context item for the coroutine then try to close it */
public void maybeCloseScopeAndCancelContinuation(final Job coroutine, final Job parent) {
final ScopeStateCoroutineContextItem contextItem = contextItemPerCoroutine.get(coroutine);
final ScopeStateCoroutineContextItem contextItem = contextItemPerCoroutine.remove(coroutine);
if (contextItem != null) {
ScopeState currentThreadScopeState = null;
if (parent != null) {
final ScopeStateCoroutineContextItem parentItem = contextItemPerCoroutine.get(parent);
if (parentItem != null) {
currentThreadScopeState = parentItem.getScopeState();
currentThreadScopeState = parentItem.maybeCopyScopeState();
}
}
if (currentThreadScopeState == null) {
currentThreadScopeState = AgentTracer.get().newScopeState();
currentThreadScopeState.fetchFromActive();
currentThreadScopeState = AgentTracer.get().oldScopeState();
}

contextItem.maybeCloseScopeAndCancelContinuation();
contextItemPerCoroutine.remove(coroutine);

currentThreadScopeState.activate();
}
Expand Down Expand Up @@ -111,16 +108,23 @@ public static class ScopeStateCoroutineContextItem {
@Nullable private AgentScope.Continuation continuation;
@Nullable private AgentScope continuationScope;
private boolean isInitialized = false;
private volatile Thread activatedOn;

public ScopeStateCoroutineContextItem() {
coroutineScopeState = AgentTracer.get().newScopeState();
}

public ScopeState getScopeState() {
return coroutineScopeState;
public ScopeState maybeCopyScopeState() {
// take defensive copy of scope stack if on different thread
if (activatedOn != null && activatedOn != Thread.currentThread()) {
return coroutineScopeState.copy();
} else {
return coroutineScopeState;
}
}

public void activate() {
activatedOn = Thread.currentThread();
coroutineScopeState.activate();

if (continuation != null && continuationScope == null) {
Expand All @@ -144,6 +148,7 @@ public void maybeInitialize() {
* scope and cancels the continuation.
*/
public void maybeCloseScopeAndCancelContinuation() {
// only temporary activation, will be replaced by another activate in caller
coroutineScopeState.activate();

if (continuationScope != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public void onSuspend() {
}

public void onResume() {
this.oldState = AgentTracer.get().newScopeState();
this.oldState.fetchFromActive();
this.oldState = AgentTracer.get().oldScopeState();

this.state.activate();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,11 @@ public EndpointTracker onRootSpanStarted(AgentSpan root) {
return null;
}

@Override
public ScopeState oldScopeState() {
return scopeManager.oldScopeState();
}

@Override
public ScopeState newScopeState() {
return scopeManager.newScopeState();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,14 @@ ScopeStack scopeStack() {
return this.tlsScopeStack.get();
}

@Override
public ScopeState oldScopeState() {
return new ContinuableScopeState(tlsScopeStack.get());
}

@Override
public ScopeState newScopeState() {
return new ContinuableScopeState();
return new ContinuableScopeState(tlsScopeStack.initialValue());
}

@Override
Expand All @@ -371,17 +376,20 @@ public Context swap(Context context) {
}

private class ContinuableScopeState implements ScopeState {
private final ScopeStack localScopeStack;

private ScopeStack localScopeStack = tlsScopeStack.initialValue();
ContinuableScopeState(ScopeStack scopeStack) {
this.localScopeStack = scopeStack;
}

@Override
public void activate() {
tlsScopeStack.set(localScopeStack);
}

@Override
public void fetchFromActive() {
localScopeStack = tlsScopeStack.get();
public ScopeState copy() {
return new ContinuableScopeState(localScopeStack.copy());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ final class ScopeStack {
this.profilingContextIntegration = profilingContextIntegration;
}

ScopeStack copy() {
ScopeStack copy = new ScopeStack(profilingContextIntegration);
copy.stack.addAll(stack);
copy.top = top;
copy.overdueRootScope = overdueRootScope;
return copy;
}

ContinuableScope active() {
// avoid attaching further spans to the root scope when it's been marked as overdue
return top != overdueRootScope ? top : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class ScopeManagerTest extends DDCoreSpecification {

def "scope state should be able to fetch and activate state when there is no active span"() {
when:
def initialScopeState = scopeManager.newScopeState()
initialScopeState.fetchFromActive()
def initialScopeState = scopeManager.oldScopeState()

then:
scopeManager.active() == null
Expand Down Expand Up @@ -125,8 +124,7 @@ class ScopeManagerTest extends DDCoreSpecification {
when:
def span = tracer.buildSpan("test", "test").start()
def scope = tracer.activateSpan(span)
def initialScopeState = scopeManager.newScopeState()
initialScopeState.fetchFromActive()
def initialScopeState = scopeManager.oldScopeState()

then:
scope.span() == span
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,11 @@ public AgentSpanContext notifyExtensionStart(Object event) {
@Override
public void notifyExtensionEnd(AgentSpan span, Object result, boolean isError) {}

@Override
public ScopeState oldScopeState() {
return null;
}

@Override
public ScopeState newScopeState() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@
public interface ScopeState {
void activate();

void fetchFromActive();
ScopeState copy();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package datadog.trace.bootstrap.instrumentation.api;

public interface ScopeStateAware {
/** @return The old connected scope stack */
ScopeState oldScopeState();

/** @return A new disconnected scope stack */
ScopeState newScopeState();
}