Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
package datadog.trace.instrumentation.java.concurrent.structuredconcurrency;

import static datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils.capture;
import static datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter.ExcludeType.FORK_JOIN_TASK;
import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.ExcludeFilterProvider;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Platform;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import java.util.Collection;
import java.util.Map;
import net.bytebuddy.asm.Advice;

Expand All @@ -27,10 +22,7 @@
@SuppressWarnings("unused")
@AutoService(InstrumenterModule.class)
public class StructuredTaskScopeInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForBootstrap,
Instrumenter.ForSingleType,
Instrumenter.HasMethodAdvice,
ExcludeFilterProvider {
implements Instrumenter.ForBootstrap, Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {

public StructuredTaskScopeInstrumentation() {
super("java_concurrent", "structured_task_scope");
Expand All @@ -57,14 +49,6 @@ public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(isConstructor(), getClass().getName() + "$ConstructorAdvice");
}

@Override
public Map<ExcludeFilter.ExcludeType, ? extends Collection<String>> excludedClasses() {
// Prevent the ForkJoinPool instrumentation to enable the task scope too early on the carrier
// thread rather than on the expected running thread, which is virtual by default.
return singletonMap(
FORK_JOIN_TASK, singleton("java.util.concurrent.ForkJoinTask$RunnableExecuteAction"));
}

public static final class ConstructorAdvice {
@Advice.OnMethodExit
public static <T> void captureScope(
Expand Down
48 changes: 48 additions & 0 deletions dd-smoke-tests/concurrent/java-21/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
plugins {
id 'application'
id 'com.github.johnrengelman.shadow'
}

ext {
minJavaVersionForTests = JavaVersion.VERSION_21
}

apply from: "$rootDir/gradle/java.gradle"

description = 'JDK 21 Concurrent Integration Tests'

java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
}
tasks.withType(JavaCompile).configureEach {
setJavaVersion(it, 21)
sourceCompatibility = JavaVersion.VERSION_21
targetCompatibility = JavaVersion.VERSION_21
}

// Disable plugin tasks that do not support Java 21:
// * forbiddenApis is missing classes
// * spotless as the google-java-format version does not support Java 21 and can't be changed once applied
// * spotbugs failed to read class using newer bytecode versions
forbiddenApisMain {
failOnMissingClasses = false
}
['spotlessApply', 'spotlessCheck', 'spotlessJava', 'spotbugsMain'].each {
tasks.named(it).configure { enabled = false }
}

application {
mainClassName = 'datadog.smoketest.concurrent.ConcurrentApp'
}

dependencies {
implementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '2.13.3'
testImplementation project(':dd-smoke-tests')
}

tasks.withType(Test).configureEach {
dependsOn "shadowJar"
jvmArgs "-Ddatadog.smoketest.shadowJar.path=${tasks.shadowJar.archiveFile.get()}"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package datadog.smoketest.concurrent;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.ExecutionException;

public class ConcurrentApp {
@WithSpan("main")
public static void main(String[] args) {
for (String arg : args) {
try (FibonacciCalculator calc = getCalculator(arg)) {
calc.computeFibonacci(10);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Failed to compute fibonacci number.", e);
}
}
}

private static FibonacciCalculator getCalculator(String name) {
return switch (name) {
case "virtualThreadExecute" -> new VirtualThreadExecuteCalculator();
case "virtualThreadSubmitRunnable" -> new VirtualThreadSubmitRunnableCalculator();
case "virtualThreadSubmitCallable" -> new VirtualThreadSubmitCallableCalculator();
case "virtualThreadInvokeAll" -> new VirtualThreadInvokeAllCalculator();
case "virtualThreadInvokeAny" -> new VirtualThreadInvokeAnyCalculator();
default -> throw new RuntimeException("Unknown Fibonacci calculator: " + name);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package datadog.smoketest.concurrent;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadExecuteCalculator implements FibonacciCalculator {
private final ExecutorService executor;

public VirtualThreadExecuteCalculator() {
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}

@Override
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
FibonacciExecuteTask task = new FibonacciExecuteTask(n);
this.executor.execute(task);
return task.result.get();
}

public class FibonacciExecuteTask implements Runnable {
private final long n;
private final CompletableFuture<Long> result;

public FibonacciExecuteTask(long n) {
this.n = n;
this.result = new CompletableFuture<>();
}

@WithSpan("compute")
public void run() {
if (this.n <= 1) {
this.result.complete(this.n);
return;
}
FibonacciExecuteTask task1 = new FibonacciExecuteTask(this.n - 1);
FibonacciExecuteTask task2 = new FibonacciExecuteTask(this.n - 2);
executor.execute(task1);
executor.execute(task2);
try {
this.result.complete(task1.result.get() + task2.result.get());
} catch (InterruptedException | ExecutionException e) {
this.result.completeExceptionally(e);
}
}
}

@Override
public void close() {
this.executor.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package datadog.smoketest.concurrent;

import static java.util.Set.of;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadInvokeAllCalculator implements FibonacciCalculator {
private final ExecutorService executor;

public VirtualThreadInvokeAllCalculator() {
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}

@Override
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
return this.executor.invokeAll(of(task)).getFirst().get();
}

public class FibonacciSubmitTask implements Callable<Long> {
private final long n;

public FibonacciSubmitTask(long n) {
this.n = n;
}

@WithSpan("compute")
public Long call() throws ExecutionException, InterruptedException {
if (this.n <= 1) {
return this.n;
}
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
List<Future<Long>> futures = executor.invokeAll(List.of(task1, task2));
return futures.getFirst().get() + futures.getLast().get();
}
}

@Override
public void close() {
this.executor.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package datadog.smoketest.concurrent;

import static java.util.Set.of;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadInvokeAnyCalculator implements FibonacciCalculator {
private final ExecutorService executor;

public VirtualThreadInvokeAnyCalculator() {
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}

@Override
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
return this.executor.invokeAny(of(task));
}

public class FibonacciSubmitTask implements Callable<Long> {
private final long n;

public FibonacciSubmitTask(long n) {
this.n = n;
}

@WithSpan("compute")
public Long call() throws ExecutionException, InterruptedException {
if (this.n <= 1) {
return this.n;
}
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
return executor.invokeAny(of(task1)) + executor.invokeAny(of(task2));
}
}

@Override
public void close() {
this.executor.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package datadog.smoketest.concurrent;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadSubmitCallableCalculator implements FibonacciCalculator {
private final ExecutorService executor;

public VirtualThreadSubmitCallableCalculator() {
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}

@Override
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
return this.executor.submit(task).get();
}

public class FibonacciSubmitTask implements Callable<Long> {
private final long n;

public FibonacciSubmitTask(long n) {
this.n = n;
}

@WithSpan("compute")
public Long call() throws ExecutionException, InterruptedException {
if (this.n <= 1) {
return this.n;
}
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
Future<Long> future1 = executor.submit(task1);
Future<Long> future2 = executor.submit(task2);
return future1.get() + future2.get();
}
}

@Override
public void close() {
this.executor.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package datadog.smoketest.concurrent;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadSubmitRunnableCalculator implements FibonacciCalculator {
private final ExecutorService executor;

public VirtualThreadSubmitRunnableCalculator() {
this.executor = Executors.newVirtualThreadPerTaskExecutor();
}

@Override
public long computeFibonacci(int n) throws ExecutionException, InterruptedException {
FibonacciSubmitTask task = new FibonacciSubmitTask(n);
this.executor.execute(task);
return task.result.get();
}

public class FibonacciSubmitTask implements Runnable {
private final long n;
private final CompletableFuture<Long> result;

public FibonacciSubmitTask(long n) {
this.n = n;
this.result = new CompletableFuture<>();
}

@WithSpan("compute")
public void run() {
if (this.n <= 1) {
this.result.complete(this.n);
return;
}
FibonacciSubmitTask task1 = new FibonacciSubmitTask(this.n - 1);
FibonacciSubmitTask task2 = new FibonacciSubmitTask(this.n - 2);
executor.submit(task1);
executor.submit(task2);
try {
this.result.complete(task1.result.get() + task2.result.get());
} catch (InterruptedException | ExecutionException e) {
this.result.completeExceptionally(e);
}
}
}

@Override
public void close() {
this.executor.shutdown();
}
}
Loading
Loading