@@ -365,7 +365,7 @@ public boolean supportsDebuggerDiagnostics() {
return debuggerDiagnosticsEndpoint != null;
}

boolean supportsDropping() {
public boolean supportsDropping() {
return supportsDropping;
}

@@ -23,6 +23,7 @@
import datadog.trace.common.writer.ddagent.DDAgentApi;
import datadog.trace.core.CoreSpan;
import datadog.trace.core.DDTraceCoreInfo;
import datadog.trace.core.monitor.HealthMetrics;
import datadog.trace.util.AgentTaskScheduler;
import java.util.Collections;
import java.util.List;
@@ -61,15 +62,19 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final long reportingInterval;
private final TimeUnit reportingIntervalTimeUnit;
private final DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;

private volatile AgentTaskScheduler.Scheduled<?> cancellation;

public ConflatingMetricsAggregator(
Config config, SharedCommunicationObjects sharedCommunicationObjects) {
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
this(
config.getWellKnownTags(),
config.getMetricsIgnoredResources(),
sharedCommunicationObjects.featuresDiscovery(config),
healthMetrics,
new OkHttpSink(
sharedCommunicationObjects.okHttpClient,
sharedCommunicationObjects.agentUrl.toString(),
@@ -85,16 +90,27 @@ public ConflatingMetricsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
Sink sink,
int maxAggregates,
int queueSize) {
this(wellKnownTags, ignoredResources, features, sink, maxAggregates, queueSize, 10, SECONDS);
this(
wellKnownTags,
ignoredResources,
features,
healthMetric,
sink,
maxAggregates,
queueSize,
10,
SECONDS);
}

ConflatingMetricsAggregator(
WellKnownTags wellKnownTags,
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
Sink sink,
int maxAggregates,
int queueSize,
@@ -103,6 +119,7 @@ public ConflatingMetricsAggregator(
this(
ignoredResources,
features,
healthMetric,
sink,
new SerializingMetricWriter(wellKnownTags, sink),
maxAggregates,
@@ -114,6 +131,7 @@ ConflatingMetricsAggregator(
ConflatingMetricsAggregator(
Set<String> ignoredResources,
DDAgentFeaturesDiscovery features,
HealthMetrics healthMetric,
Sink sink,
MetricWriter metricWriter,
int maxAggregates,
@@ -126,6 +144,7 @@ public ConflatingMetricsAggregator(
this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3);
this.keys = new NonBlockingHashMap<>();
this.features = features;
this.healthMetrics = healthMetric;
this.sink = sink;
this.aggregator =
new Aggregator(
@@ -215,17 +234,22 @@ public Future<Boolean> forceReport() {
@Override
public boolean publish(List<? extends CoreSpan<?>> trace) {
boolean forceKeep = false;
int counted = 0;
if (features.supportsMetrics()) {
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
if (shouldComputeMetric(span)) {
if (ignoredResources.contains(span.getResourceName().toString())) {
// skip publishing all children
return false;
forceKeep = false;
break;
}
counted++;
forceKeep |= publish(span, isTopLevel);
}
}
healthMetrics.onClientStatTraceComputed(
counted, trace.size(), features.supportsDropping() && !forceKeep);
}
return forceKeep;
}
@@ -314,18 +338,23 @@ public void close() {

@Override
public void onEvent(EventType eventType, String message) {
healthMetrics.onClientStatPayloadSent();
switch (eventType) {
case DOWNGRADED:
log.debug("Agent downgrade was detected");
disable();
healthMetrics.onClientStatDowngraded();
break;
case BAD_PAYLOAD:
log.debug("bad metrics payload sent to trace agent: {}", message);
healthMetrics.onClientStatErrorReceived();
break;
case ERROR:
log.debug("trace agent errored receiving metrics payload: {}", message);
healthMetrics.onClientStatErrorReceived();
break;
default:
break;
}
}

@@ -2,17 +2,20 @@

import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.trace.api.Config;
import datadog.trace.core.monitor.HealthMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetricsAggregatorFactory {
private static final Logger log = LoggerFactory.getLogger(MetricsAggregatorFactory.class);

public static MetricsAggregator createMetricsAggregator(
Config config, SharedCommunicationObjects sharedCommunicationObjects) {
Config config,
SharedCommunicationObjects sharedCommunicationObjects,
HealthMetrics healthMetrics) {
if (config.isTracerMetricsEnabled()) {
log.debug("tracer metrics enabled");
return new ConflatingMetricsAggregator(config, sharedCommunicationObjects);
return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics);
}
log.debug("tracer metrics disabled");
return NoOpMetricsAggregator.INSTANCE;
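
For context, a minimal sketch of how the updated factory signature would be called. This is illustrative only: Config.get() and the no-arg SharedCommunicationObjects constructor are assumptions, while HealthMetrics.NO_OP is the constant used elsewhere in this change.

Config config = Config.get(); // assumed accessor
SharedCommunicationObjects sco = new SharedCommunicationObjects(); // assumed no-arg constructor
MetricsAggregator aggregator =
    MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP);
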
dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java (34 changes: 21 additions, 13 deletions)
@@ -316,6 +316,7 @@ public static class CoreTracerBuilder {
private Map<String, String> baggageMapping;
private int partialFlushMinSpans;
private StatsDClient statsDClient;
private HealthMetrics healthMetrics;
private TagInterceptor tagInterceptor;
private boolean strictTraceWrites;
private InstrumentationGateway instrumentationGateway;
@@ -419,8 +420,8 @@ public CoreTracerBuilder tagInterceptor(TagInterceptor tagInterceptor) {
return this;
}

public CoreTracerBuilder statsDClient(TagInterceptor tagInterceptor) {
this.tagInterceptor = tagInterceptor;
public CoreTracerBuilder healthMetrics(HealthMetrics healthMetrics) {
this.healthMetrics = healthMetrics;
return this;
}

@@ -522,6 +523,7 @@ public CoreTracer build() {
baggageMapping,
partialFlushMinSpans,
statsDClient,
healthMetrics,
tagInterceptor,
strictTraceWrites,
instrumentationGateway,
@@ -553,6 +555,7 @@ private CoreTracer(
final Map<String, String> baggageMapping,
final int partialFlushMinSpans,
final StatsDClient statsDClient,
final HealthMetrics healthMetrics,
final TagInterceptor tagInterceptor,
final boolean strictTraceWrites,
final InstrumentationGateway instrumentationGateway,
@@ -580,6 +583,7 @@ private CoreTracer(
baggageMapping,
partialFlushMinSpans,
statsDClient,
healthMetrics,
tagInterceptor,
strictTraceWrites,
instrumentationGateway,
@@ -610,6 +614,7 @@ private CoreTracer(
final Map<String, String> baggageMapping,
final int partialFlushMinSpans,
final StatsDClient statsDClient,
final HealthMetrics healthMetrics,
final TagInterceptor tagInterceptor,
final boolean strictTraceWrites,
final InstrumentationGateway instrumentationGateway,
@@ -698,11 +703,13 @@ private CoreTracer(
config.isHealthMetricsEnabled()
? new MonitoringImpl(this.statsDClient, 10, SECONDS)
: Monitoring.DISABLED;
healthMetrics =
config.isHealthMetricsEnabled()
? new TracerHealthMetrics(this.statsDClient)
: HealthMetrics.NO_OP;
healthMetrics.start();
this.healthMetrics =
healthMetrics != null
? healthMetrics
: (config.isHealthMetricsEnabled()
? new TracerHealthMetrics(this.statsDClient)
: HealthMetrics.NO_OP);
this.healthMetrics.start();
performanceMonitoring =
config.isPerfMetricsEnabled()
? new MonitoringImpl(this.statsDClient, 10, SECONDS)
@@ -715,7 +722,7 @@ private CoreTracer(
config.getScopeDepthLimit(),
config.isScopeStrictMode(),
profilingContextIntegration,
healthMetrics);
this.healthMetrics);

externalAgentLauncher = new ExternalAgentLauncher(config);

@@ -740,7 +747,7 @@ private CoreTracer(
if (writer == null) {
this.writer =
WriterFactory.createWriter(
config, sharedCommunicationObjects, sampler, singleSpanSampler, healthMetrics);
config, sharedCommunicationObjects, sampler, singleSpanSampler, this.healthMetrics);
} else {
this.writer = writer;
}
@@ -757,22 +764,23 @@ private CoreTracer(
&& (config.isCiVisibilityAgentlessEnabled() || featuresDiscovery.supportsEvpProxy())) {
pendingTraceBuffer = PendingTraceBuffer.discarding();
traceCollectorFactory =
new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics);
new StreamingTraceCollector.Factory(this, this.timeSource, this.healthMetrics);
} else {
pendingTraceBuffer =
strictTraceWrites
? PendingTraceBuffer.discarding()
: PendingTraceBuffer.delaying(
this.timeSource, config, sharedCommunicationObjects, healthMetrics);
this.timeSource, config, sharedCommunicationObjects, this.healthMetrics);
traceCollectorFactory =
new PendingTrace.Factory(
this, pendingTraceBuffer, this.timeSource, strictTraceWrites, healthMetrics);
this, pendingTraceBuffer, this.timeSource, strictTraceWrites, this.healthMetrics);
}
pendingTraceBuffer.start();

sharedCommunicationObjects.whenReady(this.writer::start);

metricsAggregator = createMetricsAggregator(config, sharedCommunicationObjects);
metricsAggregator =
createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics);
// Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds
// (using millisecond granularity). This avoids a fleet of traced applications starting at the
// same time from sending metrics in sync.
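
The jitter described in the comment above could look roughly like the sketch below. It is illustrative only: it uses a plain ScheduledExecutorService rather than the tracer's AgentTaskScheduler, and report() stands in for whatever flush method the aggregator actually exposes.

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
long initialDelayMs = 1_000 + ThreadLocalRandom.current().nextLong(9_000); // 1-10 s, millisecond granularity
scheduler.scheduleAtFixedRate(
    () -> metricsAggregator.report(), initialDelayMs, 10_000, TimeUnit.MILLISECONDS);
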
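
Taken together, the CoreTracer changes let a caller inject its own HealthMetrics through the builder (fixing the setter that was previously declared as statsDClient(TagInterceptor)). A hedged usage sketch, assuming CoreTracer.builder() is the usual entry point and reusing the TracerHealthMetrics constructor shown in this diff:

HealthMetrics custom = new TracerHealthMetrics(statsDClient);
CoreTracer tracer = CoreTracer.builder().healthMetrics(custom).build();
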
@@ -73,6 +73,24 @@ public void onFailedSend(

public void onLongRunningUpdate(final int dropped, final int write, final int expired) {}

/**
* Report that a trace has been used to compute client stats.
*
* @param countedSpan the number of spans used for the stat computation
* @param totalSpan the total number of spans in the trace
* @param dropped true if the trace can be dropped. Note: the PayloadDispatcher also counts this;
* however, this counter reports how many p0 drops could have been achieved before the span
* got sampled.
*/
public void onClientStatTraceComputed(
final int countedSpan, final int totalSpan, boolean dropped) {}

public void onClientStatPayloadSent() {}

public void onClientStatErrorReceived() {}

public void onClientStatDowngraded() {}

/** @return Human-readable summary of the current health metrics. */
public String summary() {
return "";
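
A minimal sketch of what a concrete implementation of the new client-stats hooks could look like. The class and counter names are illustrative, not the actual TracerHealthMetrics implementation, and it assumes HealthMetrics can be subclassed through a no-arg constructor, as the NO_OP instance suggests.

import java.util.concurrent.atomic.AtomicLong;

public class CountingHealthMetrics extends HealthMetrics {
  private final AtomicLong statTraces = new AtomicLong();
  private final AtomicLong droppableTraces = new AtomicLong();
  private final AtomicLong statPayloads = new AtomicLong();
  private final AtomicLong statErrors = new AtomicLong();
  private final AtomicLong downgrades = new AtomicLong();

  @Override
  public void onClientStatTraceComputed(final int countedSpan, final int totalSpan, boolean dropped) {
    statTraces.incrementAndGet();
    if (dropped) {
      // the trace could have been dropped client-side before sampling kept it
      droppableTraces.incrementAndGet();
    }
  }

  @Override
  public void onClientStatPayloadSent() {
    statPayloads.incrementAndGet();
  }

  @Override
  public void onClientStatErrorReceived() {
    statErrors.incrementAndGet();
  }

  @Override
  public void onClientStatDowngraded() {
    downgrades.incrementAndGet();
  }

  @Override
  public String summary() {
    return "statTraces=" + statTraces
        + " droppable=" + droppableTraces
        + " payloads=" + statPayloads
        + " errors=" + statErrors
        + " downgrades=" + downgrades;
  }
}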