Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.BaseHash;
import datadog.trace.api.Config;
import datadog.trace.api.DDTags;
import datadog.trace.api.DDTraceId;
Expand Down Expand Up @@ -180,7 +181,9 @@ public void setupOpenLineage(DDTraceId traceId) {
"_dd.trace_id:"
+ traceId.toString()
+ ";_dd.ol_intake.emit_spans:false;_dd.ol_service:"
+ sparkServiceName
+ getServiceForOpenLineage(sparkConf, isRunningOnDatabricks)
+ ";_dd.ol_intake.base_hash:"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding that. However, given that you also support container tags, it would be better to skip the base hash and add the process tags directly.
These are currently enabled for jvm 21 and will be enabled by default on next version

+ BaseHash.getBaseHashStr()
+ ";_dd.ol_app_id:"
+ appId);
return;
Expand Down Expand Up @@ -669,6 +672,9 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {
}
}

// OpenLineage call should be prior to method return statements
notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);

// Only sending failing tasks
if (!(taskEnd.reason() instanceof TaskFailedReason)) {
return;
Expand All @@ -687,8 +693,6 @@ public void onTaskEnd(SparkListenerTaskEnd taskEnd) {

Properties props = stageProperties.get(stageSpanKey);
sendTaskSpan(stageSpan, taskEnd, props);

notifyOl(x -> openLineageSparkListener.onTaskEnd(x), taskEnd);
}

public static boolean classIsLoadable(String className) {
Expand Down Expand Up @@ -1292,6 +1296,30 @@ private static String getSparkServiceName(SparkConf conf, boolean isRunningOnDat
return sparkAppName;
}

private static String getServiceForOpenLineage(SparkConf conf, boolean isRunningOnDatabricks) {
// Service for OpenLineage in Databricks is not supported yet
if (isRunningOnDatabricks) {
return null;
}

// Keep service set by user, except if it is only "spark" or "hadoop" that can be set by USM
String serviceName = Config.get().getServiceName();
if (Config.get().isServiceNameSetByUser()
&& !"spark".equals(serviceName)
&& !"hadoop".equals(serviceName)) {
log.debug("Service '{}' explicitly set by user, not using the application name", serviceName);
return serviceName;
}

String sparkAppName = conf.get("spark.app.name", null);
if (sparkAppName != null) {
log.info(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this one be log.debug() too?

"Using Spark application name '{}' as the Datadog service for OpenLineage", sparkAppName);
}

return sparkAppName;
}

private static void reportKafkaOffsets(
final String appName, final AgentSpan span, final SourceProgress progress) {
if (!traceConfig().isDataStreamsEnabled()
Expand Down
Loading