From 0fb713d7bdd2f7711f5d3736ad4da1d7ee2711fb Mon Sep 17 00:00:00 2001 From: Pawel Leszczynski Date: Mon, 8 Sep 2025 10:32:37 +0200 Subject: [PATCH] Fix tracer OpenLineage instrumentation Signed-off-by: Pawel Leszczynski --- .../spark/AbstractDatadogSparkListener.java | 34 +++++++++++-- .../spark/AbstractSparkListenerTest.groovy | 49 +++++++++++++++++++ 2 files changed, 80 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java index 5ac7f524ce7..50a9614cee1 100644 --- a/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java +++ b/dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java @@ -7,6 +7,7 @@ import datadog.trace.api.Config; import datadog.trace.api.DDTags; import datadog.trace.api.DDTraceId; +import datadog.trace.api.ProcessTags; import datadog.trace.api.datastreams.DataStreamsTags; import datadog.trace.api.sampling.PrioritySampling; import datadog.trace.api.sampling.SamplingMechanism; @@ -180,7 +181,9 @@ public void setupOpenLineage(DDTraceId traceId) { "_dd.trace_id:" + traceId.toString() + ";_dd.ol_intake.emit_spans:false;_dd.ol_service:" - + sparkServiceName + + getServiceForOpenLineage(sparkConf, isRunningOnDatabricks) + + ";_dd.ol_intake.process_tags:" + + ProcessTags.getTagsForSerialization() + ";_dd.ol_app_id:" + appId); return; @@ -669,6 +672,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { } } + // OpenLineage call should be prior to method return statements + notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd); + // Only sending failing tasks if (!(taskEnd.reason() instanceof TaskFailedReason)) { return; @@ -687,8 +693,6 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) { Properties props = stageProperties.get(stageSpanKey); sendTaskSpan(stageSpan, taskEnd, props); - - notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd); } public static boolean classIsLoadable(String className) { @@ -1292,6 +1296,30 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat return sparkAppName; } + private static String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) { + // Service for OpenLineage in Databricks is not supported yet + if (isRunningOnDatabricks) { + return null; + } + + // Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM + String serviceName = Config.get().getServiceName(); + if (Config.get().isServiceNameSetByUser() + && !"spark".equals(serviceName) + && !"hadoop".equals(serviceName)) { + log.debug("Service '{}' explicitly set by user, not using the application name", serviceName); + return serviceName; + } + + String sparkAppName = conf.get("spark.app.name", null); + if (sparkAppName != null) { + log.debug( + "Using Spark application name '{}' as the Datadog service for OpenLineage", sparkAppName); + } + + return sparkAppName; + } + private static void reportKafkaOffsets( final String appName, final AgentSpan span, final SourceProgress progress) { if (!traceConfig().isDataStreamsEnabled() diff --git a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy index c5db469fb98..2145b3b07d2 100644 --- a/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy +++ b/dd-java-agent/instrumentation/spark/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkListenerTest.groovy @@ -4,6 +4,8 @@ import com.datadoghq.sketch.ddsketch.DDSketchProtoBinding import com.datadoghq.sketch.ddsketch.proto.DDSketch import com.datadoghq.sketch.ddsketch.store.CollapsingLowestDenseStore import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.api.DDTraceId +import datadog.trace.api.ProcessTags import org.apache.spark.SparkConf import org.apache.spark.Success$ import org.apache.spark.executor.TaskMetrics @@ -12,6 +14,7 @@ import org.apache.spark.scheduler.SparkListenerApplicationEnd import org.apache.spark.scheduler.SparkListenerApplicationStart import org.apache.spark.scheduler.SparkListenerExecutorAdded import org.apache.spark.scheduler.SparkListenerExecutorRemoved +import org.apache.spark.scheduler.SparkListenerInterface import org.apache.spark.scheduler.SparkListenerJobEnd import org.apache.spark.scheduler.SparkListenerJobStart import org.apache.spark.scheduler.SparkListenerStageCompleted @@ -519,6 +522,52 @@ abstract class AbstractSparkListenerTest extends InstrumentationSpecification { } } + def "test setupOpenLineage gets service name"(String serviceNameSetByUser, String serviceName, String sparkAppName) { + setup: + SparkConf sparkConf = new SparkConf() + injectSysConfig("dd.service.name.set.by.user", serviceNameSetByUser) + if (Boolean.parseBoolean(serviceNameSetByUser)) { + injectSysConfig("dd.service.name", serviceName) + } + if (sparkAppName != null) { + sparkConf.set("spark.app.name", sparkAppName) + } + + def listener = getTestDatadogSparkListener(sparkConf) + listener.openLineageSparkListener = Mock(SparkListenerInterface) + listener.openLineageSparkConf = new SparkConf() + listener.setupOpenLineage(Mock(DDTraceId)) + + expect: + assert listener + .openLineageSparkConf + .get("spark.openlineage.run.tags") + .split(";") + .contains("_dd.ol_service:expected-service-name") + + where: + serviceNameSetByUser | serviceName | sparkAppName + true | "expected-service-name" | null + false | null | "expected-service-name" + true | "spark" | "expected-service-name" + true | "hadoop" | "expected-service-name" + } + + def "test setupOpenLineage fills ProcessTags"() { + setup: + def listener = getTestDatadogSparkListener() + listener.openLineageSparkListener = Mock(SparkListenerInterface) + listener.openLineageSparkConf = new SparkConf() + listener.setupOpenLineage(Mock(DDTraceId)) + + expect: + assert listener + .openLineageSparkConf + .get("spark.openlineage.run.tags") + .split(";") + .contains("_dd.ol_intake.process_tags:" + ProcessTags.getTagsForSerialization()) + } + protected validateRelativeError(double value, double expected, double relativeAccuracy) { double relativeError = Math.abs(value - expected) / expected assert relativeError < relativeAccuracy