diff --git a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle index 0aeb5d3960d..95869236063 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle +++ b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle @@ -44,10 +44,8 @@ muzzle { } dependencies { - compile project(':dd-java-agent:instrumentation:rxjava-1') - compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0' - + testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19' testCompile group: 'org.springframework.data', name: 'spring-data-couchbase', version: '2.0.0.RELEASE' diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java index 770bcc0df24..50ed987a319 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.couchbase.client; +import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -11,14 +12,22 @@ import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.noop.NoopSpan; +import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; +import rx.functions.Action0; +import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseBucketInstrumentation extends Instrumenter.Default { @@ -38,15 +47,13 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", - "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", - "datadog.trace.instrumentation.rxjava.TracedSubscriber", - "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - packageName + ".CouchbaseOnSubscribe", + getClass().getName() + "$TraceSpanStart", + getClass().getName() + "$TraceSpanFinish", + getClass().getName() + "$TraceSpanError", }; } @@ -69,13 +76,94 @@ public static void subscribeResult( @Advice.Enter final int callDepth, @Advice.Origin final Method method, @Advice.FieldValue("bucket") final String bucket, + @Advice.AllArguments final Object[] args, @Advice.Return(readOnly = false) Observable result) { if (callDepth > 0) { return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); + final AtomicReference spanRef = new AtomicReference<>(); + result = + result + .doOnSubscribe(new TraceSpanStart(method, bucket, spanRef)) + .doOnCompleted(new TraceSpanFinish(spanRef)) + .doOnError(new TraceSpanError(spanRef)); + } + } + + public static class TraceSpanStart implements Action0 { + private final Method method; + private final String bucket; + private final AtomicReference spanRef; + + public TraceSpanStart( + final Method method, final String bucket, final AtomicReference spanRef) { + this.method = method; + this.bucket = bucket; + this.spanRef = spanRef; + } + + @Override + public void call() { + // This is called each time an observer has a new subscriber, but we should only time it once. + if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { + return; + } + final Class declaringClass = method.getDeclaringClass(); + final String className = + declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); + final String resourceName = className + "." + method.getName(); + + final Span span = + GlobalTracer.get() + .buildSpan("couchbase.call") + .withTag(DDTags.RESOURCE_NAME, resourceName) + .withTag("bucket", bucket) + .start(); + try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { + // just replace the no-op span. + spanRef.set(DECORATE.afterStart(span)); + } + } + } - result = Observable.create(new CouchbaseOnSubscribe(result, method, bucket)); + public static class TraceSpanFinish implements Action0 { + private final AtomicReference spanRef; + + public TraceSpanFinish(final AtomicReference spanRef) { + this.spanRef = spanRef; + } + + @Override + public void call() { + final Span span = spanRef.getAndSet(null); + + if (span != null) { + try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { + DECORATE.beforeFinish(span); + span.finish(); + } + } + } + } + + public static class TraceSpanError implements Action1 { + private final AtomicReference spanRef; + + public TraceSpanError(final AtomicReference spanRef) { + this.spanRef = spanRef; + } + + @Override + public void call(final Throwable throwable) { + final Span span = spanRef.getAndSet(null); + if (span != null) { + try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { + DECORATE.onError(span, throwable); + DECORATE.beforeFinish(span); + span.finish(); + } + } } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java index 090672c3598..b5549d014a2 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java @@ -1,5 +1,6 @@ package datadog.trace.instrumentation.couchbase.client; +import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -11,14 +12,22 @@ import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.noop.NoopSpan; +import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; +import rx.functions.Action0; +import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseClusterInstrumentation extends Instrumenter.Default { @@ -38,22 +47,20 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { - "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", - "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", - "datadog.trace.instrumentation.rxjava.TracedSubscriber", - "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - packageName + ".CouchbaseOnSubscribe", + getClass().getName() + "$TraceSpanStart", + getClass().getName() + "$TraceSpanFinish", + getClass().getName() + "$TraceSpanError", }; } @Override public Map, String> transformers() { return singletonMap( - isMethod().and(isPublic()).and(returns(named("rx.Observable"))).and(not(named("core"))), + isMethod().and(isPublic()).and(returns(named("rx.Observable"))), CouchbaseClientAdvice.class.getName()); } @@ -73,7 +80,84 @@ public static void subscribeResult( return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); - result = Observable.create(new CouchbaseOnSubscribe(result, method, null)); + final AtomicReference spanRef = new AtomicReference<>(); + result = + result + .doOnSubscribe(new TraceSpanStart(method, spanRef)) + .doOnCompleted(new TraceSpanFinish(spanRef)) + .doOnError(new TraceSpanError(spanRef)); + } + } + + public static class TraceSpanStart implements Action0 { + private final Method method; + private final AtomicReference spanRef; + + public TraceSpanStart(final Method method, final AtomicReference spanRef) { + this.method = method; + this.spanRef = spanRef; + } + + @Override + public void call() { + // This is called each time an observer has a new subscriber, but we should only time it once. + if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { + return; + } + final Class declaringClass = method.getDeclaringClass(); + final String className = + declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); + final String resourceName = className + "." + method.getName(); + + final Span span = + GlobalTracer.get() + .buildSpan("couchbase.call") + .withTag(DDTags.RESOURCE_NAME, resourceName) + .start(); + try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { + // just replace the no-op span. + spanRef.set(DECORATE.afterStart(scope.span())); + } + } + } + + public static class TraceSpanFinish implements Action0 { + private final AtomicReference spanRef; + + public TraceSpanFinish(final AtomicReference spanRef) { + this.spanRef = spanRef; + } + + @Override + public void call() { + final Span span = spanRef.getAndSet(null); + + if (span != null) { + try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { + DECORATE.beforeFinish(span); + span.finish(); + } + } + } + } + + public static class TraceSpanError implements Action1 { + private final AtomicReference spanRef; + + public TraceSpanError(final AtomicReference spanRef) { + this.spanRef = spanRef; + } + + @Override + public void call(final Throwable throwable) { + final Span span = spanRef.getAndSet(null); + if (span != null) { + try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { + DECORATE.onError(span, throwable); + DECORATE.beforeFinish(span); + span.finish(); + } + } } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java deleted file mode 100644 index b20fcee0ab4..00000000000 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java +++ /dev/null @@ -1,36 +0,0 @@ -package datadog.trace.instrumentation.couchbase.client; - -import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; - -import datadog.trace.api.DDTags; -import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; -import io.opentracing.Span; -import java.lang.reflect.Method; -import rx.Observable; - -public class CouchbaseOnSubscribe extends TracedOnSubscribe { - private final String resourceName; - private final String bucket; - - public CouchbaseOnSubscribe( - final Observable originalObservable, final Method method, final String bucket) { - super(originalObservable, "couchbase.call", DECORATE); - - final Class declaringClass = method.getDeclaringClass(); - final String className = - declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); - resourceName = className + "." + method.getName(); - this.bucket = bucket; - } - - @Override - protected void afterStart(final Span span) { - super.afterStart(span); - - span.setTag(DDTags.RESOURCE_NAME, resourceName); - - if (bucket != null) { - span.setTag("bucket", bucket); - } - } -} diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy deleted file mode 100644 index fe0bc662eee..00000000000 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy +++ /dev/null @@ -1,172 +0,0 @@ -import com.couchbase.client.java.AsyncCluster -import com.couchbase.client.java.CouchbaseAsyncCluster -import com.couchbase.client.java.document.JsonDocument -import com.couchbase.client.java.document.json.JsonObject -import com.couchbase.client.java.env.CouchbaseEnvironment -import com.couchbase.client.java.query.N1qlQuery -import spock.util.concurrent.BlockingVariable -import util.AbstractCouchbaseTest - -import java.util.concurrent.TimeUnit - -import static datadog.trace.agent.test.utils.TraceUtils.basicSpan -import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace - -class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { - def "test hasBucket #type"() { - setup: - def hasBucket = new BlockingVariable() - - when: - cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> - manager.hasBucket(bucketSettings.name()).subscribe({ result -> hasBucket.set(result) }) - }) - - then: - assert hasBucket.get() - sortAndAssertTraces(1) { - trace(0, 2) { - assertCouchbaseCall(it, 0, "Cluster.openBucket", null) - assertCouchbaseCall(it, 1, "ClusterManager.hasBucket", null, span(0)) - } - } - - cleanup: - cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() - environment.shutdown() - - where: - bucketSettings << [bucketCouchbase, bucketMemcache] - - environment = envBuilder(bucketSettings).build() - cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) - manager = cluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() - type = bucketSettings.type().name() - } - - def "test upsert #type"() { - setup: - JsonObject content = JsonObject.create().put("hello", "world") - def inserted = new BlockingVariable() - - when: - runUnderTrace("someTrace") { - // Connect to the bucket and open it - cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> - bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) }) - }) - - blockUntilChildSpansFinished(2) // Improve span ordering consistency - } - - then: - inserted.get().content().getString("hello") == "world" - - sortAndAssertTraces(1) { - trace(0, 3) { - basicSpan(it, 0, "someTrace") - - assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 1, "Bucket.upsert", bucketSettings.name(), span(2)) - } - } - - cleanup: - cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() - environment.shutdown() - - where: - bucketSettings << [bucketCouchbase, bucketMemcache] - - environment = envBuilder(bucketSettings).build() - cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) - type = bucketSettings.type().name() - } - - def "test upsert and get #type"() { - setup: - JsonObject content = JsonObject.create().put("hello", "world") - def inserted = new BlockingVariable() - def found = new BlockingVariable() - - when: - runUnderTrace("someTrace") { - cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> - bkt.upsert(JsonDocument.create("helloworld", content)) - .subscribe({ result -> - inserted.set(result) - bkt.get("helloworld") - .subscribe({ searchResult -> found.set(searchResult) - }) - }) - }) - - blockUntilChildSpansFinished(3) // Improve span ordering consistency - } - - // Create a JSON document and store it with the ID "helloworld" - then: - found.get() == inserted.get() - found.get().content().getString("hello") == "world" - - sortAndAssertTraces(1) { - trace(0, 4) { - basicSpan(it, 0, "someTrace") - - assertCouchbaseCall(it, 3, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(3)) - assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(2)) - } - } - - cleanup: - cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() - environment.shutdown() - - where: - bucketSettings << [bucketCouchbase, bucketMemcache] - - environment = envBuilder(bucketSettings).build() - cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) - type = bucketSettings.type().name() - } - - def "test query"() { - setup: - // Only couchbase buckets support queries. - CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() - AsyncCluster cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) - def queryResult = new BlockingVariable() - - when: - // Mock expects this specific query. - // See com.couchbase.mock.http.query.QueryServer.handleString. - runUnderTrace("someTrace") { - cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).subscribe({ - bkt -> - bkt.query(N1qlQuery.simple("SELECT mockrow")) - .flatMap({ query -> query.rows() }) - .single() - .subscribe({ row -> queryResult.set(row.value()) }) - }) - - blockUntilChildSpansFinished(2) // Improve span ordering consistency - } - - then: - queryResult.get().get("row") == "value" - - sortAndAssertTraces(1) { - trace(0, 3) { - basicSpan(it, 0, "someTrace") - - assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 1, "Bucket.query", bucketCouchbase.name(), span(2)) - } - } - - cleanup: - cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() - environment.shutdown() - } -} diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy index 62f901b6f1a..872a46da3d2 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy @@ -1,42 +1,38 @@ import com.couchbase.client.java.Bucket -import com.couchbase.client.java.Cluster -import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.json.JsonObject -import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.query.N1qlQuery +import datadog.trace.api.DDSpanTypes +import io.opentracing.tag.Tags import util.AbstractCouchbaseTest -import static datadog.trace.agent.test.utils.TraceUtils.basicSpan -import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace - class CouchbaseClientTest extends AbstractCouchbaseTest { - def "test hasBucket #type"() { + + def "test client #type"() { when: - def hasBucket = manager.hasBucket(bucketSettings.name()) + manager.hasBucket(bucketSettings.name()) then: - assert hasBucket - sortAndAssertTraces(1) { + assertTraces(1) { trace(0, 1) { - assertCouchbaseCall(it, 0, "ClusterManager.hasBucket") + span(0) { + serviceName "couchbase" + resourceName "ClusterManager.hasBucket" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + defaultTags() + } + } } } + TEST_WRITER.clear() - cleanup: - cluster?.disconnect() - environment.shutdown() - - where: - bucketSettings << [bucketCouchbase, bucketMemcache] - - environment = envBuilder(bucketSettings).build() - cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) - manager = cluster.clusterManager(USERNAME, PASSWORD) - type = bucketSettings.type().name() - } - - def "test upsert and get #type"() { when: // Connect to the bucket and open it Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) @@ -44,87 +40,70 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { // Create a JSON document and store it with the ID "helloworld" JsonObject content = JsonObject.create().put("hello", "world") def inserted = bkt.upsert(JsonDocument.create("helloworld", content)) - def found = bkt.get("helloworld") then: - found == inserted - found.content().getString("hello") == "world" - - sortAndAssertTraces(3) { + assertTraces(1) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Cluster.openBucket") - } - trace(1, 1) { - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name()) - } - trace(2, 1) { - assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.upsert" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bkt.name() + defaultTags() + } + } } } + TEST_WRITER.clear() - cleanup: - cluster?.disconnect() - environment.shutdown() - - where: - bucketSettings << [bucketCouchbase, bucketMemcache] - - environment = envBuilder(bucketSettings).build() - cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) - type = bucketSettings.type().name() - } - - def "test upsert and get #type under trace"() { when: - // Connect to the bucket and open it - Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) - - // Create a JSON document and store it with the ID "helloworld" - JsonObject content = JsonObject.create().put("hello", "world") - - def inserted - def found - - runUnderTrace("someTrace") { - inserted = bkt.upsert(JsonDocument.create("helloworld", content)) - found = bkt.get("helloworld") - - blockUntilChildSpansFinished(2) - } + def found = bkt.get("helloworld") then: found == inserted found.content().getString("hello") == "world" - sortAndAssertTraces(2) { + and: + assertTraces(1) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Cluster.openBucket") - } - trace(1, 3) { - basicSpan(it, 0, "someTrace") - assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0)) - assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0)) + span(0) { + serviceName "couchbase" + resourceName "Bucket.get" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bkt.name() + defaultTags() + } + } } } + TEST_WRITER.clear() - cleanup: - cluster?.disconnect() - environment.shutdown() where: - bucketSettings << [bucketCouchbase, bucketMemcache] + manager | cluster | bucketSettings + couchbaseManager | couchbaseCluster | bucketCouchbase + memcacheManager | memcacheCluster | bucketMemcache - environment = envBuilder(bucketSettings).build() - cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } def "test query"() { setup: - // Only couchbase buckets support queries. - CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() - Cluster cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) - Bucket bkt = cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) + Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) when: // Mock expects this specific query. @@ -137,17 +116,31 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { result.first().value().get("row") == "value" and: - sortAndAssertTraces(2) { + assertTraces(1) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Cluster.openBucket") - } - trace(1, 1) { - assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.query" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bkt.name() + defaultTags() + } + } } } - cleanup: - cluster?.disconnect() - environment.shutdown() + where: + manager | cluster | bucketSettings + couchbaseManager | couchbaseCluster | bucketCouchbase + // Only couchbase buckets support queries. + + type = bucketSettings.type().name() } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy index 19c210f3686..75c139e7d09 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy @@ -1,10 +1,9 @@ package springdata -import com.couchbase.client.java.Cluster -import com.couchbase.client.java.CouchbaseCluster -import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.view.DefaultView import com.couchbase.client.java.view.DesignDocument +import datadog.trace.api.DDSpanTypes +import io.opentracing.tag.Tags import org.springframework.context.ConfigurableApplicationContext import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.data.repository.CrudRepository @@ -32,23 +31,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { DocRepository repo def setupSpec() { - CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() - Cluster couchbaseCluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) // Create view for SpringRepository's findAll() couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).bucketManager() .insertDesignDocument( - DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all", - ''' + DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all", + ''' function (doc, meta) { if (doc._class == "springdata.Doc") { emit(meta.id, null); } } '''.stripIndent() - ))) - ) - CouchbaseConfig.setEnvironment(environment) + ))) + ) + CouchbaseConfig.setEnvironment(couchbaseEnvironment) CouchbaseConfig.setBucketSettings(bucketCouchbase) // Close all buckets and disconnect @@ -78,7 +75,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.query" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } } @@ -96,7 +107,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.upsert" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } } TEST_WRITER.clear() @@ -107,7 +132,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.get" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } } TEST_WRITER.clear() @@ -121,13 +160,55 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { assertTraces(3) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.upsert" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } trace(1, 1) { - assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.query" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } trace(2, 1) { - assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.get" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } } TEST_WRITER.clear() @@ -141,10 +222,38 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(2) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.remove", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.remove" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } trace(1, 1) { - assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) + span(0) { + serviceName "couchbase" + resourceName "Bucket.query" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" bucketCouchbase.name() + defaultTags() + } + } } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy index 5ec6e6072c3..157e2876bbc 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy @@ -1,10 +1,8 @@ package springdata import com.couchbase.client.java.Bucket -import com.couchbase.client.java.Cluster -import com.couchbase.client.java.CouchbaseCluster -import com.couchbase.client.java.cluster.ClusterManager -import com.couchbase.client.java.env.CouchbaseEnvironment +import datadog.trace.api.DDSpanTypes +import io.opentracing.tag.Tags import org.springframework.data.couchbase.core.CouchbaseTemplate import spock.lang.Shared import util.AbstractCouchbaseTest @@ -14,26 +12,7 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { @Shared List templates - @Shared - Cluster couchbaseCluster - - @Shared - Cluster memcacheCluster - - @Shared - protected CouchbaseEnvironment couchbaseEnvironment - @Shared - protected CouchbaseEnvironment memcacheEnvironment - def setupSpec() { - couchbaseEnvironment = envBuilder(bucketCouchbase).build() - memcacheEnvironment = envBuilder(bucketMemcache).build() - - couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) - memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) - ClusterManager couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) - ClusterManager memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD) - Bucket bucketCouchbase = couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) Bucket bucketMemcache = memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password()) @@ -41,14 +20,8 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { new CouchbaseTemplate(memcacheManager.info(), bucketMemcache)] } - def cleanupSpec() { - couchbaseCluster?.disconnect() - memcacheCluster?.disconnect() - couchbaseEnvironment.shutdown() - memcacheEnvironment.shutdown() - } - def "test write #name"() { + def "test write/read #name"() { setup: def doc = new Doc() @@ -58,42 +31,81 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { then: template.findById("1", Doc) != null - and: - assertTraces(2) { - trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.upsert", name) - } - trace(1, 1) { - assertCouchbaseCall(it, 0, "Bucket.get", name) - } - } - - where: - template << templates - name = template.couchbaseBucket.name() - } - - def "test remove #name"() { - setup: - def doc = new Doc() - when: - template.save(doc) template.remove(doc) then: template.findById("1", Doc) == null and: - assertTraces(3) { + assertTraces(4) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.upsert", name) + span(0) { + serviceName "couchbase" + resourceName "Bucket.upsert" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" name + defaultTags() + } + } } trace(1, 1) { - assertCouchbaseCall(it, 0, "Bucket.remove", name) + span(0) { + serviceName "couchbase" + resourceName "Bucket.get" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" name + defaultTags() + } + } } trace(2, 1) { - assertCouchbaseCall(it, 0, "Bucket.get", name) + span(0) { + serviceName "couchbase" + resourceName "Bucket.remove" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" name + defaultTags() + } + } + } + trace(3, 1) { + span(0) { + serviceName "couchbase" + resourceName "Bucket.get" + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + "bucket" name + defaultTags() + } + } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy index 5f82218d947..b5e3f250874 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy @@ -1,33 +1,31 @@ package util + import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig +import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.bucket.BucketType import com.couchbase.client.java.cluster.BucketSettings +import com.couchbase.client.java.cluster.ClusterManager import com.couchbase.client.java.cluster.DefaultBucketSettings +import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.env.DefaultCouchbaseEnvironment import com.couchbase.mock.Bucket import com.couchbase.mock.BucketConfiguration import com.couchbase.mock.CouchbaseMock import com.couchbase.mock.http.query.QueryServer -import datadog.opentracing.DDSpan import datadog.trace.agent.test.AgentTestRunner -import datadog.trace.agent.test.asserts.ListWriterAssert -import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.utils.PortUtils import datadog.trace.api.Config -import datadog.trace.api.DDSpanTypes -import groovy.transform.stc.ClosureParams -import groovy.transform.stc.SimpleType -import io.opentracing.tag.Tags import spock.lang.Shared +import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit abstract class AbstractCouchbaseTest extends AgentTestRunner { - static final USERNAME = "Administrator" - static final PASSWORD = "password" + private static final USERNAME = "Administrator" + private static final PASSWORD = "password" @Shared private int port = PortUtils.randomOpenPort() @@ -55,16 +53,42 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { @Shared CouchbaseMock mock + @Shared + protected CouchbaseCluster couchbaseCluster + @Shared + protected CouchbaseCluster memcacheCluster + @Shared + protected CouchbaseEnvironment couchbaseEnvironment + @Shared + protected CouchbaseEnvironment memcacheEnvironment + @Shared + protected ClusterManager couchbaseManager + @Shared + protected ClusterManager memcacheManager def setupSpec() { + mock = new CouchbaseMock("127.0.0.1", port, 1, 1) mock.httpServer.register("/query", new QueryServer()) mock.start() println "CouchbaseMock listening on localhost:$port" mock.createBucket(convert(bucketCouchbase)) + + couchbaseEnvironment = envBuilder(bucketCouchbase).build() + couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) + couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) + mock.createBucket(convert(bucketMemcache)) + memcacheEnvironment = envBuilder(bucketMemcache).build() + memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) + memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD) + + // Cache buckets: + couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) + memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password()) + // This setting should have no effect since decorator returns null for the instance. System.setProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "true") } @@ -80,12 +104,23 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { } def cleanupSpec() { + try { + couchbaseCluster?.disconnect() + } catch (RejectedExecutionException e) { + // already closed by a test? + } + try { + memcacheCluster?.disconnect() + } catch (RejectedExecutionException e) { + // already closed by a test? + } + mock?.stop() System.clearProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE) } - protected DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) { + private DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) { // Couchbase seems to be really slow to start sometimes def timeout = TimeUnit.SECONDS.toMillis(20) return DefaultCouchbaseEnvironment.builder() @@ -106,55 +141,4 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { .analyticsTimeout(timeout) .socketConnectTimeout(timeout.intValue()) } - - void sortAndAssertTraces( - final int size, - @ClosureParams(value = SimpleType, options = "datadog.trace.agent.test.asserts.ListWriterAssert") - @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) - final Closure spec) { - - TEST_WRITER.waitForTraces(size) - - TEST_WRITER.each { - it.sort({ - a, b -> - boolean aIsCouchbaseOperation = a.operationName == "couchbase.call" - boolean bIsCouchbaseOperation = b.operationName == "couchbase.call" - - if (aIsCouchbaseOperation && !bIsCouchbaseOperation) { - return 1 - } else if (!aIsCouchbaseOperation && bIsCouchbaseOperation) { - return -1 - } - - return a.resourceName.compareTo(b.resourceName) - }) - } - - assertTraces(size, spec) - } - - void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null, Object parentSpan = null) { - trace.span(index) { - serviceName "couchbase" - resourceName name - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - if (parentSpan == null) { - parent() - } else { - childOf((DDSpan) parentSpan) - } - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - if (bucketName != null) { - "bucket" bucketName - } - defaultTags() - } - } - } } diff --git a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle index a2b6b8a474b..60ae7f63460 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle +++ b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle @@ -17,14 +17,11 @@ testSets { } dependencies { - compile project(':dd-java-agent:instrumentation:rxjava-1') - compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile project(':dd-java-agent:instrumentation:java-concurrent') testCompile project(':dd-java-agent:instrumentation:trace-annotation') - testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' diff --git a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java index 897ccce5816..dada3995196 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java +++ b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java @@ -8,15 +8,23 @@ import com.google.auto.service.AutoService; import com.netflix.hystrix.HystrixInvokableInfo; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.ScopeManager; import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import rx.DDTracingUtil; import rx.Observable; +import rx.Subscriber; +import rx.Subscription; @AutoService(Instrumenter.class) public class HystrixInstrumentation extends Instrumenter.Default { @@ -39,11 +47,10 @@ public String[] helperClassNames() { return new String[] { "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", - "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", - "datadog.trace.instrumentation.rxjava.TracedSubscriber", - "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".HystrixDecorator", - packageName + ".HystrixInstrumentation$HystrixOnSubscribe", + packageName + ".HystrixInstrumentation$SpanFinishingSubscription", + packageName + ".HystrixInstrumentation$TracedSubscriber", + packageName + ".HystrixInstrumentation$TracedOnSubscribe", }; } @@ -66,8 +73,11 @@ public static void stopSpan( @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - - result = Observable.create(new HystrixOnSubscribe(result, command, "execute")); + final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); + result = + Observable.create( + new TracedOnSubscribe( + onSubscribe, command, "execute", GlobalTracer.get().scopeManager().active())); } } @@ -78,30 +88,171 @@ public static void stopSpan( @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - - result = Observable.create(new HystrixOnSubscribe(result, command, "fallback")); + final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); + result = + Observable.create( + new TracedOnSubscribe( + onSubscribe, command, "fallback", GlobalTracer.get().scopeManager().active())); } } - public static class HystrixOnSubscribe extends TracedOnSubscribe { + public static class TracedOnSubscribe implements Observable.OnSubscribe { + + private final Observable.OnSubscribe delegate; private final HystrixInvokableInfo command; private final String methodName; + private final TraceScope.Continuation continuation; - public HystrixOnSubscribe( - final Observable originalObservable, + public TracedOnSubscribe( + final Observable.OnSubscribe delegate, final HystrixInvokableInfo command, - final String methodName) { - super(originalObservable, OPERATION_NAME, DECORATE); - + final String methodName, + final Scope parentScope) { + this.delegate = delegate; this.command = command; this.methodName = methodName; + continuation = + parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; } @Override - protected void afterStart(final Span span) { - super.afterStart(span); - + public void call(final Subscriber subscriber) { + final Tracer tracer = GlobalTracer.get(); + final Span span; // span finished by TracedSubscriber + if (continuation != null) { + try (final TraceScope scope = continuation.activate()) { + span = tracer.buildSpan(OPERATION_NAME).start(); + } + } else { + span = tracer.buildSpan(OPERATION_NAME).start(); + } + DECORATE.afterStart(span); DECORATE.onCommand(span, command, methodName); + + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (!((TraceScope) scope).isAsyncPropagating()) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.call(new TracedSubscriber(span, subscriber)); + } + } + } + + public static class TracedSubscriber extends Subscriber { + + private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); + private final AtomicReference spanRef; + private final Subscriber delegate; + + public TracedSubscriber(final Span span, final Subscriber delegate) { + spanRef = new AtomicReference<>(span); + this.delegate = delegate; + final SpanFinishingSubscription subscription = new SpanFinishingSubscription(spanRef); + delegate.add(subscription); + } + + @Override + public void onStart() { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onStart(); + } + } else { + delegate.onStart(); + } + } + + @Override + public void onNext(final T value) { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onNext(value); + } catch (final Throwable e) { + onError(e); + } + } else { + delegate.onNext(value); + } + } + + @Override + public void onCompleted() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + boolean errored = false; + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onCompleted(); + } catch (final Throwable e) { + // Repopulate the spanRef for onError + spanRef.compareAndSet(null, span); + onError(e); + errored = true; + } finally { + // finish called by onError, so don't finish again. + if (!errored) { + DECORATE.beforeFinish(span); + span.finish(); + } + } + } else { + delegate.onCompleted(); + } + } + + @Override + public void onError(final Throwable e) { + final Span span = spanRef.getAndSet(null); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + DECORATE.onError(span, e); + delegate.onError(e); + } catch (final Throwable e2) { + DECORATE.onError(span, e2); + throw e2; + } finally { + DECORATE.beforeFinish(span); + span.finish(); + } + } else { + delegate.onError(e); + } + } + } + + public static class SpanFinishingSubscription implements Subscription { + + private final AtomicReference spanRef; + + public SpanFinishingSubscription(final AtomicReference spanRef) { + this.spanRef = spanRef; + } + + @Override + public void unsubscribe() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + DECORATE.beforeFinish(span); + span.finish(); + } + } + + @Override + public boolean isUnsubscribed() { + return spanRef.get() == null; } } } diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java similarity index 100% rename from dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java rename to dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java diff --git a/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle b/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle deleted file mode 100644 index d9e224bf770..00000000000 --- a/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle +++ /dev/null @@ -1,13 +0,0 @@ -apply from: "${rootDir}/gradle/java.gradle" - -apply plugin: 'org.unbroken-dome.test-sets' - -testSets { - latestDepTest { - dirName = 'test' - } -} - -dependencies { - compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' -} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java deleted file mode 100644 index 888d8ded9f2..00000000000 --- a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java +++ /dev/null @@ -1,31 +0,0 @@ -package datadog.trace.instrumentation.rxjava; - -import datadog.trace.agent.decorator.BaseDecorator; -import io.opentracing.Span; -import java.util.concurrent.atomic.AtomicReference; -import rx.Subscription; - -public class SpanFinishingSubscription implements Subscription { - private final BaseDecorator decorator; - private final AtomicReference spanRef; - - public SpanFinishingSubscription( - final BaseDecorator decorator, final AtomicReference spanRef) { - this.decorator = decorator; - this.spanRef = spanRef; - } - - @Override - public void unsubscribe() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - decorator.beforeFinish(span); - span.finish(); - } - } - - @Override - public boolean isUnsubscribed() { - return spanRef.get() == null; - } -} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java deleted file mode 100644 index 8428fa72b1d..00000000000 --- a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java +++ /dev/null @@ -1,58 +0,0 @@ -package datadog.trace.instrumentation.rxjava; - -import datadog.trace.agent.decorator.BaseDecorator; -import datadog.trace.context.TraceScope; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.Tracer; -import io.opentracing.util.GlobalTracer; -import rx.DDTracingUtil; -import rx.Observable; -import rx.Subscriber; - -public class TracedOnSubscribe implements Observable.OnSubscribe { - - private final Observable.OnSubscribe delegate; - private final String operationName; - private final TraceScope.Continuation continuation; - private final BaseDecorator decorator; - - public TracedOnSubscribe( - final Observable originalObservable, - final String operationName, - final BaseDecorator decorator) { - this.delegate = DDTracingUtil.extractOnSubscribe(originalObservable); - this.operationName = operationName; - this.decorator = decorator; - - final Scope parentScope = GlobalTracer.get().scopeManager().active(); - - continuation = parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; - } - - @Override - public void call(final Subscriber subscriber) { - final Tracer tracer = GlobalTracer.get(); - final Span span; // span finished by TracedSubscriber - if (continuation != null) { - try (final TraceScope scope = continuation.activate()) { - span = tracer.buildSpan(operationName).start(); - } - } else { - span = tracer.buildSpan(operationName).start(); - } - - afterStart(span); - - try (final Scope scope = tracer.scopeManager().activate(span, false)) { - if (!((TraceScope) scope).isAsyncPropagating()) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.call(new TracedSubscriber(span, subscriber, decorator)); - } - } - - protected void afterStart(final Span span) { - decorator.afterStart(span); - } -} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java deleted file mode 100644 index 8cefff4381c..00000000000 --- a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java +++ /dev/null @@ -1,109 +0,0 @@ -package datadog.trace.instrumentation.rxjava; - -import datadog.trace.agent.decorator.BaseDecorator; -import datadog.trace.context.TraceScope; -import io.opentracing.Scope; -import io.opentracing.ScopeManager; -import io.opentracing.Span; -import io.opentracing.util.GlobalTracer; -import java.util.concurrent.atomic.AtomicReference; -import rx.Subscriber; - -public class TracedSubscriber extends Subscriber { - - private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); - private final AtomicReference spanRef; - private final Subscriber delegate; - private final BaseDecorator decorator; - - public TracedSubscriber( - final Span span, final Subscriber delegate, final BaseDecorator decorator) { - spanRef = new AtomicReference<>(span); - this.delegate = delegate; - this.decorator = decorator; - final SpanFinishingSubscription subscription = - new SpanFinishingSubscription(decorator, spanRef); - delegate.add(subscription); - } - - @Override - public void onStart() { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onStart(); - } - } else { - delegate.onStart(); - } - } - - @Override - public void onNext(final T value) { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onNext(value); - } catch (final Throwable e) { - onError(e); - } - } else { - delegate.onNext(value); - } - } - - @Override - public void onCompleted() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - boolean errored = false; - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onCompleted(); - } catch (final Throwable e) { - // Repopulate the spanRef for onError - spanRef.compareAndSet(null, span); - onError(e); - errored = true; - } finally { - // finish called by onError, so don't finish again. - if (!errored) { - decorator.beforeFinish(span); - span.finish(); - } - } - } else { - delegate.onCompleted(); - } - } - - @Override - public void onError(final Throwable e) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - decorator.onError(span, e); - delegate.onError(e); - } catch (final Throwable e2) { - decorator.onError(span, e2); - throw e2; - } finally { - decorator.beforeFinish(span); - span.finish(); - } - } else { - delegate.onError(e); - } - } -} diff --git a/settings.gradle b/settings.gradle index 8dbd7246e0d..fd2d31072cc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -80,7 +80,6 @@ include ':dd-java-agent:instrumentation:play-2.4' include ':dd-java-agent:instrumentation:play-2.6' include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7' include ':dd-java-agent:instrumentation:ratpack-1.4' -include ':dd-java-agent:instrumentation:rxjava-1' include ':dd-java-agent:instrumentation:reactor-core-3.1' include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-3'