Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
public class EnrichmentProcessor {

private static final Logger LOG = LoggerFactory.getLogger(EnrichmentProcessor.class);
private static final String ENRICHMENT_ARRIVAL_TIME = "enrichment.arrival.time";
private static final Timer enrichmentArrivalTimer =
private static final String ENRICHMENT_COMPLETION_TIME = "enrichment.completion.time";
private static final Timer enrichmentLagTimer =
PlatformMetricsRegistry.registerTimer(DataflowMetricUtils.ARRIVAL_LAG, new HashMap<>());

// Must use linked hashmap
Expand Down Expand Up @@ -68,8 +68,6 @@ public EnrichmentProcessor(List<EnricherInfo> enricherInfoList, ClientRegistry c

/** Enriches the Trace by Invoking various Enrichers registered in */
public void process(StructuredTrace trace) {
DataflowMetricUtils.reportArrivalLagAndInsertTimestamp(
trace, enrichmentArrivalTimer, ENRICHMENT_ARRIVAL_TIME);
AvroToJsonLogger.log(LOG, "Structured Trace before all the enrichment is: {}", trace);
for (Entry<String, Enricher> entry : enrichers.entrySet()) {
String metricKey = String.format("%s/%s", trace.getCustomerId(), entry.getKey());
Expand Down Expand Up @@ -101,6 +99,8 @@ public void process(StructuredTrace trace) {
}
}
AvroToJsonLogger.log(LOG, "Structured Trace after all the enrichment is: {}", trace);
DataflowMetricUtils.reportArrivalLagAndInsertTimestamp(
trace, enrichmentLagTimer, ENRICHMENT_COMPLETION_TIME);
}

private void applyEnricher(Enricher enricher, StructuredTrace trace) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class TraceEmitPunctuator implements Punctuator {
private static final Logger logger = LoggerFactory.getLogger(TraceEmitPunctuator.class);
private static final Object mutex = new Object();

private static final Timer spansGrouperArrivalLagTimer =
private static final Timer grouperLagTimer =
PlatformMetricsRegistry.registerTimer(DataflowMetricUtils.ARRIVAL_LAG, new HashMap<>());
private static final String TRACES_EMITTER_COUNTER = "hypertrace.emitted.traces";
private static final ConcurrentMap<String, Counter> tenantToTraceEmittedCounter =
Expand Down Expand Up @@ -164,12 +164,14 @@ public void punctuate(long timestamp) {
}

recordSpansPerTrace(rawSpanList.size(), List.of(Tag.of("tenant_id", tenantId)));
Timestamps timestamps =
trackEndToEndLatencyTimestamps(timestamp, traceState.getTraceStartTimestamp());
Timestamps timestamps = null;
if (rawSpanList.size() > 0) {
timestamps =
trackEndToEndLatencyTimestamps(timestamp, rawSpanList.get(0).getReceivedTimeMillis());
}
StructuredTrace trace =
StructuredTraceBuilder.buildStructuredTraceFromRawSpans(
rawSpanList, traceId, tenantId, timestamps);

if (logger.isDebugEnabled()) {
logger.debug(
"Emit tenant_id=[{}], trace_id=[{}], spans_count=[{}]",
Expand Down Expand Up @@ -237,9 +239,11 @@ private Timestamps trackEndToEndLatencyTimestamps(
long currentTimestamp, long firstSpanTimestamp) {
Timestamps timestamps = null;
if (!(Math.random() * 100 <= dataflowSamplingPercent)) {
spansGrouperArrivalLagTimer.record(
//Reports the time it took from span arrival to till trace is created and just before it is emitted.
grouperLagTimer.record(
currentTimestamp - firstSpanTimestamp, TimeUnit.MILLISECONDS);
Map<String, TimestampRecord> records = new HashMap<>();
//Saves the span arrival time in trace, so that it can be used by services downstream to calculate similar lag.
records.put(
DataflowMetricUtils.SPAN_ARRIVAL_TIME,
new TimestampRecord(DataflowMetricUtils.SPAN_ARRIVAL_TIME, firstSpanTimestamp));
Expand Down