diff --git a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle index 95869236063..0aeb5d3960d 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle +++ b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle @@ -44,8 +44,10 @@ muzzle { } dependencies { + compile project(':dd-java-agent:instrumentation:rxjava-1') + compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0' - + testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19' testCompile group: 'org.springframework.data', name: 'spring-data-couchbase', version: '2.0.0.RELEASE' diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java index 50ed987a319..770bcc0df24 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.couchbase.client; -import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -12,22 +11,14 @@ import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopSpan; -import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; -import rx.functions.Action0; -import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseBucketInstrumentation extends Instrumenter.Default { @@ -47,13 +38,15 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { + "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - getClass().getName() + "$TraceSpanStart", - getClass().getName() + "$TraceSpanFinish", - getClass().getName() + "$TraceSpanError", + packageName + ".CouchbaseOnSubscribe", }; } @@ -76,94 +69,13 @@ public static void subscribeResult( @Advice.Enter final int callDepth, @Advice.Origin final Method method, @Advice.FieldValue("bucket") final String bucket, - @Advice.AllArguments final Object[] args, @Advice.Return(readOnly = false) Observable result) { if (callDepth > 0) { return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); - final AtomicReference spanRef = new AtomicReference<>(); - result = - result - .doOnSubscribe(new TraceSpanStart(method, bucket, spanRef)) - .doOnCompleted(new TraceSpanFinish(spanRef)) - .doOnError(new TraceSpanError(spanRef)); - } - } - - public static class TraceSpanStart implements Action0 { - private final Method method; - private final String bucket; - private final AtomicReference spanRef; - - public TraceSpanStart( - final Method method, final String bucket, final AtomicReference spanRef) { - this.method = method; - this.bucket = bucket; - this.spanRef = spanRef; - } - - @Override - public void call() { - // This is called each time an observer has a new subscriber, but we should only time it once. - if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { - return; - } - final Class declaringClass = method.getDeclaringClass(); - final String className = - declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); - final String resourceName = className + "." + method.getName(); - - final Span span = - GlobalTracer.get() - .buildSpan("couchbase.call") - .withTag(DDTags.RESOURCE_NAME, resourceName) - .withTag("bucket", bucket) - .start(); - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - // just replace the no-op span. - spanRef.set(DECORATE.afterStart(span)); - } - } - } - public static class TraceSpanFinish implements Action0 { - private final AtomicReference spanRef; - - public TraceSpanFinish(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call() { - final Span span = spanRef.getAndSet(null); - - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } - } - - public static class TraceSpanError implements Action1 { - private final AtomicReference spanRef; - - public TraceSpanError(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call(final Throwable throwable) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); - } - } + result = Observable.create(new CouchbaseOnSubscribe(result, method, bucket)); } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java index b5549d014a2..090672c3598 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.couchbase.client; -import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -12,22 +11,14 @@ import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopSpan; -import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; -import rx.functions.Action0; -import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseClusterInstrumentation extends Instrumenter.Default { @@ -47,20 +38,22 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { + "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - getClass().getName() + "$TraceSpanStart", - getClass().getName() + "$TraceSpanFinish", - getClass().getName() + "$TraceSpanError", + packageName + ".CouchbaseOnSubscribe", }; } @Override public Map, String> transformers() { return singletonMap( - isMethod().and(isPublic()).and(returns(named("rx.Observable"))), + isMethod().and(isPublic()).and(returns(named("rx.Observable"))).and(not(named("core"))), CouchbaseClientAdvice.class.getName()); } @@ -80,84 +73,7 @@ public static void subscribeResult( return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); - final AtomicReference spanRef = new AtomicReference<>(); - result = - result - .doOnSubscribe(new TraceSpanStart(method, spanRef)) - .doOnCompleted(new TraceSpanFinish(spanRef)) - .doOnError(new TraceSpanError(spanRef)); - } - } - - public static class TraceSpanStart implements Action0 { - private final Method method; - private final AtomicReference spanRef; - - public TraceSpanStart(final Method method, final AtomicReference spanRef) { - this.method = method; - this.spanRef = spanRef; - } - - @Override - public void call() { - // This is called each time an observer has a new subscriber, but we should only time it once. - if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { - return; - } - final Class declaringClass = method.getDeclaringClass(); - final String className = - declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); - final String resourceName = className + "." + method.getName(); - - final Span span = - GlobalTracer.get() - .buildSpan("couchbase.call") - .withTag(DDTags.RESOURCE_NAME, resourceName) - .start(); - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - // just replace the no-op span. - spanRef.set(DECORATE.afterStart(scope.span())); - } - } - } - - public static class TraceSpanFinish implements Action0 { - private final AtomicReference spanRef; - - public TraceSpanFinish(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call() { - final Span span = spanRef.getAndSet(null); - - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } - } - - public static class TraceSpanError implements Action1 { - private final AtomicReference spanRef; - - public TraceSpanError(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call(final Throwable throwable) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); - } - } + result = Observable.create(new CouchbaseOnSubscribe(result, method, null)); } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java new file mode 100644 index 00000000000..b20fcee0ab4 --- /dev/null +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java @@ -0,0 +1,36 @@ +package datadog.trace.instrumentation.couchbase.client; + +import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; + +import datadog.trace.api.DDTags; +import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; +import io.opentracing.Span; +import java.lang.reflect.Method; +import rx.Observable; + +public class CouchbaseOnSubscribe extends TracedOnSubscribe { + private final String resourceName; + private final String bucket; + + public CouchbaseOnSubscribe( + final Observable originalObservable, final Method method, final String bucket) { + super(originalObservable, "couchbase.call", DECORATE); + + final Class declaringClass = method.getDeclaringClass(); + final String className = + declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); + resourceName = className + "." + method.getName(); + this.bucket = bucket; + } + + @Override + protected void afterStart(final Span span) { + super.afterStart(span); + + span.setTag(DDTags.RESOURCE_NAME, resourceName); + + if (bucket != null) { + span.setTag("bucket", bucket); + } + } +} diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy new file mode 100644 index 00000000000..5b0d2b98622 --- /dev/null +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy @@ -0,0 +1,174 @@ +import com.couchbase.client.java.AsyncCluster +import com.couchbase.client.java.CouchbaseAsyncCluster +import com.couchbase.client.java.document.JsonDocument +import com.couchbase.client.java.document.json.JsonObject +import com.couchbase.client.java.env.CouchbaseEnvironment +import com.couchbase.client.java.query.N1qlQuery +import spock.util.concurrent.BlockingVariable +import util.AbstractCouchbaseTest + +import java.util.concurrent.TimeUnit + +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { + static final int TIMEOUT = 10 + + def "test hasBucket #type"() { + setup: + def hasBucket = new BlockingVariable(TIMEOUT) + + when: + cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> + manager.hasBucket(bucketSettings.name()).subscribe({ result -> hasBucket.set(result) }) + }) + + then: + assert hasBucket.get() + sortAndAssertTraces(1) { + trace(0, 2) { + assertCouchbaseCall(it, 0, "Cluster.openBucket", null) + assertCouchbaseCall(it, 1, "ClusterManager.hasBucket", null, span(0)) + } + } + + cleanup: + cluster?.disconnect()?.timeout(TIMEOUT, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() + + where: + bucketSettings << [bucketCouchbase, bucketMemcache] + + environment = envBuilder(bucketSettings).build() + cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) + manager = cluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() + type = bucketSettings.type().name() + } + + def "test upsert #type"() { + setup: + JsonObject content = JsonObject.create().put("hello", "world") + def inserted = new BlockingVariable(TIMEOUT) + + when: + runUnderTrace("someTrace") { + // Connect to the bucket and open it + cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> + bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) }) + }) + + blockUntilChildSpansFinished(2) // Improve span ordering consistency + } + + then: + inserted.get().content().getString("hello") == "world" + + sortAndAssertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "someTrace") + + assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 1, "Bucket.upsert", bucketSettings.name(), span(2)) + } + } + + cleanup: + cluster?.disconnect()?.timeout(TIMEOUT, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() + + where: + bucketSettings << [bucketCouchbase, bucketMemcache] + + environment = envBuilder(bucketSettings).build() + cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) + type = bucketSettings.type().name() + } + + def "test upsert and get #type"() { + setup: + JsonObject content = JsonObject.create().put("hello", "world") + def inserted = new BlockingVariable(TIMEOUT) + def found = new BlockingVariable(TIMEOUT) + + when: + runUnderTrace("someTrace") { + cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> + bkt.upsert(JsonDocument.create("helloworld", content)) + .subscribe({ result -> + inserted.set(result) + bkt.get("helloworld") + .subscribe({ searchResult -> found.set(searchResult) + }) + }) + }) + + blockUntilChildSpansFinished(3) // Improve span ordering consistency + } + + // Create a JSON document and store it with the ID "helloworld" + then: + found.get() == inserted.get() + found.get().content().getString("hello") == "world" + + sortAndAssertTraces(1) { + trace(0, 4) { + basicSpan(it, 0, "someTrace") + + assertCouchbaseCall(it, 3, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(3)) + assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(2)) + } + } + + cleanup: + cluster?.disconnect()?.timeout(TIMEOUT, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() + + where: + bucketSettings << [bucketCouchbase, bucketMemcache] + + environment = envBuilder(bucketSettings).build() + cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) + type = bucketSettings.type().name() + } + + def "test query"() { + setup: + // Only couchbase buckets support queries. + CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() + AsyncCluster cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) + def queryResult = new BlockingVariable(TIMEOUT) + + when: + // Mock expects this specific query. + // See com.couchbase.mock.http.query.QueryServer.handleString. + runUnderTrace("someTrace") { + cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).subscribe({ + bkt -> + bkt.query(N1qlQuery.simple("SELECT mockrow")) + .flatMap({ query -> query.rows() }) + .single() + .subscribe({ row -> queryResult.set(row.value()) }) + }) + + blockUntilChildSpansFinished(2) // Improve span ordering consistency + } + + then: + queryResult.get().get("row") == "value" + + sortAndAssertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "someTrace") + + assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 1, "Bucket.query", bucketCouchbase.name(), span(2)) + } + } + + cleanup: + cluster?.disconnect()?.timeout(TIMEOUT, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() + } +} diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy index 872a46da3d2..62f901b6f1a 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy @@ -1,38 +1,42 @@ import com.couchbase.client.java.Bucket +import com.couchbase.client.java.Cluster +import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.json.JsonObject +import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.query.N1qlQuery -import datadog.trace.api.DDSpanTypes -import io.opentracing.tag.Tags import util.AbstractCouchbaseTest -class CouchbaseClientTest extends AbstractCouchbaseTest { +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace - def "test client #type"() { +class CouchbaseClientTest extends AbstractCouchbaseTest { + def "test hasBucket #type"() { when: - manager.hasBucket(bucketSettings.name()) + def hasBucket = manager.hasBucket(bucketSettings.name()) then: - assertTraces(1) { + assert hasBucket + sortAndAssertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "ClusterManager.hasBucket" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - defaultTags() - } - } + assertCouchbaseCall(it, 0, "ClusterManager.hasBucket") } } - TEST_WRITER.clear() + cleanup: + cluster?.disconnect() + environment.shutdown() + + where: + bucketSettings << [bucketCouchbase, bucketMemcache] + + environment = envBuilder(bucketSettings).build() + cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) + manager = cluster.clusterManager(USERNAME, PASSWORD) + type = bucketSettings.type().name() + } + + def "test upsert and get #type"() { when: // Connect to the bucket and open it Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) @@ -40,70 +44,87 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { // Create a JSON document and store it with the ID "helloworld" JsonObject content = JsonObject.create().put("hello", "world") def inserted = bkt.upsert(JsonDocument.create("helloworld", content)) + def found = bkt.get("helloworld") then: - assertTraces(1) { + found == inserted + found.content().getString("hello") == "world" + + sortAndAssertTraces(3) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bkt.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Cluster.openBucket") + } + trace(1, 1) { + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name()) + } + trace(2, 1) { + assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name()) } } - TEST_WRITER.clear() + cleanup: + cluster?.disconnect() + environment.shutdown() + + where: + bucketSettings << [bucketCouchbase, bucketMemcache] + + environment = envBuilder(bucketSettings).build() + cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) + type = bucketSettings.type().name() + } + + def "test upsert and get #type under trace"() { when: - def found = bkt.get("helloworld") + // Connect to the bucket and open it + Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) + + // Create a JSON document and store it with the ID "helloworld" + JsonObject content = JsonObject.create().put("hello", "world") + + def inserted + def found + + runUnderTrace("someTrace") { + inserted = bkt.upsert(JsonDocument.create("helloworld", content)) + found = bkt.get("helloworld") + + blockUntilChildSpansFinished(2) + } then: found == inserted found.content().getString("hello") == "world" - and: - assertTraces(1) { + sortAndAssertTraces(2) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bkt.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Cluster.openBucket") + } + trace(1, 3) { + basicSpan(it, 0, "someTrace") + assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0)) } } - TEST_WRITER.clear() + cleanup: + cluster?.disconnect() + environment.shutdown() where: - manager | cluster | bucketSettings - couchbaseManager | couchbaseCluster | bucketCouchbase - memcacheManager | memcacheCluster | bucketMemcache + bucketSettings << [bucketCouchbase, bucketMemcache] + environment = envBuilder(bucketSettings).build() + cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } def "test query"() { setup: - Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) + // Only couchbase buckets support queries. + CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() + Cluster cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) + Bucket bkt = cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) when: // Mock expects this specific query. @@ -116,31 +137,17 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { result.first().value().get("row") == "value" and: - assertTraces(1) { + sortAndAssertTraces(2) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bkt.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Cluster.openBucket") + } + trace(1, 1) { + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } } - where: - manager | cluster | bucketSettings - couchbaseManager | couchbaseCluster | bucketCouchbase - // Only couchbase buckets support queries. - - type = bucketSettings.type().name() + cleanup: + cluster?.disconnect() + environment.shutdown() } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy index 75c139e7d09..19c210f3686 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy @@ -1,9 +1,10 @@ package springdata +import com.couchbase.client.java.Cluster +import com.couchbase.client.java.CouchbaseCluster +import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.view.DefaultView import com.couchbase.client.java.view.DesignDocument -import datadog.trace.api.DDSpanTypes -import io.opentracing.tag.Tags import org.springframework.context.ConfigurableApplicationContext import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.data.repository.CrudRepository @@ -31,21 +32,23 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { DocRepository repo def setupSpec() { + CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() + Cluster couchbaseCluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) // Create view for SpringRepository's findAll() couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).bucketManager() .insertDesignDocument( - DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all", - ''' + DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all", + ''' function (doc, meta) { if (doc._class == "springdata.Doc") { emit(meta.id, null); } } '''.stripIndent() - ))) - ) - CouchbaseConfig.setEnvironment(couchbaseEnvironment) + ))) + ) + CouchbaseConfig.setEnvironment(environment) CouchbaseConfig.setBucketSettings(bucketCouchbase) // Close all buckets and disconnect @@ -75,21 +78,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } } @@ -107,21 +96,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) } } TEST_WRITER.clear() @@ -132,21 +107,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) } } TEST_WRITER.clear() @@ -160,55 +121,13 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { assertTraces(3) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) } trace(1, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } trace(2, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) } } TEST_WRITER.clear() @@ -222,38 +141,10 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(2) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.remove" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.remove", bucketCouchbase.name()) } trace(1, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy index 157e2876bbc..ead64dbab18 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy @@ -1,8 +1,10 @@ package springdata import com.couchbase.client.java.Bucket -import datadog.trace.api.DDSpanTypes -import io.opentracing.tag.Tags +import com.couchbase.client.java.Cluster +import com.couchbase.client.java.CouchbaseCluster +import com.couchbase.client.java.cluster.ClusterManager +import com.couchbase.client.java.env.CouchbaseEnvironment import org.springframework.data.couchbase.core.CouchbaseTemplate import spock.lang.Shared import util.AbstractCouchbaseTest @@ -12,7 +14,26 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { @Shared List templates + @Shared + Cluster couchbaseCluster + + @Shared + Cluster memcacheCluster + + @Shared + protected CouchbaseEnvironment couchbaseEnvironment + @Shared + protected CouchbaseEnvironment memcacheEnvironment + def setupSpec() { + couchbaseEnvironment = envBuilder(bucketCouchbase).build() + memcacheEnvironment = envBuilder(bucketMemcache).build() + + couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) + memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) + ClusterManager couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) + ClusterManager memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD) + Bucket bucketCouchbase = couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) Bucket bucketMemcache = memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password()) @@ -20,92 +41,64 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { new CouchbaseTemplate(memcacheManager.info(), bucketMemcache)] } + def cleanupSpec() { + couchbaseCluster?.disconnect() + memcacheCluster?.disconnect() + couchbaseEnvironment.shutdown() + memcacheEnvironment.shutdown() + } - def "test write/read #name"() { + def "test write #name"() { setup: def doc = new Doc() when: template.save(doc) + def result = template.findById("1", Doc) then: - template.findById("1", Doc) != null + result != null + assertTraces(2) { + trace(0, 1) { + assertCouchbaseCall(it, 0, "Bucket.upsert", name) + } + trace(1, 1) { + assertCouchbaseCall(it, 0, "Bucket.get", name) + } + } + + where: + template << templates + name = template.couchbaseBucket.name() + } + + def "test remove #name"() { + setup: + def doc = new Doc() when: + template.save(doc) template.remove(doc) then: - template.findById("1", Doc) == null - - and: - assertTraces(4) { + assertTraces(2) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.upsert", name) } trace(1, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } - } - trace(2, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.remove" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.remove", name) } - trace(3, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } + } + + when: + TEST_WRITER.clear() + def result = template.findById("1", Doc) + + then: + result == null + assertTraces(1) { + trace(0, 1) { + assertCouchbaseCall(it, 0, "Bucket.get", name) } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy index b5e3f250874..5f82218d947 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy @@ -1,31 +1,33 @@ package util - import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig -import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.bucket.BucketType import com.couchbase.client.java.cluster.BucketSettings -import com.couchbase.client.java.cluster.ClusterManager import com.couchbase.client.java.cluster.DefaultBucketSettings -import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.env.DefaultCouchbaseEnvironment import com.couchbase.mock.Bucket import com.couchbase.mock.BucketConfiguration import com.couchbase.mock.CouchbaseMock import com.couchbase.mock.http.query.QueryServer +import datadog.opentracing.DDSpan import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.ListWriterAssert +import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.utils.PortUtils import datadog.trace.api.Config +import datadog.trace.api.DDSpanTypes +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType +import io.opentracing.tag.Tags import spock.lang.Shared -import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit abstract class AbstractCouchbaseTest extends AgentTestRunner { - private static final USERNAME = "Administrator" - private static final PASSWORD = "password" + static final USERNAME = "Administrator" + static final PASSWORD = "password" @Shared private int port = PortUtils.randomOpenPort() @@ -53,42 +55,16 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { @Shared CouchbaseMock mock - @Shared - protected CouchbaseCluster couchbaseCluster - @Shared - protected CouchbaseCluster memcacheCluster - @Shared - protected CouchbaseEnvironment couchbaseEnvironment - @Shared - protected CouchbaseEnvironment memcacheEnvironment - @Shared - protected ClusterManager couchbaseManager - @Shared - protected ClusterManager memcacheManager def setupSpec() { - mock = new CouchbaseMock("127.0.0.1", port, 1, 1) mock.httpServer.register("/query", new QueryServer()) mock.start() println "CouchbaseMock listening on localhost:$port" mock.createBucket(convert(bucketCouchbase)) - - couchbaseEnvironment = envBuilder(bucketCouchbase).build() - couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) - couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) - mock.createBucket(convert(bucketMemcache)) - memcacheEnvironment = envBuilder(bucketMemcache).build() - memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) - memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD) - - // Cache buckets: - couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) - memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password()) - // This setting should have no effect since decorator returns null for the instance. System.setProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "true") } @@ -104,23 +80,12 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { } def cleanupSpec() { - try { - couchbaseCluster?.disconnect() - } catch (RejectedExecutionException e) { - // already closed by a test? - } - try { - memcacheCluster?.disconnect() - } catch (RejectedExecutionException e) { - // already closed by a test? - } - mock?.stop() System.clearProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE) } - private DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) { + protected DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) { // Couchbase seems to be really slow to start sometimes def timeout = TimeUnit.SECONDS.toMillis(20) return DefaultCouchbaseEnvironment.builder() @@ -141,4 +106,55 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { .analyticsTimeout(timeout) .socketConnectTimeout(timeout.intValue()) } + + void sortAndAssertTraces( + final int size, + @ClosureParams(value = SimpleType, options = "datadog.trace.agent.test.asserts.ListWriterAssert") + @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) + final Closure spec) { + + TEST_WRITER.waitForTraces(size) + + TEST_WRITER.each { + it.sort({ + a, b -> + boolean aIsCouchbaseOperation = a.operationName == "couchbase.call" + boolean bIsCouchbaseOperation = b.operationName == "couchbase.call" + + if (aIsCouchbaseOperation && !bIsCouchbaseOperation) { + return 1 + } else if (!aIsCouchbaseOperation && bIsCouchbaseOperation) { + return -1 + } + + return a.resourceName.compareTo(b.resourceName) + }) + } + + assertTraces(size, spec) + } + + void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null, Object parentSpan = null) { + trace.span(index) { + serviceName "couchbase" + resourceName name + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + if (parentSpan == null) { + parent() + } else { + childOf((DDSpan) parentSpan) + } + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + if (bucketName != null) { + "bucket" bucketName + } + defaultTags() + } + } + } } diff --git a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle index 60ae7f63460..a2b6b8a474b 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle +++ b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle @@ -17,11 +17,14 @@ testSets { } dependencies { + compile project(':dd-java-agent:instrumentation:rxjava-1') + compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile project(':dd-java-agent:instrumentation:java-concurrent') testCompile project(':dd-java-agent:instrumentation:trace-annotation') + testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' diff --git a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java index dada3995196..897ccce5816 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java +++ b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java @@ -8,23 +8,15 @@ import com.google.auto.service.AutoService; import com.netflix.hystrix.HystrixInvokableInfo; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.context.TraceScope; -import io.opentracing.Scope; -import io.opentracing.ScopeManager; +import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; import io.opentracing.Span; -import io.opentracing.Tracer; -import io.opentracing.util.GlobalTracer; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import rx.DDTracingUtil; import rx.Observable; -import rx.Subscriber; -import rx.Subscription; @AutoService(Instrumenter.class) public class HystrixInstrumentation extends Instrumenter.Default { @@ -47,10 +39,11 @@ public String[] helperClassNames() { return new String[] { "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".HystrixDecorator", - packageName + ".HystrixInstrumentation$SpanFinishingSubscription", - packageName + ".HystrixInstrumentation$TracedSubscriber", - packageName + ".HystrixInstrumentation$TracedOnSubscribe", + packageName + ".HystrixInstrumentation$HystrixOnSubscribe", }; } @@ -73,11 +66,8 @@ public static void stopSpan( @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); - result = - Observable.create( - new TracedOnSubscribe( - onSubscribe, command, "execute", GlobalTracer.get().scopeManager().active())); + + result = Observable.create(new HystrixOnSubscribe(result, command, "execute")); } } @@ -88,171 +78,30 @@ public static void stopSpan( @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); - result = - Observable.create( - new TracedOnSubscribe( - onSubscribe, command, "fallback", GlobalTracer.get().scopeManager().active())); + + result = Observable.create(new HystrixOnSubscribe(result, command, "fallback")); } } - public static class TracedOnSubscribe implements Observable.OnSubscribe { - - private final Observable.OnSubscribe delegate; + public static class HystrixOnSubscribe extends TracedOnSubscribe { private final HystrixInvokableInfo command; private final String methodName; - private final TraceScope.Continuation continuation; - public TracedOnSubscribe( - final Observable.OnSubscribe delegate, + public HystrixOnSubscribe( + final Observable originalObservable, final HystrixInvokableInfo command, - final String methodName, - final Scope parentScope) { - this.delegate = delegate; + final String methodName) { + super(originalObservable, OPERATION_NAME, DECORATE); + this.command = command; this.methodName = methodName; - continuation = - parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; - } - - @Override - public void call(final Subscriber subscriber) { - final Tracer tracer = GlobalTracer.get(); - final Span span; // span finished by TracedSubscriber - if (continuation != null) { - try (final TraceScope scope = continuation.activate()) { - span = tracer.buildSpan(OPERATION_NAME).start(); - } - } else { - span = tracer.buildSpan(OPERATION_NAME).start(); - } - DECORATE.afterStart(span); - DECORATE.onCommand(span, command, methodName); - - try (final Scope scope = tracer.scopeManager().activate(span, false)) { - if (!((TraceScope) scope).isAsyncPropagating()) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.call(new TracedSubscriber(span, subscriber)); - } - } - } - - public static class TracedSubscriber extends Subscriber { - - private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); - private final AtomicReference spanRef; - private final Subscriber delegate; - - public TracedSubscriber(final Span span, final Subscriber delegate) { - spanRef = new AtomicReference<>(span); - this.delegate = delegate; - final SpanFinishingSubscription subscription = new SpanFinishingSubscription(spanRef); - delegate.add(subscription); - } - - @Override - public void onStart() { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onStart(); - } - } else { - delegate.onStart(); - } } @Override - public void onNext(final T value) { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onNext(value); - } catch (final Throwable e) { - onError(e); - } - } else { - delegate.onNext(value); - } - } - - @Override - public void onCompleted() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - boolean errored = false; - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onCompleted(); - } catch (final Throwable e) { - // Repopulate the spanRef for onError - spanRef.compareAndSet(null, span); - onError(e); - errored = true; - } finally { - // finish called by onError, so don't finish again. - if (!errored) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } else { - delegate.onCompleted(); - } - } - - @Override - public void onError(final Throwable e) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - DECORATE.onError(span, e); - delegate.onError(e); - } catch (final Throwable e2) { - DECORATE.onError(span, e2); - throw e2; - } finally { - DECORATE.beforeFinish(span); - span.finish(); - } - } else { - delegate.onError(e); - } - } - } - - public static class SpanFinishingSubscription implements Subscription { + protected void afterStart(final Span span) { + super.afterStart(span); - private final AtomicReference spanRef; - - public SpanFinishingSubscription(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void unsubscribe() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - - @Override - public boolean isUnsubscribed() { - return spanRef.get() == null; + DECORATE.onCommand(span, command, methodName); } } } diff --git a/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle b/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle new file mode 100644 index 00000000000..d9e224bf770 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle @@ -0,0 +1,13 @@ +apply from: "${rootDir}/gradle/java.gradle" + +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + latestDepTest { + dirName = 'test' + } +} + +dependencies { + compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java new file mode 100644 index 00000000000..888d8ded9f2 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java @@ -0,0 +1,31 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import io.opentracing.Span; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; + +public class SpanFinishingSubscription implements Subscription { + private final BaseDecorator decorator; + private final AtomicReference spanRef; + + public SpanFinishingSubscription( + final BaseDecorator decorator, final AtomicReference spanRef) { + this.decorator = decorator; + this.spanRef = spanRef; + } + + @Override + public void unsubscribe() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + decorator.beforeFinish(span); + span.finish(); + } + } + + @Override + public boolean isUnsubscribed() { + return spanRef.get() == null; + } +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java new file mode 100644 index 00000000000..8428fa72b1d --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java @@ -0,0 +1,58 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import rx.DDTracingUtil; +import rx.Observable; +import rx.Subscriber; + +public class TracedOnSubscribe implements Observable.OnSubscribe { + + private final Observable.OnSubscribe delegate; + private final String operationName; + private final TraceScope.Continuation continuation; + private final BaseDecorator decorator; + + public TracedOnSubscribe( + final Observable originalObservable, + final String operationName, + final BaseDecorator decorator) { + this.delegate = DDTracingUtil.extractOnSubscribe(originalObservable); + this.operationName = operationName; + this.decorator = decorator; + + final Scope parentScope = GlobalTracer.get().scopeManager().active(); + + continuation = parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; + } + + @Override + public void call(final Subscriber subscriber) { + final Tracer tracer = GlobalTracer.get(); + final Span span; // span finished by TracedSubscriber + if (continuation != null) { + try (final TraceScope scope = continuation.activate()) { + span = tracer.buildSpan(operationName).start(); + } + } else { + span = tracer.buildSpan(operationName).start(); + } + + afterStart(span); + + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (!((TraceScope) scope).isAsyncPropagating()) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.call(new TracedSubscriber(span, subscriber, decorator)); + } + } + + protected void afterStart(final Span span) { + decorator.afterStart(span); + } +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java new file mode 100644 index 00000000000..8cefff4381c --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java @@ -0,0 +1,109 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.ScopeManager; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscriber; + +public class TracedSubscriber extends Subscriber { + + private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); + private final AtomicReference spanRef; + private final Subscriber delegate; + private final BaseDecorator decorator; + + public TracedSubscriber( + final Span span, final Subscriber delegate, final BaseDecorator decorator) { + spanRef = new AtomicReference<>(span); + this.delegate = delegate; + this.decorator = decorator; + final SpanFinishingSubscription subscription = + new SpanFinishingSubscription(decorator, spanRef); + delegate.add(subscription); + } + + @Override + public void onStart() { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onStart(); + } + } else { + delegate.onStart(); + } + } + + @Override + public void onNext(final T value) { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onNext(value); + } catch (final Throwable e) { + onError(e); + } + } else { + delegate.onNext(value); + } + } + + @Override + public void onCompleted() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + boolean errored = false; + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onCompleted(); + } catch (final Throwable e) { + // Repopulate the spanRef for onError + spanRef.compareAndSet(null, span); + onError(e); + errored = true; + } finally { + // finish called by onError, so don't finish again. + if (!errored) { + decorator.beforeFinish(span); + span.finish(); + } + } + } else { + delegate.onCompleted(); + } + } + + @Override + public void onError(final Throwable e) { + final Span span = spanRef.getAndSet(null); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + decorator.onError(span, e); + delegate.onError(e); + } catch (final Throwable e2) { + decorator.onError(span, e2); + throw e2; + } finally { + decorator.beforeFinish(span); + span.finish(); + } + } else { + delegate.onError(e); + } + } +} diff --git a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java similarity index 100% rename from dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java rename to dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java diff --git a/settings.gradle b/settings.gradle index fd2d31072cc..8dbd7246e0d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -80,6 +80,7 @@ include ':dd-java-agent:instrumentation:play-2.4' include ':dd-java-agent:instrumentation:play-2.6' include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7' include ':dd-java-agent:instrumentation:ratpack-1.4' +include ':dd-java-agent:instrumentation:rxjava-1' include ':dd-java-agent:instrumentation:reactor-core-3.1' include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-3'