diff --git a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/EnrichmentProcessor.java b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/EnrichmentProcessor.java index eda9f492b..d14ee9d14 100644 --- a/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/EnrichmentProcessor.java +++ b/hypertrace-trace-enricher/hypertrace-trace-enricher-impl/src/main/java/org/hypertrace/traceenricher/enrichment/EnrichmentProcessor.java @@ -29,8 +29,8 @@ public class EnrichmentProcessor { private static final Logger LOG = LoggerFactory.getLogger(EnrichmentProcessor.class); - private static final String ENRICHMENT_ARRIVAL_TIME = "enrichment.arrival.time"; - private static final Timer enrichmentArrivalTimer = + private static final String ENRICHMENT_COMPLETION_TIME = "enrichment.completion.time"; + private static final Timer enrichmentLagTimer = PlatformMetricsRegistry.registerTimer(DataflowMetricUtils.ARRIVAL_LAG, new HashMap<>()); // Must use linked hashmap @@ -68,8 +68,6 @@ public EnrichmentProcessor(List enricherInfoList, ClientRegistry c /** Enriches the Trace by Invoking various Enrichers registered in */ public void process(StructuredTrace trace) { - DataflowMetricUtils.reportArrivalLagAndInsertTimestamp( - trace, enrichmentArrivalTimer, ENRICHMENT_ARRIVAL_TIME); AvroToJsonLogger.log(LOG, "Structured Trace before all the enrichment is: {}", trace); for (Entry entry : enrichers.entrySet()) { String metricKey = String.format("%s/%s", trace.getCustomerId(), entry.getKey()); @@ -101,6 +99,8 @@ public void process(StructuredTrace trace) { } } AvroToJsonLogger.log(LOG, "Structured Trace after all the enrichment is: {}", trace); + DataflowMetricUtils.reportArrivalLagAndInsertTimestamp( + trace, enrichmentLagTimer, ENRICHMENT_COMPLETION_TIME); } private void applyEnricher(Enricher enricher, StructuredTrace trace) { diff --git a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java index 2c2872357..51721d756 100644 --- a/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java +++ b/raw-spans-grouper/raw-spans-grouper/src/main/java/org/hypertrace/core/rawspansgrouper/TraceEmitPunctuator.java @@ -49,7 +49,7 @@ class TraceEmitPunctuator implements Punctuator { private static final Logger logger = LoggerFactory.getLogger(TraceEmitPunctuator.class); private static final Object mutex = new Object(); - private static final Timer spansGrouperArrivalLagTimer = + private static final Timer grouperLagTimer = PlatformMetricsRegistry.registerTimer(DataflowMetricUtils.ARRIVAL_LAG, new HashMap<>()); private static final String TRACES_EMITTER_COUNTER = "hypertrace.emitted.traces"; private static final ConcurrentMap tenantToTraceEmittedCounter = @@ -164,12 +164,14 @@ public void punctuate(long timestamp) { } recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId))); - Timestamps timestamps = - trackEndToEndLatencyTimestamps(timestamp, traceState.getTraceStartTimestamp()); + Timestamps timestamps = null; + if (rawSpanList.size() > 0) { + timestamps = + trackEndToEndLatencyTimestamps(timestamp, rawSpanList.get(0).getReceivedTimeMillis()); + } StructuredTrace trace = StructuredTraceBuilder.buildStructuredTraceFromRawSpans( rawSpanList, traceId, tenantId, timestamps); - if (logger.isDebugEnabled()) { logger.debug( "Emit tenant_id=[{}], trace_id=[{}], spans_count=[{}]", @@ -237,9 +239,11 @@ private Timestamps trackEndToEndLatencyTimestamps( long currentTimestamp, long firstSpanTimestamp) { Timestamps timestamps = null; if (!(Math.random() * 100 <= dataflowSamplingPercent)) { - spansGrouperArrivalLagTimer.record( + //Reports the time it took from span arrival to till trace is created and just before it is emitted. + grouperLagTimer.record( currentTimestamp - firstSpanTimestamp, TimeUnit.MILLISECONDS); Map records = new HashMap<>(); + //Saves the span arrival time in trace, so that it can be used by services downstream to calculate similar lag. records.put( DataflowMetricUtils.SPAN_ARRIVAL_TIME, new TimestampRecord(DataflowMetricUtils.SPAN_ARRIVAL_TIME, firstSpanTimestamp));