Skip to content

Commit adf13d0

Browse files
Introduced basic trace post-processing
1 parent 39767bb commit adf13d0

File tree

7 files changed

+71
-0
lines changed

7 files changed

+71
-0
lines changed

dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@
77

88
import datadog.communication.ddagent.DroppingPolicy;
99
import datadog.trace.api.Config;
10+
import datadog.trace.api.gateway.CallbackProvider;
11+
import datadog.trace.api.gateway.Events;
12+
import datadog.trace.api.gateway.Flow;
13+
import datadog.trace.api.gateway.RequestContext;
14+
import datadog.trace.api.gateway.RequestContextSlot;
15+
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
1016
import datadog.trace.common.sampling.SingleSpanSampler;
1117
import datadog.trace.common.writer.ddagent.FlushEvent;
1218
import datadog.trace.common.writer.ddagent.Prioritization;
@@ -17,6 +23,7 @@
1723
import java.util.List;
1824
import java.util.concurrent.CountDownLatch;
1925
import java.util.concurrent.TimeUnit;
26+
import java.util.function.Function;
2027
import org.jctools.queues.MessagePassingQueue;
2128
import org.jctools.queues.MpscBlockingConsumerArrayQueue;
2229
import org.slf4j.Logger;
@@ -235,6 +242,18 @@ public void onEvent(Object event) {
235242
try {
236243
if (event instanceof List) {
237244
List<DDSpan> trace = (List<DDSpan>) event;
245+
246+
// Do post-processing if required and priority > 0
247+
DDSpan rootSpan = trace.get(0);
248+
if (rootSpan != null
249+
&& rootSpan.isRequiresPostProcessing()
250+
&& rootSpan.samplingPriority() > 0) {
251+
CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC);
252+
Function<RequestContext, Flow<Void>> reqPostProcCb =
253+
cbp.getCallback(Events.EVENTS.requestPostProcessing());
254+
reqPostProcCb.apply(rootSpan.getRequestContext());
255+
}
256+
238257
// TODO populate `_sample_rate` metric in a way that accounts for lost/dropped traces
239258
payloadDispatcher.addTrace(trace);
240259
} else if (event instanceof FlushEvent) {

dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ static DDSpan create(
103103

104104
// the request is to be blocked (AppSec)
105105
private volatile Flow.Action.RequestBlockingAction requestBlockingAction;
106+
private boolean requiresPostProcessing;
106107

107108
/**
108109
* Version of a span that can be set by the long running spans feature:
@@ -399,6 +400,16 @@ public Flow.Action.RequestBlockingAction getRequestBlockingAction() {
399400
return requestBlockingAction;
400401
}
401402

403+
@Override
404+
public void setRequiresPostProcessing(boolean postProcessing) {
405+
this.requiresPostProcessing = postProcessing;
406+
}
407+
408+
@Override
409+
public boolean isRequiresPostProcessing() {
410+
return this.requiresPostProcessing;
411+
}
412+
402413
@Override
403414
public DDSpan setTag(final String tag, final int value) {
404415
// can't use tag interceptor because it might set a metric

internal-api/src/main/java/datadog/trace/api/gateway/Events.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,17 @@ public EventType<BiFunction<RequestContext, Object, Flow<Void>>> grpcServerReque
189189
return (EventType<BiFunction<RequestContext, Object, Flow<Void>>>) GRPC_SERVER_REQUEST_MESSAGE;
190190
}
191191

192+
static final int REQUEST_POST_PROCESSING_ID = 15;
193+
194+
@SuppressWarnings("rawtypes")
195+
private static final EventType REQUEST_POST_PROCESSING =
196+
new ET<>("request.post.processing", REQUEST_POST_PROCESSING_ID);
197+
/** The request has been processed and is ready to be sent to the backend */
198+
@SuppressWarnings("unchecked")
199+
public EventType<Function<RequestContext, Flow<Void>>> requestPostProcessing() {
200+
return (EventType<Function<RequestContext, Flow<Void>>>) REQUEST_POST_PROCESSING;
201+
}
202+
192203
static final int MAX_EVENTS = nextId.get();
193204

194205
private static final class ET<T> extends EventType<T> {

internal-api/src/main/java/datadog/trace/api/gateway/IGSpanInfo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,8 @@ public interface IGSpanInfo {
1616
void setRequestBlockingAction(Flow.Action.RequestBlockingAction rba);
1717

1818
Flow.Action.RequestBlockingAction getRequestBlockingAction();
19+
20+
void setRequiresPostProcessing(boolean postProcessing);
21+
22+
boolean isRequiresPostProcessing();
1923
}

internal-api/src/main/java/datadog/trace/api/gateway/InstrumentationGateway.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import static datadog.trace.api.gateway.Events.REQUEST_INFERRED_CLIENT_ADDRESS_ID;
1313
import static datadog.trace.api.gateway.Events.REQUEST_METHOD_URI_RAW_ID;
1414
import static datadog.trace.api.gateway.Events.REQUEST_PATH_PARAMS_ID;
15+
import static datadog.trace.api.gateway.Events.REQUEST_POST_PROCESSING_ID;
1516
import static datadog.trace.api.gateway.Events.REQUEST_STARTED_ID;
1617
import static datadog.trace.api.gateway.Events.RESPONSE_HEADER_DONE_ID;
1718
import static datadog.trace.api.gateway.Events.RESPONSE_HEADER_ID;
@@ -358,6 +359,19 @@ public Flow<Void> apply(RequestContext ctx, Integer status) {
358359
}
359360
}
360361
};
362+
case REQUEST_POST_PROCESSING_ID:
363+
return (C)
364+
new Function<RequestContext, Flow<Void>>() {
365+
@Override
366+
public Flow<Void> apply(RequestContext ctx) {
367+
try {
368+
return ((Function<RequestContext, Flow<Void>>) callback).apply(ctx);
369+
} catch (Throwable t) {
370+
log.warn("Callback for {} threw.", eventType, t);
371+
return Flow.ResultFlow.empty();
372+
}
373+
}
374+
};
361375
default:
362376
log.warn("Unwrapped callback for {}", eventType);
363377
return callback;

internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentTracer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,14 @@ public Flow.Action.RequestBlockingAction getRequestBlockingAction() {
578578
return null;
579579
}
580580

581+
@Override
582+
public void setRequiresPostProcessing(boolean postProcessing) {}
583+
584+
@Override
585+
public boolean isRequiresPostProcessing() {
586+
return false;
587+
}
588+
581589
@Override
582590
public AgentSpan setTag(final String tag, final Number value) {
583591
return this;

internal-api/src/test/java/datadog/trace/api/gateway/InstrumentationGatewayTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ public void testNormalCalls() {
194194
cbp.getCallback(events.responseHeader()).accept(null, null, null);
195195
ss.registerCallback(events.responseHeaderDone(), callback.function);
196196
cbp.getCallback(events.responseHeaderDone()).apply(null);
197+
ss.registerCallback(events.requestPostProcessing(), callback.function);
198+
cbp.getCallback(events.requestPostProcessing()).apply(null);
197199
assertThat(callback.count).isEqualTo(Events.MAX_EVENTS);
198200
}
199201

@@ -240,6 +242,8 @@ public void testThrowableBlocking() {
240242
cbp.getCallback(events.responseHeader()).accept(null, null, null);
241243
ss.registerCallback(events.responseHeaderDone(), throwback.function);
242244
cbp.getCallback(events.responseHeaderDone()).apply(null);
245+
ss.registerCallback(events.requestPostProcessing(), throwback.function);
246+
cbp.getCallback(events.requestPostProcessing()).apply(null);
243247
assertThat(throwback.count).isEqualTo(Events.MAX_EVENTS);
244248
}
245249

0 commit comments

Comments
 (0)