diff --git a/dd-java-agent/instrumentation/phantom/phantom.gradle b/dd-java-agent/instrumentation/phantom/phantom.gradle new file mode 100644 index 00000000000..71f90877f8f --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/phantom.gradle @@ -0,0 +1,38 @@ +// Set properties before any plugins get loaded +ext { + minJavaVersionForTests = JavaVersion.VERSION_1_8 +} + +apply from: "${rootDir}/gradle/java.gradle" +apply from: "${rootDir}/gradle/test-with-scala.gradle" + +apply plugin: 'org.unbroken-dome.test-sets' + + +dependencies { + main_java8CompileOnly group: 'com.outworkers', name: 'phantom-dsl_2.11', version: '2.59.0' + main_java8CompileOnly group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '3.0.0' + main_java8CompileOnly group: 'io.monix', name: 'monix_2.11', version: '3.2.1' + compile project(':dd-trace-api') + compile project(':dd-java-agent:agent-tooling') + +// compile deps.opentracing +// compile deps.autoservice +// annotationProcessor deps.autoservice + //compile project(':dd-java-agent:instrumentation:hibernate') + testCompile project(':dd-java-agent:instrumentation:trace-annotation') + testCompile project(':dd-java-agent:instrumentation:java-concurrent') + testCompile project(':dd-java-agent:instrumentation:datastax-cassandra-3') + testCompile project(':dd-java-agent:instrumentation:phantom') + + testCompile group: 'com.outworkers', name: 'phantom-dsl_2.11', version: '2.59.0' + testCompile group: 'com.outworkers', name: 'phantom-monix_2.11', version: '2.59.0' + testCompile group: 'com.datastax.cassandra', name: 'cassandra-driver-core', version: '3.0.0' + testCompile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.5.31' + testCompile group: 'io.monix', name: 'monix_2.11', version: '3.2.1' + testCompile deps.scala + testCompile group: 'org.scala-lang', name: 'scala-reflect', version: '2.11.12' + testCompile 'org.scala-lang.modules:scala-java8-compat_2.11:0.9.1' + testCompile group: 'org.cassandraunit', name: 'cassandra-unit', version: '3.11.2.0' +} + diff --git a/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/FutureCompletionListener.java b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/FutureCompletionListener.java new file mode 100644 index 00000000000..cb8e31c0631 --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/FutureCompletionListener.java @@ -0,0 +1,49 @@ +package datadog.trace.instrumentation.phantom; + +import com.outworkers.phantom.ResultSet; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.runtime.AbstractFunction1; +import scala.util.Try; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope; +import static datadog.trace.instrumentation.phantom.PhantomDecorator.DECORATE; + +public class FutureCompletionListener extends AbstractFunction1, Object> { + private static final Logger log = LoggerFactory.getLogger(FutureCompletionListener.class); + + private final AgentSpan agentSpan; + public FutureCompletionListener(AgentSpan agentSpan) { + this.agentSpan = agentSpan; + } + + @Override + public Object apply(final Try resultSetTry) { + log.debug("phantom future completed"); + final AgentScope scope = activateSpan(agentSpan); + try { + final ResultSet resultSet = resultSetTry.get(); // TODO: Optimize the potential throw + log.debug("Call completed successfully"); + if (resultSet != null) { + String keyspace = resultSet.getExecutionInfo().getStatement().getKeyspace(); + if (keyspace != null) { + agentSpan.setTag("keyspace", keyspace); + } + } + DECORATE.beforeFinish(agentSpan); + } catch (final Throwable t) { + log.debug("Call completed with error"); + DECORATE.onError(agentSpan, t); + } finally { + log.debug("doing finish and close"); + agentSpan.finish(); + scope.setAsyncPropagation(false); + scope.close(); + } + return null; + } +} diff --git a/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/GuavaAdapterAdvice.java b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/GuavaAdapterAdvice.java new file mode 100644 index 00000000000..7f949039a53 --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/GuavaAdapterAdvice.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.phantom; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.instrumentation.phantom.PhantomDecorator.DECORATE; + +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.outworkers.phantom.ResultSet; +import com.outworkers.phantom.ops.QueryContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.implementation.bytecode.assign.Assigner; +import scala.concurrent.ExecutionContext; +import scala.concurrent.ExecutionContext$; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.Future; + +public class GuavaAdapterAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope enter(@Advice.Argument(value = 0) final Statement statement, + @Advice.Argument(value = 1) final Session session, + @Advice.Argument(value = 2) final ExecutionContextExecutor ctx ) { + System.out.println("Calling with context " + ctx.toString()); + final AgentScope scope = startSpanWithScope(statement); + scope.setAsyncPropagation(true); + return scope; + } + + public static AgentScope startSpanWithScope(final Statement statement) { + final AgentSpan span = startSpan("phantom.future"); + DECORATE.afterStart(span); + DECORATE.onStatement(span, statement.toString()); + //log.debug("activating span " + span); + return activateSpan(span); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exit( + @Advice.Argument(value = 2) final ExecutionContextExecutor ctx, + @Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC) Future resultSetFuture, + @Advice.Enter final AgentScope agentScope) { + //log.debug("onMethodExit " + agentScope.toString()); + if (agentScope == null || resultSetFuture == null) { + return; + } + + resultSetFuture.onComplete(new FutureCompletionListener(agentScope.span()), ctx); + } +} diff --git a/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomAdvice.java b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomAdvice.java new file mode 100644 index 00000000000..b3c94a74b1c --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomAdvice.java @@ -0,0 +1,48 @@ +package datadog.trace.instrumentation.phantom; + +import com.datastax.driver.core.Session; +import com.outworkers.phantom.ResultSet; +import com.outworkers.phantom.ops.QueryContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.implementation.bytecode.assign.Assigner; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.Future; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.instrumentation.phantom.PhantomDecorator.DECORATE; + +public class PhantomAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope enter(@Advice.This final QueryContext.RootQueryOps rootQueryOps, + @Advice.Argument(value = 0) final Session session, + @Advice.Argument(value = 1) final ExecutionContextExecutor ctx ) { + System.out.println("Calling with context " + ctx.toString()); + final AgentScope scope = startSpanWithScope(rootQueryOps); + scope.setAsyncPropagation(true); + return scope; + } + + public static AgentScope startSpanWithScope(final QueryContext.RootQueryOps queryOps) { + final AgentSpan span = startSpan("phantom.future"); + DECORATE.afterStart(span); + DECORATE.onStatement(span, queryOps.query().queryString()); + //log.debug("activating span " + span); + return activateSpan(span); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void exit( + @Advice.Argument(value = 1) final ExecutionContextExecutor ctx, + @Advice.Return(readOnly = false, typing = Assigner.Typing.DYNAMIC) Future resultSetFuture, + @Advice.Enter final AgentScope agentScope) { + //log.debug("onMethodExit " + agentScope.toString()); + if (agentScope == null || resultSetFuture == null) { + return; + } + + resultSetFuture.onComplete(new FutureCompletionListener(agentScope.span()), ctx); + } +} diff --git a/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomDecorator.java b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomDecorator.java new file mode 100644 index 00000000000..a454027e8a8 --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomDecorator.java @@ -0,0 +1,49 @@ +package datadog.trace.instrumentation.phantom; + +import datadog.trace.api.DDSpanTypes; +import datadog.trace.bootstrap.instrumentation.decorator.OrmClientDecorator; +import datadog.trace.bootstrap.instrumentation.decorator.ServerDecorator; + +public class PhantomDecorator extends OrmClientDecorator { + public static final PhantomDecorator DECORATE = new PhantomDecorator(); + + @Override + protected String[] instrumentationNames() { + return new String[] {"phantom"}; + } + + @Override + protected String spanType() { + return "phantom"; + } + + @Override + protected String component() { + return "scala-phantom"; + } + + @Override + public String entityName(Object entity) { + return null; + } + + @Override + protected String dbType() { + return DDSpanTypes.CASSANDRA; + } + + @Override + protected String dbUser(Object o) { + return null; + } + + @Override + protected String dbInstance(Object o) { + return null; + } + + @Override + protected String service() { + return "phantom"; + } +} diff --git a/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomInstrumentation.java b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomInstrumentation.java new file mode 100644 index 00000000000..977f85b0719 --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/PhantomInstrumentation.java @@ -0,0 +1,60 @@ +package datadog.trace.instrumentation.phantom; + +import com.datastax.driver.core.Session; +import com.datastax.driver.core.Statement; +import com.google.auto.service.AutoService; +import com.outworkers.phantom.ResultSet; +import com.outworkers.phantom.ops.QueryContext; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.context.TraceScope; +import java.util.HashMap; +import lombok.extern.slf4j.Slf4j; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.implementation.bind.annotation.SuperCall; +import net.bytebuddy.matcher.ElementMatcher; +import net.bytebuddy.matcher.ElementMatchers; +import scala.runtime.AbstractFunction1; +import scala.concurrent.ExecutionContextExecutor; +import scala.concurrent.Future; +import scala.util.Try; + +import java.util.Collections; +import java.util.Map; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.*; +import static net.bytebuddy.matcher.ElementMatchers.*; + +@Slf4j +@AutoService(Instrumenter.class) +public class PhantomInstrumentation extends Instrumenter.Default { + public PhantomInstrumentation() { + super("phantom", "scala-phantom"); + } + + + @Override + public ElementMatcher typeMatcher() { + return ElementMatchers.nameStartsWith("com.outworkers.phantom.builder.query.execution.PromiseInterface"); + } + + @Override + public Map, String> transformers() { + final Map, String> transformers = new HashMap<>(); + //transformers.put(isMethod().and(named("future")), packageName + ".PhantomAdvice"); + transformers.put(isMethod().and(named("fromGuava").and(ElementMatchers.takesArgument(0, named("com.datastax.driver.core.Statement")))), + packageName + ".GuavaAdapterAdvice"); + return transformers; + } + + @Override + public String[] helperClassNames() { + return new String[]{ + packageName + ".FutureCompletionListener", + packageName + ".PhantomDecorator" + }; + } +} diff --git a/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/TaskCompletionListener.java b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/TaskCompletionListener.java new file mode 100644 index 00000000000..7f9efbc7127 --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/main/java/datadog/trace/instrumentation/phantom/TaskCompletionListener.java @@ -0,0 +1,26 @@ +package datadog.trace.instrumentation.phantom; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import monix.eval.Task; +import scala.Function1; +import scala.Option; +import scala.runtime.AbstractFunction1; +import scala.runtime.BoxedUnit; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +public class TaskCompletionListener extends AbstractFunction1, Task> { + private final AgentSpan agentSpan; + + public TaskCompletionListener(final AgentSpan agentSpan) { + this.agentSpan = agentSpan; + } + + @Override + public Task apply(final Option throwableOption) { + final AgentScope scope = activateSpan(agentSpan); + agentSpan.finish(); + scope.setAsyncPropagation(false); + scope.close(); + return null; + } +} diff --git a/dd-java-agent/instrumentation/phantom/src/test/groovy/PhantomInstrumentationTest.groovy b/dd-java-agent/instrumentation/phantom/src/test/groovy/PhantomInstrumentationTest.groovy new file mode 100644 index 00000000000..41b204ea9da --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/test/groovy/PhantomInstrumentationTest.groovy @@ -0,0 +1,211 @@ +import akka.actor.ActorSystem +import com.datastax.driver.core.Cluster +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.api.DDSpanTypes +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.core.DDSpan +import org.cassandraunit.utils.EmbeddedCassandraServerHelper +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.ExecutionContext$ +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.duration.Duration +import spock.lang.Shared + +import java.util.concurrent.TimeoutException + +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class PhantomInstrumentationTest extends AgentTestRunner { + + @Shared + Cluster cluster + @Shared + int port + @Shared + BooksOps booksOps + @Shared + TestOps testOps + @Shared + ActorSystem testSystem = ActorSystem.apply("phantom-instrumentation-test") + + def setupSpec() { + /* + This timeout seems excessive but we've seen tests fail with timeout of 40s. + TODO: if we continue to see failures we may want to consider using 'real' Cassandra + started in container like we do for memcached. Note: this will complicate things because + tests would have to assume they run under shared Cassandra and act accordingly. + */ + EmbeddedCassandraServerHelper.startEmbeddedCassandra(EmbeddedCassandraServerHelper.CASSANDRA_RNDPORT_YML_FILE, 120000L) + + cluster = EmbeddedCassandraServerHelper.getCluster() + + /* + Looks like sometimes our requests fail because Cassandra takes to long to respond, + Increase this timeout as well to try to cope with this. + */ + cluster.getConfiguration().getSocketOptions().setReadTimeoutMillis(120000) + cluster.newSession().execute("select * from system.local").one() + + + // create the DB + port = EmbeddedCassandraServerHelper.nativeTransportPort + BooksDatabase booksDatabase = new EmbeddedBooksDatabase(port) + new BooksDatabaseUtils(booksDatabase).create() + + booksOps = new BooksOps(booksDatabase) + testOps = new TestOps(booksOps) + + System.out.println("Started embedded cassandra on " + port) + } + + def cleanupSpec() { + EmbeddedCassandraServerHelper.cleanEmbeddedCassandra() + testSystem.terminate() + } + + @Shared + def insertBook = { book, ec -> + testOps.insertBookAndWait((Book) book, (ExecutionContextExecutor) ec)} + + @Shared + ExecutionContext globalEc = ExecutionContext$.MODULE$.global() + +// def "test multi operation for expression"() { +// setup: +// Book testBook = Book.apply(id, "Code Complete", "McConnell", "OutOfStock", 0) +// runUnderTrace("parent") { +// Await.result(testOps.multiOperationExpression(testBook, generalEc, phantomEc), Duration.create(5, "seconds")) +// } +// +// +// expect: +// assertTraces(1) { +// trace(0, 4) { +// basicSpan(it, 0, "parent") +//// phantomSpan(it, 1, cql, null, it.span(0), null) +//// cassandraSpan(it, 2, cql, null, false, it.span(1)) +// } +// } +// +// cleanup: +// shutdownScopes(5000) +// +// where: +// id | phantomEc | generalEc +// UUID.randomUUID() | globalEc | globalEc +//// UUID.randomUUID() | testSystem.dispatcher | testSystem.dispatcher +//// UUID.randomUUID() | globalEc | globalEc +//// UUID.randomUUID() | testSystem.dispatcher | globalEc +// +// +// } + + + def "test read book" () { + setup: + + + when: + runUnderTrace("parent") { + booksOps.getBook(id) + blockUntilChildSpansFinished(1) + } + + then: + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "parent") + phantomSpan(it, 1, cql, it.span(0), null) + cassandraSpan(it, 2, cql, it.span(1)) + } + } + + cleanup: + shutdownScopes(5000) + + where: + id | cql + UUID.randomUUID() | "SELECT * FROM books.books WHERE id = " + id + " LIMIT 1;" + + } + + def "test insert future"() { + setup: + + + when: + runUnderTrace("parent") { + Book book = Book.apply(id, title, author, "", 0) + cmd(book, ec) + blockUntilChildSpansFinished(1) + } + + then: + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 1, "parent") + phantomSpan(it, 0, cql, it.span(1), null) + cassandraSpan(it, 2, cql, it.span(0)) + } + } + + cleanup: + shutdownScopes(5000) + + where: + title | author | id | cmd | cql | ec + "Programming in Scala" | "Odersky" | UUID.randomUUID() | insertBook | "UPDATE books.books SET title = '" + title + "', author = '" + author + "' WHERE id = " + id + ";" | globalEc + "Programming in Scala" | "Odersky" | UUID.randomUUID() | insertBook | "UPDATE books.books SET title = '" + title + "', author = '" + author + "' WHERE id = " + id + ";" | testSystem.dispatcher + } + + def shutdownScopes(Long waitMillis) { + Long deadline = System.currentTimeMillis() + waitMillis + while (testTracer.activeSpan() != null) { + if (System.currentTimeMillis() > deadline) { + throw new TimeoutException("active trace still open: " + testTracer.activeSpan().toString()) + } + System.println("active scope: " + testTracer.activeScope()) + testTracer.activeScope().close() + Thread.sleep(100) + } + true + } + + def phantomSpan(TraceAssert trace, int index, String statement, Object parentSpan = null, Throwable exception = null) { + trace.span(index) { + serviceName "phantom" + operationName "cassandra.query" + resourceName statement + spanType "phantom" + if (parentSpan == null) { + parent() + } else { + childOf((DDSpan) parentSpan) + } + tags { + "$Tags.COMPONENT" "scala-phantom" + "$Tags.DB_TYPE" "cassandra" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + defaultTags() + } + } + } + + def cassandraSpan(TraceAssert trace, int index, String statement, Object parentSpan = null, Throwable exception = null) { + trace.span(index) { + serviceName "cassandra" + operationName "cassandra.query" + resourceName statement + spanType DDSpanTypes.CASSANDRA + if (parentSpan == null) { + parent() + } else { + childOf((DDSpan) parentSpan) + } + } + } + +} diff --git a/dd-java-agent/instrumentation/phantom/src/test/scala/BooksDatabase.scala b/dd-java-agent/instrumentation/phantom/src/test/scala/BooksDatabase.scala new file mode 100644 index 00000000000..a071802d140 --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/test/scala/BooksDatabase.scala @@ -0,0 +1,38 @@ +import java.util.UUID + +import com.outworkers.phantom.Table +import com.outworkers.phantom.connectors.{CassandraConnection, ContactPoint} +import com.outworkers.phantom.database.{Database, DatabaseProvider} +import com.outworkers.phantom.dsl._ + +case class Book( + id: UUID, + title: String, + author: String, + status: String, + inventory: Int + ) + +abstract class Books extends Table[Books, Book] { + object id extends UUIDColumn with PartitionKey + object title extends StringColumn + object author extends StringColumn + object status extends StringColumn + object inventory extends IntColumn +} + +class BooksDatabase(override val connector: CassandraConnection) extends Database[BooksDatabase](connector) { + object Books extends Books with Connector +} + +class EmbeddedBooksDatabase(port: Int) extends BooksDatabase(ContactPoint.apply(port).keySpace("books")) + +class BooksDatabaseUtils(db: BooksDatabase) extends DatabaseProvider[BooksDatabase] { + + import scala.concurrent.duration._ + + override def database: BooksDatabase = db + + def create: Unit = database.create(5.seconds)(scala.concurrent.ExecutionContext.Implicits.global) + +} diff --git a/dd-java-agent/instrumentation/phantom/src/test/scala/BooksOps.scala b/dd-java-agent/instrumentation/phantom/src/test/scala/BooksOps.scala new file mode 100644 index 00000000000..4dd1067cc5d --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/test/scala/BooksOps.scala @@ -0,0 +1,35 @@ +// This code is in it's own compilation unit to keep the scope of DefaultImports.context contained +import com.outworkers.phantom.dsl._ +import java.util.UUID +import scala.concurrent.{ExecutionContextExecutor, Future} + +class BooksOps(database: BooksDatabase) { + implicit val session = database.session + implicit val space = database.space + + def insertBook(book: Book, ecc: ExecutionContextExecutor): Future[ResultSet] = { + database.Books.update() + .where(_.id eqs book.id) + .modify(_.title setTo book.title) + .and(_.author setTo book.author) + .future()(session, ecc) + } + + def setBookStatus(id: UUID, status: String, ecc: ExecutionContextExecutor): Future[ResultSet] = { + database.Books.update() + .where(_.id eqs id) + .modify(_.status setTo status) + .future()(session, ecc) + } + + def setInventory(id: UUID, count: Int, ecc: ExecutionContextExecutor): Future[ResultSet] = { + database.Books.update() + .where(_.id eqs id) + .modify(_.inventory setTo count) + .future()(session, ecc) + } + + def getBook(id: UUID): Future[Option[Book]] = { + database.Books.select.where(_.id eqs id).one() + } +} diff --git a/dd-java-agent/instrumentation/phantom/src/test/scala/TestOps.scala b/dd-java-agent/instrumentation/phantom/src/test/scala/TestOps.scala new file mode 100644 index 00000000000..8c48cb43b72 --- /dev/null +++ b/dd-java-agent/instrumentation/phantom/src/test/scala/TestOps.scala @@ -0,0 +1,64 @@ +import java.util.concurrent.Executor + +import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} +import datadog.trace.api.Trace +import org.slf4j.{Logger, LoggerFactory} + +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future, Promise} +import scala.compat.java8.FutureConverters._ +import scala.util.Try +class TestOps(booksOps: BooksOps) { + val log: Logger = LoggerFactory.getLogger(classOf[TestOps]) + + def insertBookAndWait(book: Book, ecc: ExecutionContextExecutor): Unit = { + Await.result(booksOps.insertBook(book, ecc), 5.seconds) + } + + def multiOperationExpression(book: Book, generalExecutionContext: ExecutionContext, phantomExecutor: ExecutionContextExecutor): Future[Boolean] = { + implicit val ec = generalExecutionContext + val ops = for { + rs1 <- Future(booksOps.insertBook(book, phantomExecutor)) + rs2 <- Future{ + booksOps.setBookStatus(book.id, "In stock", phantomExecutor) + } + rs3 <- Future { + booksOps.setInventory(book.id, 100, phantomExecutor) + } + } yield {true} + ops + } + + import FutureAdapter._ + def multiOperationExpressionPlain(book: Book, generalExecutionContext: ExecutionContext, phantomExecutor: ExecutionContextExecutor): Future[Boolean] = { + implicit val ec = generalExecutionContext + implicit val ec2 = phantomExecutor + booksOps.session.executeAsync(s"UPDATE books.books set title = '${book.title}', author = '${book.author}' where id=${book.id};").asScala + .flatMap(rs => { + println("Starting second update") + booksOps.session.executeAsync(s"UPDATE books.books set status = 'Instock' where id=${book.id};").asScala + } ) + .flatMap(rs => { + println("Starting third update") + booksOps.session.executeAsync( s"UPDATE books.books set inventory = 10 where id=${book.id};").asScala + }) + .map(_ => true) + } + +} + +object FutureAdapter { + + implicit class RichListenableFuture[T](val lf: ListenableFuture[T]) extends AnyVal { + def asScala(implicit e: Executor): Future[T] = { + val p = Promise[T]() + lf.addListener(new Runnable { + override def run(): Unit = { + p.complete(Try(lf.get())) + } + }, e) + p.future + } + } + +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java b/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java index 612bd2c52d8..e2ca90f3d56 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/scopemanager/ContinuableScopeManager.java @@ -10,6 +10,7 @@ import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,6 +69,7 @@ public Scope handleSpan(final AgentSpan span) { private Scope handleSpan(final Continuation continuation, final AgentSpan span) { final ContinuableScope scope = new ContinuableScope(continuation, delegate.handleSpan(span)); + log.debug(String.format("Changing scope from %s TO====> %s", tlsScope.get(), scope)); tlsScope.set(scope); scope.afterActivated(); return scope; @@ -115,7 +117,6 @@ public void close() { if (referenceCount.decrementAndGet() > 0) { return; } - if (tlsScope.get() != this) { log.debug("Tried to close {} scope when {} is on top. Ignoring!", this, tlsScope.get()); return; diff --git a/settings.gradle b/settings.gradle index 36208afb064..eab0292f2d3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -124,6 +124,7 @@ include ':dd-java-agent:instrumentation:netty-4.0' include ':dd-java-agent:instrumentation:netty-4.1' include ':dd-java-agent:instrumentation:okhttp-2' include ':dd-java-agent:instrumentation:okhttp-3' +include ':dd-java-agent:instrumentation:phantom' include ':dd-java-agent:instrumentation:play-2.3' include ':dd-java-agent:instrumentation:play-2.4' include ':dd-java-agent:instrumentation:play-2.6'