diff --git a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java index b527b08e13f..789747216b0 100644 --- a/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java +++ b/communication/src/main/java/datadog/communication/ddagent/DDAgentFeaturesDiscovery.java @@ -365,7 +365,7 @@ public boolean supportsDebuggerDiagnostics() { return debuggerDiagnosticsEndpoint != null; } - boolean supportsDropping() { + public boolean supportsDropping() { return supportsDropping; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 1280c0acf0d..2f2d5118110 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -23,6 +23,7 @@ import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.core.CoreSpan; import datadog.trace.core.DDTraceCoreInfo; +import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; import java.util.Collections; import java.util.List; @@ -61,15 +62,19 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final long reportingInterval; private final TimeUnit reportingIntervalTimeUnit; private final DDAgentFeaturesDiscovery features; + private final HealthMetrics healthMetrics; private volatile AgentTaskScheduler.Scheduled cancellation; public ConflatingMetricsAggregator( - Config config, SharedCommunicationObjects sharedCommunicationObjects) { + Config config, + SharedCommunicationObjects sharedCommunicationObjects, + HealthMetrics healthMetrics) { this( config.getWellKnownTags(), config.getMetricsIgnoredResources(), sharedCommunicationObjects.featuresDiscovery(config), + healthMetrics, new OkHttpSink( sharedCommunicationObjects.okHttpClient, sharedCommunicationObjects.agentUrl.toString(), @@ -85,16 +90,27 @@ public ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, Sink sink, int maxAggregates, int queueSize) { - this(wellKnownTags, ignoredResources, features, sink, maxAggregates, queueSize, 10, SECONDS); + this( + wellKnownTags, + ignoredResources, + features, + healthMetric, + sink, + maxAggregates, + queueSize, + 10, + SECONDS); } ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, Sink sink, int maxAggregates, int queueSize, @@ -103,6 +119,7 @@ public ConflatingMetricsAggregator( this( ignoredResources, features, + healthMetric, sink, new SerializingMetricWriter(wellKnownTags, sink), maxAggregates, @@ -114,6 +131,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( Set ignoredResources, DDAgentFeaturesDiscovery features, + HealthMetrics healthMetric, Sink sink, MetricWriter metricWriter, int maxAggregates, @@ -126,6 +144,7 @@ public ConflatingMetricsAggregator( this.pending = new NonBlockingHashMap<>(maxAggregates * 4 / 3); this.keys = new NonBlockingHashMap<>(); this.features = features; + this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = new Aggregator( @@ -215,17 +234,22 @@ public Future forceReport() { @Override public boolean 
publish(List> trace) { boolean forceKeep = false; + int counted = 0; if (features.supportsMetrics()) { for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); if (shouldComputeMetric(span)) { if (ignoredResources.contains(span.getResourceName().toString())) { // skip publishing all children - return false; + forceKeep = false; + break; } + counted++; forceKeep |= publish(span, isTopLevel); } } + healthMetrics.onClientStatTraceComputed( + counted, trace.size(), features.supportsDropping() && !forceKeep); } return forceKeep; } @@ -314,18 +338,23 @@ public void close() { @Override public void onEvent(EventType eventType, String message) { + healthMetrics.onClientStatPayloadSent(); switch (eventType) { case DOWNGRADED: log.debug("Agent downgrade was detected"); disable(); + healthMetrics.onClientStatDowngraded(); break; case BAD_PAYLOAD: log.debug("bad metrics payload sent to trace agent: {}", message); + healthMetrics.onClientStatErrorReceived(); break; case ERROR: log.debug("trace agent errored receiving metrics payload: {}", message); + healthMetrics.onClientStatErrorReceived(); break; default: + break; } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java index 491cc12b1c0..09464310113 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java @@ -2,6 +2,7 @@ import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; +import datadog.trace.core.monitor.HealthMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -9,10 +10,12 @@ public class MetricsAggregatorFactory { private static final Logger log = LoggerFactory.getLogger(MetricsAggregatorFactory.class); public static MetricsAggregator createMetricsAggregator( - Config config, SharedCommunicationObjects sharedCommunicationObjects) { + Config config, + SharedCommunicationObjects sharedCommunicationObjects, + HealthMetrics healthMetrics) { if (config.isTracerMetricsEnabled()) { log.debug("tracer metrics enabled"); - return new ConflatingMetricsAggregator(config, sharedCommunicationObjects); + return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics); } log.debug("tracer metrics disabled"); return NoOpMetricsAggregator.INSTANCE; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java index 6b5ea0b5f10..ad35455d6c4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java @@ -316,6 +316,7 @@ public static class CoreTracerBuilder { private Map baggageMapping; private int partialFlushMinSpans; private StatsDClient statsDClient; + private HealthMetrics healthMetrics; private TagInterceptor tagInterceptor; private boolean strictTraceWrites; private InstrumentationGateway instrumentationGateway; @@ -419,8 +420,8 @@ public CoreTracerBuilder tagInterceptor(TagInterceptor tagInterceptor) { return this; } - public CoreTracerBuilder statsDClient(TagInterceptor tagInterceptor) { - this.tagInterceptor = tagInterceptor; + public CoreTracerBuilder healthMetrics(HealthMetrics healthMetrics) { + this.healthMetrics = healthMetrics; return this; } @@ -522,6 +523,7 @@ public CoreTracer build() { baggageMapping, 
partialFlushMinSpans, statsDClient, + healthMetrics, tagInterceptor, strictTraceWrites, instrumentationGateway, @@ -553,6 +555,7 @@ private CoreTracer( final Map baggageMapping, final int partialFlushMinSpans, final StatsDClient statsDClient, + final HealthMetrics healthMetrics, final TagInterceptor tagInterceptor, final boolean strictTraceWrites, final InstrumentationGateway instrumentationGateway, @@ -580,6 +583,7 @@ private CoreTracer( baggageMapping, partialFlushMinSpans, statsDClient, + healthMetrics, tagInterceptor, strictTraceWrites, instrumentationGateway, @@ -610,6 +614,7 @@ private CoreTracer( final Map baggageMapping, final int partialFlushMinSpans, final StatsDClient statsDClient, + final HealthMetrics healthMetrics, final TagInterceptor tagInterceptor, final boolean strictTraceWrites, final InstrumentationGateway instrumentationGateway, @@ -698,11 +703,13 @@ private CoreTracer( config.isHealthMetricsEnabled() ? new MonitoringImpl(this.statsDClient, 10, SECONDS) : Monitoring.DISABLED; - healthMetrics = - config.isHealthMetricsEnabled() - ? new TracerHealthMetrics(this.statsDClient) - : HealthMetrics.NO_OP; - healthMetrics.start(); + this.healthMetrics = + healthMetrics != null + ? healthMetrics + : (config.isHealthMetricsEnabled() + ? new TracerHealthMetrics(this.statsDClient) + : HealthMetrics.NO_OP); + this.healthMetrics.start(); performanceMonitoring = config.isPerfMetricsEnabled() ? new MonitoringImpl(this.statsDClient, 10, SECONDS) @@ -715,7 +722,7 @@ private CoreTracer( config.getScopeDepthLimit(), config.isScopeStrictMode(), profilingContextIntegration, - healthMetrics); + this.healthMetrics); externalAgentLauncher = new ExternalAgentLauncher(config); @@ -740,7 +747,7 @@ private CoreTracer( if (writer == null) { this.writer = WriterFactory.createWriter( - config, sharedCommunicationObjects, sampler, singleSpanSampler, healthMetrics); + config, sharedCommunicationObjects, sampler, singleSpanSampler, this.healthMetrics); } else { this.writer = writer; } @@ -757,22 +764,23 @@ private CoreTracer( && (config.isCiVisibilityAgentlessEnabled() || featuresDiscovery.supportsEvpProxy())) { pendingTraceBuffer = PendingTraceBuffer.discarding(); traceCollectorFactory = - new StreamingTraceCollector.Factory(this, this.timeSource, healthMetrics); + new StreamingTraceCollector.Factory(this, this.timeSource, this.healthMetrics); } else { pendingTraceBuffer = strictTraceWrites ? PendingTraceBuffer.discarding() : PendingTraceBuffer.delaying( - this.timeSource, config, sharedCommunicationObjects, healthMetrics); + this.timeSource, config, sharedCommunicationObjects, this.healthMetrics); traceCollectorFactory = new PendingTrace.Factory( - this, pendingTraceBuffer, this.timeSource, strictTraceWrites, healthMetrics); + this, pendingTraceBuffer, this.timeSource, strictTraceWrites, this.healthMetrics); } pendingTraceBuffer.start(); sharedCommunicationObjects.whenReady(this.writer::start); - metricsAggregator = createMetricsAggregator(config, sharedCommunicationObjects); + metricsAggregator = + createMetricsAggregator(config, sharedCommunicationObjects, this.healthMetrics); // Schedule the metrics aggregator to begin reporting after a random delay of 1 to 10 seconds // (using milliseconds granularity.) This avoids a fleet of traced applications starting at the // same time from sending metrics in sync. 
diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index 47ec862b258..ea6ffdebf0b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -73,6 +73,24 @@ public void onFailedSend( public void onLongRunningUpdate(final int dropped, final int write, final int expired) {} + /** + * Report that a trace has been used to compute client stats. + * + * @param countedSpan the number of spans used for the stats computation + * @param totalSpan the total number of spans in the trace + * @param dropped true if the trace can be dropped. Note: the PayloadDispatcher also counts this. + * However, this counter reports how many p0 drops we could achieve before the span gets + * sampled. + */ + public void onClientStatTraceComputed( + final int countedSpan, final int totalSpan, boolean dropped) {} + + public void onClientStatPayloadSent() {} + + public void onClientStatErrorReceived() {} + + public void onClientStatDowngraded() {} + /** @return Human-readable summary of the current health metrics. */ public String summary() { return ""; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 20d07843ab2..127bbf1c3f4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -132,6 +132,21 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final FixedSizeStripedLongCounter longRunningTracesExpired = CountersFactory.createFixedSizeStripedCounter(8); + private final FixedSizeStripedLongCounter clientStatsProcessedSpans = + CountersFactory.createFixedSizeStripedCounter(8); + private final FixedSizeStripedLongCounter clientStatsProcessedTraces = + CountersFactory.createFixedSizeStripedCounter(8); + private final FixedSizeStripedLongCounter clientStatsP0DroppedSpans = + CountersFactory.createFixedSizeStripedCounter(8); + private final FixedSizeStripedLongCounter clientStatsP0DroppedTraces = + CountersFactory.createFixedSizeStripedCounter(8); + private final FixedSizeStripedLongCounter clientStatsRequests = + CountersFactory.createFixedSizeStripedCounter(8); + private final FixedSizeStripedLongCounter clientStatsErrors = + CountersFactory.createFixedSizeStripedCounter(8); + private final FixedSizeStripedLongCounter clientStatsDowngrades = + CountersFactory.createFixedSizeStripedCounter(8); + private final StatsDClient statsd; private final long interval; private final TimeUnit units; @@ -360,6 +375,31 @@ private void onSendAttempt( } } + @Override + public void onClientStatTraceComputed(int countedSpans, int totalSpans, boolean dropped) { + clientStatsProcessedTraces.inc(); + clientStatsProcessedSpans.inc(countedSpans); + if (dropped) { + clientStatsP0DroppedTraces.inc(); + clientStatsP0DroppedSpans.inc(totalSpans); + } + } + + @Override + public void onClientStatPayloadSent() { + clientStatsRequests.inc(); + } + + @Override + public void onClientStatDowngraded() { + clientStatsDowngrades.inc(); + } + + @Override + public void onClientStatErrorReceived() { + clientStatsErrors.inc(); + } + @Override public void close() { if (null != cancellation) { @@ -488,6 +528,20 @@ public void
run(TracerHealthMetrics target) { reportIfChanged( target.statsd, "long-running.expired", target.longRunningTracesExpired, NO_TAGS); + reportIfChanged( + target.statsd, "stats.traces_in", target.clientStatsProcessedTraces, NO_TAGS); + + reportIfChanged(target.statsd, "stats.spans_in", target.clientStatsProcessedSpans, NO_TAGS); + reportIfChanged( + target.statsd, "stats.p0_dropped_traces", target.clientStatsP0DroppedTraces, NO_TAGS); + reportIfChanged( + target.statsd, "stats.p0_dropped_spans", target.clientStatsP0DroppedSpans, NO_TAGS); + reportIfChanged( + target.statsd, "stats.flushed_payloads", target.clientStatsRequests, NO_TAGS); + reportIfChanged(target.statsd, "stats.flush_errors", target.clientStatsErrors, NO_TAGS); + reportIfChanged( + target.statsd, "stats.agent_downgrades", target.clientStatsDowngrades, NO_TAGS); + } catch (ArrayIndexOutOfBoundsException e) { log.warn( "previousCounts array needs resizing to at least {}, was {}", @@ -606,6 +660,21 @@ public String summary() { + "\nlongRunningTracesDropped=" + longRunningTracesDropped.get() + "\nlongRunningTracesExpired=" - + longRunningTracesExpired.get(); + + longRunningTracesExpired.get() + + "\n" + + "\nclientStatsRequests=" + + clientStatsRequests.get() + + "\nclientStatsErrors=" + + clientStatsErrors.get() + + "\nclientStatsDowngrades=" + + clientStatsDowngrades.get() + + "\nclientStatsP0DroppedSpans=" + + clientStatsP0DroppedSpans.get() + + "\nclientStatsP0DroppedTraces=" + + clientStatsP0DroppedTraces.get() + + "\nclientStatsProcessedSpans=" + + clientStatsProcessedSpans.get() + + "\nclientStatsProcessedTraces=" + + clientStatsProcessedTraces.get(); } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 3e2875c2993..6c635dc393c 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -4,6 +4,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.trace.api.WellKnownTags import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString import datadog.trace.core.CoreSpan +import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification import spock.lang.Shared @@ -38,6 +39,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { wellKnownTags, empty, features, + HealthMetrics.NO_OP, sink, 10, queueSize, @@ -67,6 +69,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { wellKnownTags, [ignoredResourceName].toSet(), features, + HealthMetrics.NO_OP, sink, 10, queueSize, @@ -100,7 +103,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -129,7 +132,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.spanKindsToComputedStats() >> ["client", "server", "producer", "consumer"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, 10, queueSize, 
reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() when: @@ -165,7 +168,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, + sink, writer, 10, queueSize, reportingInterval, SECONDS) aggregator.start() @@ -202,7 +206,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, 10, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS) long duration = 100 List trace = [ new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK), @@ -246,7 +250,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -283,7 +287,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -340,7 +344,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS) long duration = 100 aggregator.start() @@ -384,7 +388,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -419,7 +423,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 
aggregator.start() @@ -457,7 +461,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) long duration = 100 aggregator.start() @@ -488,7 +492,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) aggregator.start() when: @@ -509,7 +513,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> false ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, 10, queueSize, 200, MILLISECONDS) + features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) ] @@ -541,7 +545,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - features, sink, writer, maxAggregates, queueSize, 1, SECONDS) + features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS) when: def async = CompletableFuture.supplyAsync(new Supplier() { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index e75ee5f8564..4d50d7f156c 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -2,6 +2,7 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.trace.api.WellKnownTags +import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification import org.openjdk.jol.info.GraphLayout import spock.lang.Requires @@ -32,6 +33,7 @@ class FootprintForkedTest extends DDSpecification { new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, features, + HealthMetrics.NO_OP, sink, 1000, 1000, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy index fc47ba0ecfa..07f246bf9a9 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy @@ -2,6 +2,7 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.Config +import datadog.trace.core.monitor.HealthMetrics import datadog.trace.test.util.DDSpecification import 
okhttp3.HttpUrl @@ -14,7 +15,7 @@ class MetricsAggregatorFactoryTest extends DDSpecification { def sco = Mock(SharedCommunicationObjects) sco.agentUrl = HttpUrl.parse("http://localhost:8126") expect: - def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco) + def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP) assert aggregator instanceof NoOpMetricsAggregator } @@ -25,7 +26,8 @@ class MetricsAggregatorFactoryTest extends DDSpecification { def sco = Mock(SharedCommunicationObjects) sco.agentUrl = HttpUrl.parse("http://localhost:8126") expect: - def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco) + def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, + HealthMetrics.NO_OP) assert aggregator instanceof ConflatingMetricsAggregator } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy index a8458099b8b..8c7c64a275b 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsReliabilityTest.groovy @@ -2,7 +2,11 @@ package datadog.trace.common.metrics import datadog.communication.ddagent.SharedCommunicationObjects import datadog.trace.api.Config +import datadog.trace.api.StatsDClient +import datadog.trace.core.monitor.HealthMetrics +import datadog.trace.core.monitor.TracerHealthMetrics import datadog.trace.core.test.DDCoreSpecification +import datadog.trace.util.Strings import java.util.concurrent.CountDownLatch @@ -11,12 +15,16 @@ import static datadog.trace.agent.test.server.http.TestHttpServer.httpServer class MetricsReliabilityTest extends DDCoreSpecification { static class State { - boolean agentMetricsAvailable + boolean agentMetricsAvailable = true + int statsResponseCode = 200 boolean receivedStats boolean receivedClientComputedHeader CountDownLatch latch - def reset(agentMetricsAvailable) { + String hash + + def reset(agentMetricsAvailable, statsResponseCode = 200) { this.agentMetricsAvailable = agentMetricsAvailable + this.statsResponseCode = statsResponseCode receivedStats = false receivedClientComputedHeader = false latch = new CountDownLatch(1) @@ -27,17 +35,18 @@ httpServer { handlers { get("/info") { - response.send('{"endpoints":[' + (state.agentMetricsAvailable ? '"/v0.6/stats", ' : '') - + '"/v0.4/traces"]}') + final def res = '{"endpoints":[' + (state.agentMetricsAvailable ? '"/v0.6/stats", ' : '') + '"/v0.4/traces"], "client_drop_p0s" : true}' + state.hash = Strings.sha256(res) + response.send(res) state.latch.countDown() } post("/v0.6/stats", { state.receivedStats = true - response.status(state.agentMetricsAvailable ?
200 : 404).send() + response.status(state.statsResponseCode).send() }) put("/v0.4/traces", { state.receivedClientComputedHeader = "true" == request.getHeader('Datadog-Client-Computed-Stats') - response.status(200).send() + response.status(200).addHeader("Datadog-Agent-State", state.hash).send() }) } } @@ -56,8 +65,8 @@ def sharedComm = new SharedCommunicationObjects() sharedComm.createRemaining(config) def featuresDiscovery = sharedComm.featuresDiscovery(config) - def tracer = tracerBuilder().sharedCommunicationObjects(sharedComm).config(config).build() - + def healthMetrics = new TracerHealthMetrics(StatsDClient.NO_OP) + def tracer = tracerBuilder().sharedCommunicationObjects(sharedComm).healthMetrics(healthMetrics).config(config).build() when: "metrics enabled and discovery is performed" featuresDiscovery.discover() @@ -73,9 +82,12 @@ then: "should have sent statistics and informed the agent that we calculate the stats" assert state.receivedClientComputedHeader assert state.receivedStats + // 1 trace processed. Not a p0 drop (first time we see it). No errors + assertMetrics(healthMetrics, 1, 0, 1, 0, 0) + when: "simulate an agent downgrade" - state.reset(false) + state.reset(false, 404) tracer.startSpan("test", "test").finish() tracer.flush() tracer.flushMetrics() @@ -83,8 +95,12 @@ then: "a discovery should have done - we do not support anymore stats calculation" state.latch.await() assert !featuresDiscovery.supportsMetrics() + // 2 traces processed. 1 p0 dropped. 2 requests and 1 downgrade, no errors + assertMetrics(healthMetrics, 2, 1, 2, 0, 1) + when: "a span is published" + state.reset(false) // the stats call that triggered the downgrade already set the flags, so reset the state tracer.startSpan("test", "test").finish() tracer.flush() tracer.flushMetrics() @@ -92,6 +108,8 @@ then: "should have not sent statistics and informed the agent that we don't calculate the stats anymore" assert !state.receivedClientComputedHeader assert !state.receivedStats + // 2 traces processed. 1 p0 dropped. 2 requests and 1 downgrade, no errors + assertMetrics(healthMetrics, 2, 1, 2, 0, 1) when: "we detect that the agent can calculate the stats again" state.reset(true) @@ -109,9 +127,47 @@ then: "we should have sent the stats and informed the agent to not calculate the stats on the trace payload" assert state.receivedClientComputedHeader assert state.receivedStats + // 3 traces processed. 2 p0 dropped. 3 requests and 1 downgrade, no errors + assertMetrics(healthMetrics, 3, 2, 3, 0, 1) + + when: "an error occurs on the agent stats endpoint" + state.reset(true, 500) + tracer.startSpan("test", "test").finish() + tracer.flush() + tracer.flushMetrics() + + then: "the error counter is incremented" + assert state.receivedClientComputedHeader + assert state.receivedStats + // 4 traces processed. 3 p0 dropped. 4 requests and 1 downgrade - 1 error + assertMetrics(healthMetrics, 4, 3, 4, 1, 1) + + when: "the next call succeeds" + state.reset(true) + tracer.startSpan("test", "test").setError(true).finish() + tracer.flush() + tracer.flushMetrics() + + then: "the request counter is incremented" + assert state.receivedClientComputedHeader + assert state.receivedStats + // 5 traces processed. 3 p0 dropped (this one is errored so it's not dropped).
+ // 5 requests and 1 downgrade - 1 error + assertMetrics(healthMetrics, 5, 3, 5, 1, 1) cleanup: tracer.close() agent.stop() } + + void assertMetrics(HealthMetrics healthMetrics, int traces, int drops, int requests, int errors, int downgrades) { + def summary = healthMetrics.summary() + assert summary.contains("clientStatsRequests=$requests") + assert summary.contains("clientStatsErrors=$errors") + assert summary.contains("clientStatsDowngrades=$downgrades") + assert summary.contains("clientStatsP0DroppedSpans=$drops") + assert summary.contains("clientStatsP0DroppedTraces=$drops") + assert summary.contains("clientStatsProcessedSpans=$traces") + assert summary.contains("clientStatsProcessedTraces=$traces") + } }
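Usage sketch (illustrative only, not part of the patch): the new client-stats hooks are plain no-op callbacks on HealthMetrics, so a tracer build can observe them by passing a custom implementation through the CoreTracerBuilder.healthMetrics(...) method added above, the same way MetricsReliabilityTest injects a TracerHealthMetrics. The class below is hypothetical and assumes HealthMetrics can be subclassed directly, as TracerHealthMetrics does.

import datadog.trace.core.CoreTracer;
import datadog.trace.core.monitor.HealthMetrics;

// Hypothetical example: logs the new client-stats callbacks instead of counting them.
public class ClientStatsLoggingMetrics extends HealthMetrics {

  @Override
  public void onClientStatTraceComputed(int countedSpan, int totalSpan, boolean dropped) {
    // countedSpan = spans that contributed to client stats, totalSpan = all spans in the trace,
    // dropped = the trace is a p0-drop candidate (agent supports dropping and nothing forced keep)
    System.out.printf("client stats: counted=%d total=%d droppable=%b%n", countedSpan, totalSpan, dropped);
  }

  @Override
  public void onClientStatPayloadSent() {
    System.out.println("client stats payload flushed to the agent");
  }

  @Override
  public void onClientStatErrorReceived() {
    System.out.println("agent rejected or errored on a client stats payload");
  }

  @Override
  public void onClientStatDowngraded() {
    System.out.println("agent downgrade detected; client stats computation disabled");
  }

  public static void main(String[] args) {
    // Assumes default builder settings are acceptable for everything else.
    CoreTracer tracer =
        CoreTracer.builder().healthMetrics(new ClientStatsLoggingMetrics()).build();
    tracer.startSpan("demo", "demo.operation").finish();
    tracer.flush();
    tracer.close();
  }
}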