From df0838070a77a65b78fe962f325562a901ef1e20 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Mon, 9 Sep 2019 10:18:58 -0400 Subject: [PATCH 1/5] Extract span assertion from couchbase tests --- .../test/groovy/CouchbaseClientTest.groovy | 96 ++++--------------- .../groovy/util/AbstractCouchbaseTest.groovy | 23 +++++ 2 files changed, 42 insertions(+), 77 deletions(-) diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy index 872a46da3d2..604f79f1e5d 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy @@ -2,37 +2,31 @@ import com.couchbase.client.java.Bucket import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.json.JsonObject import com.couchbase.client.java.query.N1qlQuery -import datadog.trace.api.DDSpanTypes -import io.opentracing.tag.Tags import util.AbstractCouchbaseTest class CouchbaseClientTest extends AbstractCouchbaseTest { - def "test client #type"() { + def "test hasBucket #type"() { when: - manager.hasBucket(bucketSettings.name()) + def hasBucket = manager.hasBucket(bucketSettings.name()) then: + assert hasBucket assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "ClusterManager.hasBucket" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - defaultTags() - } - } + assertCouchbaseCall(it, 0, "ClusterManager.hasBucket") } } - TEST_WRITER.clear() + where: + manager | cluster | bucketSettings + couchbaseManager | couchbaseCluster | bucketCouchbase + memcacheManager | memcacheCluster | bucketMemcache + + type = bucketSettings.type().name() + } + + def "test upsert and get #type"() { when: // Connect to the bucket and open it Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) @@ -40,58 +34,20 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { // Create a JSON document and store it with the ID "helloworld" JsonObject content = JsonObject.create().put("hello", "world") def inserted = bkt.upsert(JsonDocument.create("helloworld", content)) - - then: - assertTraces(1) { - trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bkt.name() - defaultTags() - } - } - } - } - TEST_WRITER.clear() - - when: def found = bkt.get("helloworld") then: found == inserted found.content().getString("hello") == "world" - and: - assertTraces(1) { + assertTraces(2) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bkt.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name()) + } + trace(1, 1) { + assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name()) } } - TEST_WRITER.clear() - where: manager | cluster | bucketSettings @@ -118,21 +74,7 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bkt.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.query", bucketSettings.name()) } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy index b5e3f250874..6fb22b8d2bf 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy @@ -15,8 +15,11 @@ import com.couchbase.mock.BucketConfiguration import com.couchbase.mock.CouchbaseMock import com.couchbase.mock.http.query.QueryServer import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.utils.PortUtils import datadog.trace.api.Config +import datadog.trace.api.DDSpanTypes +import io.opentracing.tag.Tags import spock.lang.Shared import java.util.concurrent.RejectedExecutionException @@ -141,4 +144,24 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { .analyticsTimeout(timeout) .socketConnectTimeout(timeout.intValue()) } + + void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null) { + trace.span(index) { + serviceName "couchbase" + resourceName name + operationName "couchbase.call" + spanType DDSpanTypes.COUCHBASE + errored false + parent() + tags { + "$Tags.COMPONENT.key" "couchbase-client" + "$Tags.DB_TYPE.key" "couchbase" + "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT + if (bucketName) { + "bucket" bucketName + } + defaultTags() + } + } + } } From a5b5b0c307bb9803ecb1761d80c2f1867fef1df2 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Mon, 9 Sep 2019 12:40:43 -0400 Subject: [PATCH 2/5] Failing tests WIP --- .../groovy/CouchbaseAsyncClientTest.groovy | 145 ++++++++++++++++++ .../groovy/util/AbstractCouchbaseTest.groovy | 26 +++- 2 files changed, 167 insertions(+), 4 deletions(-) create mode 100644 dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy new file mode 100644 index 00000000000..40e6d85998f --- /dev/null +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy @@ -0,0 +1,145 @@ +import com.couchbase.client.java.document.JsonDocument +import com.couchbase.client.java.document.json.JsonObject +import com.couchbase.client.java.query.N1qlQuery +import spock.util.concurrent.BlockingVariable +import util.AbstractCouchbaseTest + +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { + def "test hasBucket #type"() { + setup: + def hasBucket = new BlockingVariable() + + when: + manager.hasBucket(bucketSettings.name()).subscribe({ result -> hasBucket.set(result) }) + + then: + assert hasBucket.get() + assertTraces(1) { + trace(0, 1) { + assertCouchbaseCall(it, 0, "ClusterManager.hasBucket") + } + } + + where: + manager | cluster | bucketSettings + couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase + memcacheAsyncManager | memcacheAsyncCluster | bucketMemcache + + type = bucketSettings.type().name() + } + + def "test upsert #type"() { + setup: + JsonObject content = JsonObject.create().put("hello", "world") + def inserted = new BlockingVariable() + + when: + runUnderTrace("someTrace") { + // Connect to the bucket and open it + cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> + bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) }) + }) + } + + then: + inserted.get().content().getString("hello") == "world" + + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "someTrace") + + assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0)) + } + } + + where: + manager | cluster | bucketSettings + couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase + memcacheAsyncManager | memcacheAsyncCluster | bucketMemcache + + type = bucketSettings.type().name() + } + + def "test upsert and get #type"() { + setup: + JsonObject content = JsonObject.create().put("hello", "world") + def inserted = new BlockingVariable() + def found = new BlockingVariable() + + when: + runUnderTrace("someTrace") { + cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> + bkt.upsert(JsonDocument.create("helloworld", content)) + .subscribe({ result -> + inserted.set(result) + bkt.get("helloworld") + .subscribe({ searchResult -> found.set(searchResult) + }) + }) + }) + } + + // Create a JSON document and store it with the ID "helloworld" + then: + found.get() == inserted.get() + found.get().content().getString("hello") == "world" + + assertTraces(1) { + trace(0, 4) { + basicSpan(it, 0, "someTrace") + + assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name(), span(0)) + } + } + + where: + manager | cluster | bucketSettings + couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase + memcacheAsyncManager | memcacheAsyncCluster | bucketMemcache + + type = bucketSettings.type().name() + } + + def "test query"() { + setup: + def queryResult = new BlockingVariable() + + when: + // Mock expects this specific query. + // See com.couchbase.mock.http.query.QueryServer.handleString. + runUnderTrace("someTrace") { + cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ + bkt -> + bkt.query(N1qlQuery.simple("SELECT mockrow")) + .flatMap({ query -> query.rows() }) + .single() + .subscribe({ row -> queryResult.set(row.value()) }) + }) + } + + then: + queryResult.get().get("row") == "value" + + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "someTrace") + + assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 0, "Bucket.query", bucketSettings.name(), span(0)) + } + } + + where: + manager | cluster | bucketSettings + couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase + // Only couchbase buckets support queries. + + type = bucketSettings.type().name() + } +} diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy index 6fb22b8d2bf..040ae8097e5 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy @@ -1,10 +1,11 @@ package util - import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig +import com.couchbase.client.java.CouchbaseAsyncCluster import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.bucket.BucketType +import com.couchbase.client.java.cluster.AsyncClusterManager import com.couchbase.client.java.cluster.BucketSettings import com.couchbase.client.java.cluster.ClusterManager import com.couchbase.client.java.cluster.DefaultBucketSettings @@ -14,6 +15,7 @@ import com.couchbase.mock.Bucket import com.couchbase.mock.BucketConfiguration import com.couchbase.mock.CouchbaseMock import com.couchbase.mock.http.query.QueryServer +import datadog.opentracing.DDSpan import datadog.trace.agent.test.AgentTestRunner import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.utils.PortUtils @@ -59,15 +61,23 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { @Shared protected CouchbaseCluster couchbaseCluster @Shared + protected CouchbaseAsyncCluster couchbaseAsyncCluster + @Shared protected CouchbaseCluster memcacheCluster @Shared + protected CouchbaseAsyncCluster memcacheAsyncCluster + @Shared protected CouchbaseEnvironment couchbaseEnvironment @Shared protected CouchbaseEnvironment memcacheEnvironment @Shared protected ClusterManager couchbaseManager @Shared + protected AsyncClusterManager couchbaseAsyncManager + @Shared protected ClusterManager memcacheManager + @Shared + protected AsyncClusterManager memcacheAsyncManager def setupSpec() { @@ -80,13 +90,17 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { couchbaseEnvironment = envBuilder(bucketCouchbase).build() couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) + couchbaseAsyncCluster = CouchbaseAsyncCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) + couchbaseAsyncManager = couchbaseAsyncCluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() mock.createBucket(convert(bucketMemcache)) memcacheEnvironment = envBuilder(bucketMemcache).build() memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) + memcacheAsyncCluster = CouchbaseAsyncCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD) + memcacheAsyncManager = memcacheAsyncCluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() // Cache buckets: couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) @@ -145,19 +159,23 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { .socketConnectTimeout(timeout.intValue()) } - void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null) { + void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null, Object parentSpan = null) { trace.span(index) { serviceName "couchbase" resourceName name operationName "couchbase.call" spanType DDSpanTypes.COUCHBASE errored false - parent() + if (parentSpan == null) { + parent() + } else { + childOf((DDSpan) parentSpan) + } tags { "$Tags.COMPONENT.key" "couchbase-client" "$Tags.DB_TYPE.key" "couchbase" "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - if (bucketName) { + if (bucketName != null) { "bucket" bucketName } defaultTags() From 6c445ad03038a55ded4f1d2a7a2ce5c342d8d24b Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Wed, 11 Sep 2019 19:23:46 -0400 Subject: [PATCH 3/5] Extract RxJava instrumentation from Hystrix and add to Couchbase --- .../couchbase-2.0/couchbase-2.0.gradle | 4 +- .../CouchbaseBucketInstrumentation.java | 100 +--------- .../CouchbaseClusterInstrumentation.java | 96 +-------- .../client/CouchbaseOnSubscribe.java | 36 ++++ .../groovy/CouchbaseAsyncClientTest.groovy | 22 ++- .../test/groovy/CouchbaseClientTest.groovy | 39 ++++ .../hystrix-1.4/hystrix-1.4.gradle | 3 + .../hystrix/HystrixInstrumentation.java | 187 ++---------------- .../instrumentation/rxjava-1/rxjava-1.gradle | 13 ++ .../rxjava/SpanFinishingSubscription.java | 31 +++ .../rxjava/TracedOnSubscribe.java | 58 ++++++ .../rxjava/TracedSubscriber.java | 109 ++++++++++ .../src/main/java/rx/DDTracingUtil.java | 0 settings.gradle | 1 + 14 files changed, 337 insertions(+), 362 deletions(-) create mode 100644 dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java create mode 100644 dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle create mode 100644 dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java create mode 100644 dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java create mode 100644 dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java rename dd-java-agent/instrumentation/{hystrix-1.4 => rxjava-1}/src/main/java/rx/DDTracingUtil.java (100%) diff --git a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle index 95869236063..0aeb5d3960d 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle +++ b/dd-java-agent/instrumentation/couchbase-2.0/couchbase-2.0.gradle @@ -44,8 +44,10 @@ muzzle { } dependencies { + compile project(':dd-java-agent:instrumentation:rxjava-1') + compileOnly group: 'com.couchbase.client', name: 'java-client', version: '2.0.0' - + testCompile group: 'com.couchbase.mock', name: 'CouchbaseMock', version: '1.5.19' testCompile group: 'org.springframework.data', name: 'spring-data-couchbase', version: '2.0.0.RELEASE' diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java index 50ed987a319..770bcc0df24 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseBucketInstrumentation.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.couchbase.client; -import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -12,22 +11,14 @@ import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopSpan; -import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; -import rx.functions.Action0; -import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseBucketInstrumentation extends Instrumenter.Default { @@ -47,13 +38,15 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { + "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - getClass().getName() + "$TraceSpanStart", - getClass().getName() + "$TraceSpanFinish", - getClass().getName() + "$TraceSpanError", + packageName + ".CouchbaseOnSubscribe", }; } @@ -76,94 +69,13 @@ public static void subscribeResult( @Advice.Enter final int callDepth, @Advice.Origin final Method method, @Advice.FieldValue("bucket") final String bucket, - @Advice.AllArguments final Object[] args, @Advice.Return(readOnly = false) Observable result) { if (callDepth > 0) { return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); - final AtomicReference spanRef = new AtomicReference<>(); - result = - result - .doOnSubscribe(new TraceSpanStart(method, bucket, spanRef)) - .doOnCompleted(new TraceSpanFinish(spanRef)) - .doOnError(new TraceSpanError(spanRef)); - } - } - - public static class TraceSpanStart implements Action0 { - private final Method method; - private final String bucket; - private final AtomicReference spanRef; - - public TraceSpanStart( - final Method method, final String bucket, final AtomicReference spanRef) { - this.method = method; - this.bucket = bucket; - this.spanRef = spanRef; - } - - @Override - public void call() { - // This is called each time an observer has a new subscriber, but we should only time it once. - if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { - return; - } - final Class declaringClass = method.getDeclaringClass(); - final String className = - declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); - final String resourceName = className + "." + method.getName(); - - final Span span = - GlobalTracer.get() - .buildSpan("couchbase.call") - .withTag(DDTags.RESOURCE_NAME, resourceName) - .withTag("bucket", bucket) - .start(); - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - // just replace the no-op span. - spanRef.set(DECORATE.afterStart(span)); - } - } - } - public static class TraceSpanFinish implements Action0 { - private final AtomicReference spanRef; - - public TraceSpanFinish(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call() { - final Span span = spanRef.getAndSet(null); - - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } - } - - public static class TraceSpanError implements Action1 { - private final AtomicReference spanRef; - - public TraceSpanError(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call(final Throwable throwable) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); - } - } + result = Observable.create(new CouchbaseOnSubscribe(result, method, bucket)); } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java index b5549d014a2..e9b555e641d 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java @@ -1,6 +1,5 @@ package datadog.trace.instrumentation.couchbase.client; -import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; import static java.util.Collections.singletonMap; import static net.bytebuddy.matcher.ElementMatchers.isInterface; import static net.bytebuddy.matcher.ElementMatchers.isMethod; @@ -12,22 +11,14 @@ import com.couchbase.client.java.CouchbaseCluster; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.api.DDTags; import datadog.trace.bootstrap.CallDepthThreadLocalMap; -import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopSpan; -import io.opentracing.util.GlobalTracer; import java.lang.reflect.Method; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; import rx.Observable; -import rx.functions.Action0; -import rx.functions.Action1; @AutoService(Instrumenter.class) public class CouchbaseClusterInstrumentation extends Instrumenter.Default { @@ -47,13 +38,15 @@ public ElementMatcher typeMatcher() { @Override public String[] helperClassNames() { return new String[] { + "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", "datadog.trace.agent.decorator.ClientDecorator", "datadog.trace.agent.decorator.DatabaseClientDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".CouchbaseClientDecorator", - getClass().getName() + "$TraceSpanStart", - getClass().getName() + "$TraceSpanFinish", - getClass().getName() + "$TraceSpanError", + packageName + ".CouchbaseOnSubscribe", }; } @@ -80,84 +73,7 @@ public static void subscribeResult( return; } CallDepthThreadLocalMap.reset(CouchbaseCluster.class); - final AtomicReference spanRef = new AtomicReference<>(); - result = - result - .doOnSubscribe(new TraceSpanStart(method, spanRef)) - .doOnCompleted(new TraceSpanFinish(spanRef)) - .doOnError(new TraceSpanError(spanRef)); - } - } - - public static class TraceSpanStart implements Action0 { - private final Method method; - private final AtomicReference spanRef; - - public TraceSpanStart(final Method method, final AtomicReference spanRef) { - this.method = method; - this.spanRef = spanRef; - } - - @Override - public void call() { - // This is called each time an observer has a new subscriber, but we should only time it once. - if (!spanRef.compareAndSet(null, NoopSpan.INSTANCE)) { - return; - } - final Class declaringClass = method.getDeclaringClass(); - final String className = - declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); - final String resourceName = className + "." + method.getName(); - - final Span span = - GlobalTracer.get() - .buildSpan("couchbase.call") - .withTag(DDTags.RESOURCE_NAME, resourceName) - .start(); - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - // just replace the no-op span. - spanRef.set(DECORATE.afterStart(scope.span())); - } - } - } - - public static class TraceSpanFinish implements Action0 { - private final AtomicReference spanRef; - - public TraceSpanFinish(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call() { - final Span span = spanRef.getAndSet(null); - - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } - } - - public static class TraceSpanError implements Action1 { - private final AtomicReference spanRef; - - public TraceSpanError(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void call(final Throwable throwable) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) { - DECORATE.onError(span, throwable); - DECORATE.beforeFinish(span); - span.finish(); - } - } + result = Observable.create(new CouchbaseOnSubscribe(result, method, null)); } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java new file mode 100644 index 00000000000..b20fcee0ab4 --- /dev/null +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseOnSubscribe.java @@ -0,0 +1,36 @@ +package datadog.trace.instrumentation.couchbase.client; + +import static datadog.trace.instrumentation.couchbase.client.CouchbaseClientDecorator.DECORATE; + +import datadog.trace.api.DDTags; +import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; +import io.opentracing.Span; +import java.lang.reflect.Method; +import rx.Observable; + +public class CouchbaseOnSubscribe extends TracedOnSubscribe { + private final String resourceName; + private final String bucket; + + public CouchbaseOnSubscribe( + final Observable originalObservable, final Method method, final String bucket) { + super(originalObservable, "couchbase.call", DECORATE); + + final Class declaringClass = method.getDeclaringClass(); + final String className = + declaringClass.getSimpleName().replace("CouchbaseAsync", "").replace("DefaultAsync", ""); + resourceName = className + "." + method.getName(); + this.bucket = bucket; + } + + @Override + protected void afterStart(final Span span) { + super.afterStart(span); + + span.setTag(DDTags.RESOURCE_NAME, resourceName); + + if (bucket != null) { + span.setTag("bucket", bucket); + } + } +} diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy index 40e6d85998f..c8b8e3ca4c8 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy @@ -42,6 +42,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> bkt.upsert(JsonDocument.create("helloworld", content)).subscribe({ result -> inserted.set(result) }) }) + + blockUntilChildSpansFinished(2) // Improve span ordering consistency } then: @@ -51,8 +53,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { trace(0, 3) { basicSpan(it, 0, "someTrace") - assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 1, "Bucket.upsert", bucketSettings.name(), span(2)) } } @@ -81,6 +83,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { }) }) }) + + blockUntilChildSpansFinished(3) // Improve span ordering consistency } // Create a JSON document and store it with the ID "helloworld" @@ -91,10 +95,10 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { assertTraces(1) { trace(0, 4) { basicSpan(it, 0, "someTrace") - - assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name(), span(0)) - assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name(), span(0)) + + assertCouchbaseCall(it, 3, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(3)) + assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(2)) } } @@ -121,6 +125,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { .single() .subscribe({ row -> queryResult.set(row.value()) }) }) + + blockUntilChildSpansFinished(2) // Improve span ordering consistency } then: @@ -130,8 +136,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { trace(0, 3) { basicSpan(it, 0, "someTrace") - assertCouchbaseCall(it, 0, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 0, "Bucket.query", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) + assertCouchbaseCall(it, 1, "Bucket.query", bucketSettings.name(), span(2)) } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy index 604f79f1e5d..abff11344b1 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy @@ -4,6 +4,9 @@ import com.couchbase.client.java.document.json.JsonObject import com.couchbase.client.java.query.N1qlQuery import util.AbstractCouchbaseTest +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + class CouchbaseClientTest extends AbstractCouchbaseTest { def "test hasBucket #type"() { @@ -57,6 +60,42 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { type = bucketSettings.type().name() } + def "test upsert and get #type under trace"() { + when: + // Connect to the bucket and open it + Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) + + // Create a JSON document and store it with the ID "helloworld" + JsonObject content = JsonObject.create().put("hello", "world") + + def inserted + def found + + runUnderTrace("someTrace") { + inserted = bkt.upsert(JsonDocument.create("helloworld", content)) + found = bkt.get("helloworld") + } + + then: + found == inserted + found.content().getString("hello") == "world" + + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "someTrace") + assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0)) + assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0)) + } + } + + where: + manager | cluster | bucketSettings + couchbaseManager | couchbaseCluster | bucketCouchbase + memcacheManager | memcacheCluster | bucketMemcache + + type = bucketSettings.type().name() + } + def "test query"() { setup: Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) diff --git a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle index 60ae7f63460..a2b6b8a474b 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle +++ b/dd-java-agent/instrumentation/hystrix-1.4/hystrix-1.4.gradle @@ -17,11 +17,14 @@ testSets { } dependencies { + compile project(':dd-java-agent:instrumentation:rxjava-1') + compileOnly group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile project(':dd-java-agent:instrumentation:java-concurrent') testCompile project(':dd-java-agent:instrumentation:trace-annotation') + testCompile group: 'io.reactivex', name: 'rxjava', version: '1.0.7' testCompile group: 'com.netflix.hystrix', name: 'hystrix-core', version: '1.4.0' diff --git a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java index dada3995196..897ccce5816 100644 --- a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java +++ b/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/datadog/trace/instrumentation/hystrix/HystrixInstrumentation.java @@ -8,23 +8,15 @@ import com.google.auto.service.AutoService; import com.netflix.hystrix.HystrixInvokableInfo; import datadog.trace.agent.tooling.Instrumenter; -import datadog.trace.context.TraceScope; -import io.opentracing.Scope; -import io.opentracing.ScopeManager; +import datadog.trace.instrumentation.rxjava.TracedOnSubscribe; import io.opentracing.Span; -import io.opentracing.Tracer; -import io.opentracing.util.GlobalTracer; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import rx.DDTracingUtil; import rx.Observable; -import rx.Subscriber; -import rx.Subscription; @AutoService(Instrumenter.class) public class HystrixInstrumentation extends Instrumenter.Default { @@ -47,10 +39,11 @@ public String[] helperClassNames() { return new String[] { "rx.DDTracingUtil", "datadog.trace.agent.decorator.BaseDecorator", + "datadog.trace.instrumentation.rxjava.SpanFinishingSubscription", + "datadog.trace.instrumentation.rxjava.TracedSubscriber", + "datadog.trace.instrumentation.rxjava.TracedOnSubscribe", packageName + ".HystrixDecorator", - packageName + ".HystrixInstrumentation$SpanFinishingSubscription", - packageName + ".HystrixInstrumentation$TracedSubscriber", - packageName + ".HystrixInstrumentation$TracedOnSubscribe", + packageName + ".HystrixInstrumentation$HystrixOnSubscribe", }; } @@ -73,11 +66,8 @@ public static void stopSpan( @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); - result = - Observable.create( - new TracedOnSubscribe( - onSubscribe, command, "execute", GlobalTracer.get().scopeManager().active())); + + result = Observable.create(new HystrixOnSubscribe(result, command, "execute")); } } @@ -88,171 +78,30 @@ public static void stopSpan( @Advice.This final HystrixInvokableInfo command, @Advice.Return(readOnly = false) Observable result, @Advice.Thrown final Throwable throwable) { - final Observable.OnSubscribe onSubscribe = DDTracingUtil.extractOnSubscribe(result); - result = - Observable.create( - new TracedOnSubscribe( - onSubscribe, command, "fallback", GlobalTracer.get().scopeManager().active())); + + result = Observable.create(new HystrixOnSubscribe(result, command, "fallback")); } } - public static class TracedOnSubscribe implements Observable.OnSubscribe { - - private final Observable.OnSubscribe delegate; + public static class HystrixOnSubscribe extends TracedOnSubscribe { private final HystrixInvokableInfo command; private final String methodName; - private final TraceScope.Continuation continuation; - public TracedOnSubscribe( - final Observable.OnSubscribe delegate, + public HystrixOnSubscribe( + final Observable originalObservable, final HystrixInvokableInfo command, - final String methodName, - final Scope parentScope) { - this.delegate = delegate; + final String methodName) { + super(originalObservable, OPERATION_NAME, DECORATE); + this.command = command; this.methodName = methodName; - continuation = - parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; - } - - @Override - public void call(final Subscriber subscriber) { - final Tracer tracer = GlobalTracer.get(); - final Span span; // span finished by TracedSubscriber - if (continuation != null) { - try (final TraceScope scope = continuation.activate()) { - span = tracer.buildSpan(OPERATION_NAME).start(); - } - } else { - span = tracer.buildSpan(OPERATION_NAME).start(); - } - DECORATE.afterStart(span); - DECORATE.onCommand(span, command, methodName); - - try (final Scope scope = tracer.scopeManager().activate(span, false)) { - if (!((TraceScope) scope).isAsyncPropagating()) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.call(new TracedSubscriber(span, subscriber)); - } - } - } - - public static class TracedSubscriber extends Subscriber { - - private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); - private final AtomicReference spanRef; - private final Subscriber delegate; - - public TracedSubscriber(final Span span, final Subscriber delegate) { - spanRef = new AtomicReference<>(span); - this.delegate = delegate; - final SpanFinishingSubscription subscription = new SpanFinishingSubscription(spanRef); - delegate.add(subscription); - } - - @Override - public void onStart() { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onStart(); - } - } else { - delegate.onStart(); - } } @Override - public void onNext(final T value) { - final Span span = spanRef.get(); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onNext(value); - } catch (final Throwable e) { - onError(e); - } - } else { - delegate.onNext(value); - } - } - - @Override - public void onCompleted() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - boolean errored = false; - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - delegate.onCompleted(); - } catch (final Throwable e) { - // Repopulate the spanRef for onError - spanRef.compareAndSet(null, span); - onError(e); - errored = true; - } finally { - // finish called by onError, so don't finish again. - if (!errored) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - } else { - delegate.onCompleted(); - } - } - - @Override - public void onError(final Throwable e) { - final Span span = spanRef.getAndSet(null); - if (span != null) { - try (final Scope scope = scopeManager.activate(span, false)) { - if (scope instanceof TraceScope) { - ((TraceScope) scope).setAsyncPropagation(true); - } - DECORATE.onError(span, e); - delegate.onError(e); - } catch (final Throwable e2) { - DECORATE.onError(span, e2); - throw e2; - } finally { - DECORATE.beforeFinish(span); - span.finish(); - } - } else { - delegate.onError(e); - } - } - } - - public static class SpanFinishingSubscription implements Subscription { + protected void afterStart(final Span span) { + super.afterStart(span); - private final AtomicReference spanRef; - - public SpanFinishingSubscription(final AtomicReference spanRef) { - this.spanRef = spanRef; - } - - @Override - public void unsubscribe() { - final Span span = spanRef.getAndSet(null); - if (span != null) { - DECORATE.beforeFinish(span); - span.finish(); - } - } - - @Override - public boolean isUnsubscribed() { - return spanRef.get() == null; + DECORATE.onCommand(span, command, methodName); } } } diff --git a/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle b/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle new file mode 100644 index 00000000000..d9e224bf770 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/rxjava-1.gradle @@ -0,0 +1,13 @@ +apply from: "${rootDir}/gradle/java.gradle" + +apply plugin: 'org.unbroken-dome.test-sets' + +testSets { + latestDepTest { + dirName = 'test' + } +} + +dependencies { + compileOnly group: 'io.reactivex', name: 'rxjava', version: '1.0.7' +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java new file mode 100644 index 00000000000..888d8ded9f2 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/SpanFinishingSubscription.java @@ -0,0 +1,31 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import io.opentracing.Span; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscription; + +public class SpanFinishingSubscription implements Subscription { + private final BaseDecorator decorator; + private final AtomicReference spanRef; + + public SpanFinishingSubscription( + final BaseDecorator decorator, final AtomicReference spanRef) { + this.decorator = decorator; + this.spanRef = spanRef; + } + + @Override + public void unsubscribe() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + decorator.beforeFinish(span); + span.finish(); + } + } + + @Override + public boolean isUnsubscribed() { + return spanRef.get() == null; + } +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java new file mode 100644 index 00000000000..8428fa72b1d --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedOnSubscribe.java @@ -0,0 +1,58 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.Span; +import io.opentracing.Tracer; +import io.opentracing.util.GlobalTracer; +import rx.DDTracingUtil; +import rx.Observable; +import rx.Subscriber; + +public class TracedOnSubscribe implements Observable.OnSubscribe { + + private final Observable.OnSubscribe delegate; + private final String operationName; + private final TraceScope.Continuation continuation; + private final BaseDecorator decorator; + + public TracedOnSubscribe( + final Observable originalObservable, + final String operationName, + final BaseDecorator decorator) { + this.delegate = DDTracingUtil.extractOnSubscribe(originalObservable); + this.operationName = operationName; + this.decorator = decorator; + + final Scope parentScope = GlobalTracer.get().scopeManager().active(); + + continuation = parentScope instanceof TraceScope ? ((TraceScope) parentScope).capture() : null; + } + + @Override + public void call(final Subscriber subscriber) { + final Tracer tracer = GlobalTracer.get(); + final Span span; // span finished by TracedSubscriber + if (continuation != null) { + try (final TraceScope scope = continuation.activate()) { + span = tracer.buildSpan(operationName).start(); + } + } else { + span = tracer.buildSpan(operationName).start(); + } + + afterStart(span); + + try (final Scope scope = tracer.scopeManager().activate(span, false)) { + if (!((TraceScope) scope).isAsyncPropagating()) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.call(new TracedSubscriber(span, subscriber, decorator)); + } + } + + protected void afterStart(final Span span) { + decorator.afterStart(span); + } +} diff --git a/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java new file mode 100644 index 00000000000..8cefff4381c --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava-1/src/main/java/datadog/trace/instrumentation/rxjava/TracedSubscriber.java @@ -0,0 +1,109 @@ +package datadog.trace.instrumentation.rxjava; + +import datadog.trace.agent.decorator.BaseDecorator; +import datadog.trace.context.TraceScope; +import io.opentracing.Scope; +import io.opentracing.ScopeManager; +import io.opentracing.Span; +import io.opentracing.util.GlobalTracer; +import java.util.concurrent.atomic.AtomicReference; +import rx.Subscriber; + +public class TracedSubscriber extends Subscriber { + + private final ScopeManager scopeManager = GlobalTracer.get().scopeManager(); + private final AtomicReference spanRef; + private final Subscriber delegate; + private final BaseDecorator decorator; + + public TracedSubscriber( + final Span span, final Subscriber delegate, final BaseDecorator decorator) { + spanRef = new AtomicReference<>(span); + this.delegate = delegate; + this.decorator = decorator; + final SpanFinishingSubscription subscription = + new SpanFinishingSubscription(decorator, spanRef); + delegate.add(subscription); + } + + @Override + public void onStart() { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onStart(); + } + } else { + delegate.onStart(); + } + } + + @Override + public void onNext(final T value) { + final Span span = spanRef.get(); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onNext(value); + } catch (final Throwable e) { + onError(e); + } + } else { + delegate.onNext(value); + } + } + + @Override + public void onCompleted() { + final Span span = spanRef.getAndSet(null); + if (span != null) { + boolean errored = false; + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + delegate.onCompleted(); + } catch (final Throwable e) { + // Repopulate the spanRef for onError + spanRef.compareAndSet(null, span); + onError(e); + errored = true; + } finally { + // finish called by onError, so don't finish again. + if (!errored) { + decorator.beforeFinish(span); + span.finish(); + } + } + } else { + delegate.onCompleted(); + } + } + + @Override + public void onError(final Throwable e) { + final Span span = spanRef.getAndSet(null); + if (span != null) { + try (final Scope scope = scopeManager.activate(span, false)) { + if (scope instanceof TraceScope) { + ((TraceScope) scope).setAsyncPropagation(true); + } + decorator.onError(span, e); + delegate.onError(e); + } catch (final Throwable e2) { + decorator.onError(span, e2); + throw e2; + } finally { + decorator.beforeFinish(span); + span.finish(); + } + } else { + delegate.onError(e); + } + } +} diff --git a/dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java b/dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java similarity index 100% rename from dd-java-agent/instrumentation/hystrix-1.4/src/main/java/rx/DDTracingUtil.java rename to dd-java-agent/instrumentation/rxjava-1/src/main/java/rx/DDTracingUtil.java diff --git a/settings.gradle b/settings.gradle index 5e3510a75c9..312f0c2f1aa 100644 --- a/settings.gradle +++ b/settings.gradle @@ -84,6 +84,7 @@ include ':dd-java-agent:instrumentation:play-2.4' include ':dd-java-agent:instrumentation:play-2.6' include ':dd-java-agent:instrumentation:rabbitmq-amqp-2.7' include ':dd-java-agent:instrumentation:ratpack-1.4' +include ':dd-java-agent:instrumentation:rxjava-1' include ':dd-java-agent:instrumentation:reactor-core-3.1' include ':dd-java-agent:instrumentation:servlet-2' include ':dd-java-agent:instrumentation:servlet-3' From 4e5e75ebffe7c0bf5a16bc3727973c09042e60d0 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Thu, 12 Sep 2019 20:51:50 -0400 Subject: [PATCH 4/5] Fix tests --- .../CouchbaseClusterInstrumentation.java | 2 +- .../groovy/CouchbaseAsyncClientTest.groovy | 66 +++++--- .../test/groovy/CouchbaseClientTest.groovy | 73 +++++---- .../CouchbaseSpringRepositoryTest.groovy | 141 ++---------------- .../CouchbaseSpringTemplateTest.groovy | 111 ++++++-------- .../groovy/util/AbstractCouchbaseTest.groovy | 84 +++++------ 6 files changed, 185 insertions(+), 292 deletions(-) diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java index e9b555e641d..090672c3598 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/main/java/datadog/trace/instrumentation/couchbase/client/CouchbaseClusterInstrumentation.java @@ -53,7 +53,7 @@ public String[] helperClassNames() { @Override public Map, String> transformers() { return singletonMap( - isMethod().and(isPublic()).and(returns(named("rx.Observable"))), + isMethod().and(isPublic()).and(returns(named("rx.Observable"))).and(not(named("core"))), CouchbaseClientAdvice.class.getName()); } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy index c8b8e3ca4c8..0c79367f0ae 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy @@ -1,9 +1,13 @@ +import com.couchbase.client.java.AsyncCluster +import com.couchbase.client.java.CouchbaseAsyncCluster import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.json.JsonObject import com.couchbase.client.java.query.N1qlQuery import spock.util.concurrent.BlockingVariable import util.AbstractCouchbaseTest +import java.util.concurrent.TimeUnit + import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace @@ -13,21 +17,29 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { def hasBucket = new BlockingVariable() when: - manager.hasBucket(bucketSettings.name()).subscribe({ result -> hasBucket.set(result) }) + cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ bkt -> + manager.hasBucket(bucketSettings.name()).subscribe({ result -> hasBucket.set(result) }) + }) then: assert hasBucket.get() - assertTraces(1) { - trace(0, 1) { - assertCouchbaseCall(it, 0, "ClusterManager.hasBucket") + sortAndAssertTraces(1) { + trace(0, 2) { + assertCouchbaseCall(it, 0, "Cluster.openBucket", null) + assertCouchbaseCall(it, 1, "ClusterManager.hasBucket", null, span(0)) } } + cleanup: + cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() + where: - manager | cluster | bucketSettings - couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase - memcacheAsyncManager | memcacheAsyncCluster | bucketMemcache + environment | bucketSettings + couchbaseEnvironment | bucketCouchbase + memcacheEnvironment | bucketMemcache + cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) + manager = cluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() type = bucketSettings.type().name() } @@ -49,7 +61,7 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { then: inserted.get().content().getString("hello") == "world" - assertTraces(1) { + sortAndAssertTraces(1) { trace(0, 3) { basicSpan(it, 0, "someTrace") @@ -58,11 +70,15 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { } } + cleanup: + cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() + where: - manager | cluster | bucketSettings - couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase - memcacheAsyncManager | memcacheAsyncCluster | bucketMemcache + environment | bucketSettings + couchbaseEnvironment | bucketCouchbase + memcacheEnvironment | bucketMemcache + cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } @@ -92,7 +108,7 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { found.get() == inserted.get() found.get().content().getString("hello") == "world" - assertTraces(1) { + sortAndAssertTraces(1) { trace(0, 4) { basicSpan(it, 0, "someTrace") @@ -102,23 +118,29 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { } } + cleanup: + cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() + where: - manager | cluster | bucketSettings - couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase - memcacheAsyncManager | memcacheAsyncCluster | bucketMemcache + environment | bucketSettings + couchbaseEnvironment | bucketCouchbase + memcacheEnvironment | bucketMemcache + cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } def "test query"() { setup: + // Only couchbase buckets support queries. + AsyncCluster cluster = CouchbaseAsyncCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) def queryResult = new BlockingVariable() when: // Mock expects this specific query. // See com.couchbase.mock.http.query.QueryServer.handleString. runUnderTrace("someTrace") { - cluster.openBucket(bucketSettings.name(), bucketSettings.password()).subscribe({ + cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).subscribe({ bkt -> bkt.query(N1qlQuery.simple("SELECT mockrow")) .flatMap({ query -> query.rows() }) @@ -132,20 +154,16 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { then: queryResult.get().get("row") == "value" - assertTraces(1) { + sortAndAssertTraces(1) { trace(0, 3) { basicSpan(it, 0, "someTrace") assertCouchbaseCall(it, 2, "Cluster.openBucket", null, span(0)) - assertCouchbaseCall(it, 1, "Bucket.query", bucketSettings.name(), span(2)) + assertCouchbaseCall(it, 1, "Bucket.query", bucketCouchbase.name(), span(2)) } } - where: - manager | cluster | bucketSettings - couchbaseAsyncManager | couchbaseAsyncCluster | bucketCouchbase - // Only couchbase buckets support queries. - - type = bucketSettings.type().name() + cleanup: + cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy index abff11344b1..284f3083a06 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy @@ -1,4 +1,6 @@ import com.couchbase.client.java.Bucket +import com.couchbase.client.java.Cluster +import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.json.JsonObject import com.couchbase.client.java.query.N1qlQuery @@ -8,24 +10,28 @@ import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace class CouchbaseClientTest extends AbstractCouchbaseTest { - def "test hasBucket #type"() { when: def hasBucket = manager.hasBucket(bucketSettings.name()) then: assert hasBucket - assertTraces(1) { + sortAndAssertTraces(1) { trace(0, 1) { assertCouchbaseCall(it, 0, "ClusterManager.hasBucket") } } + cleanup: + cluster?.disconnect() + where: - manager | cluster | bucketSettings - couchbaseManager | couchbaseCluster | bucketCouchbase - memcacheManager | memcacheCluster | bucketMemcache + environment | bucketSettings + couchbaseEnvironment | bucketCouchbase + memcacheEnvironment | bucketMemcache + cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) + manager = cluster.clusterManager(USERNAME, PASSWORD) type = bucketSettings.type().name() } @@ -43,20 +49,27 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { found == inserted found.content().getString("hello") == "world" - assertTraces(2) { + sortAndAssertTraces(3) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name()) + assertCouchbaseCall(it, 0, "Cluster.openBucket") } trace(1, 1) { + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketSettings.name()) + } + trace(2, 1) { assertCouchbaseCall(it, 0, "Bucket.get", bucketSettings.name()) } } + cleanup: + cluster?.disconnect() + where: - manager | cluster | bucketSettings - couchbaseManager | couchbaseCluster | bucketCouchbase - memcacheManager | memcacheCluster | bucketMemcache + environment | bucketSettings + couchbaseEnvironment | bucketCouchbase + memcacheEnvironment | bucketMemcache + cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } @@ -74,31 +87,42 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { runUnderTrace("someTrace") { inserted = bkt.upsert(JsonDocument.create("helloworld", content)) found = bkt.get("helloworld") + + blockUntilChildSpansFinished(2) } then: found == inserted found.content().getString("hello") == "world" - assertTraces(1) { - trace(0, 3) { + sortAndAssertTraces(2) { + trace(0, 1) { + assertCouchbaseCall(it, 0, "Cluster.openBucket") + } + trace(1, 3) { basicSpan(it, 0, "someTrace") assertCouchbaseCall(it, 2, "Bucket.upsert", bucketSettings.name(), span(0)) assertCouchbaseCall(it, 1, "Bucket.get", bucketSettings.name(), span(0)) } } + cleanup: + cluster?.disconnect() + where: - manager | cluster | bucketSettings - couchbaseManager | couchbaseCluster | bucketCouchbase - memcacheManager | memcacheCluster | bucketMemcache + environment | bucketSettings + couchbaseEnvironment | bucketCouchbase + memcacheEnvironment | bucketMemcache + cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } - + def "test query"() { setup: - Bucket bkt = cluster.openBucket(bucketSettings.name(), bucketSettings.password()) + // Only couchbase buckets support queries. + Cluster cluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) + Bucket bkt = cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) when: // Mock expects this specific query. @@ -111,17 +135,16 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { result.first().value().get("row") == "value" and: - assertTraces(1) { + sortAndAssertTraces(2) { trace(0, 1) { - assertCouchbaseCall(it, 0, "Bucket.query", bucketSettings.name()) + assertCouchbaseCall(it, 0, "Cluster.openBucket") + } + trace(1, 1) { + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } } - where: - manager | cluster | bucketSettings - couchbaseManager | couchbaseCluster | bucketCouchbase - // Only couchbase buckets support queries. - - type = bucketSettings.type().name() + cleanup: + cluster?.disconnect() } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy index 75c139e7d09..e3b90d35bc8 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy @@ -1,9 +1,9 @@ package springdata +import com.couchbase.client.java.Cluster +import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.view.DefaultView import com.couchbase.client.java.view.DesignDocument -import datadog.trace.api.DDSpanTypes -import io.opentracing.tag.Tags import org.springframework.context.ConfigurableApplicationContext import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.data.repository.CrudRepository @@ -31,20 +31,21 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { DocRepository repo def setupSpec() { + Cluster couchbaseCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) // Create view for SpringRepository's findAll() couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).bucketManager() .insertDesignDocument( - DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all", - ''' + DesignDocument.create("doc", Collections.singletonList(DefaultView.create("all", + ''' function (doc, meta) { if (doc._class == "springdata.Doc") { emit(meta.id, null); } } '''.stripIndent() - ))) - ) + ))) + ) CouchbaseConfig.setEnvironment(couchbaseEnvironment) CouchbaseConfig.setBucketSettings(bucketCouchbase) @@ -75,21 +76,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } } @@ -107,21 +94,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) } } TEST_WRITER.clear() @@ -132,21 +105,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(1) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) } } TEST_WRITER.clear() @@ -160,55 +119,13 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { assertTraces(3) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.upsert", bucketCouchbase.name()) } trace(1, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } trace(2, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.get", bucketCouchbase.name()) } } TEST_WRITER.clear() @@ -222,38 +139,10 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { and: assertTraces(2) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.remove" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.remove", bucketCouchbase.name()) } trace(1, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.query" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" bucketCouchbase.name() - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.query", bucketCouchbase.name()) } } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy index 157e2876bbc..bd36700d527 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy @@ -1,18 +1,32 @@ package springdata import com.couchbase.client.java.Bucket -import datadog.trace.api.DDSpanTypes -import io.opentracing.tag.Tags +import com.couchbase.client.java.Cluster +import com.couchbase.client.java.CouchbaseCluster +import com.couchbase.client.java.cluster.ClusterManager import org.springframework.data.couchbase.core.CouchbaseTemplate import spock.lang.Shared import util.AbstractCouchbaseTest +import java.util.concurrent.TimeUnit + class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { @Shared List templates + @Shared + Cluster couchbaseCluster + + @Shared + Cluster memcacheCluster + def setupSpec() { + couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) + memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) + ClusterManager couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) + ClusterManager memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD) + Bucket bucketCouchbase = couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) Bucket bucketMemcache = memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password()) @@ -20,8 +34,12 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { new CouchbaseTemplate(memcacheManager.info(), bucketMemcache)] } + def cleanupSpec() { + couchbaseCluster?.disconnect(5, TimeUnit.SECONDS) + memcacheCluster?.disconnect(5, TimeUnit.SECONDS) + } - def "test write/read #name"() { + def "test write #name"() { setup: def doc = new Doc() @@ -31,81 +49,42 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { then: template.findById("1", Doc) != null + and: + assertTraces(2) { + trace(0, 1) { + assertCouchbaseCall(it, 0, "Bucket.upsert", name) + } + trace(1, 1) { + assertCouchbaseCall(it, 0, "Bucket.get", name) + } + } + + where: + template << templates + name = template.couchbaseBucket.name() + } + + def "test remove #name"() { + setup: + def doc = new Doc() + when: + template.save(doc) template.remove(doc) then: template.findById("1", Doc) == null and: - assertTraces(4) { + assertTraces(3) { trace(0, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.upsert" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.upsert", name) } trace(1, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.remove", name) } trace(2, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.remove" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } - } - trace(3, 1) { - span(0) { - serviceName "couchbase" - resourceName "Bucket.get" - operationName "couchbase.call" - spanType DDSpanTypes.COUCHBASE - errored false - parent() - tags { - "$Tags.COMPONENT.key" "couchbase-client" - "$Tags.DB_TYPE.key" "couchbase" - "$Tags.SPAN_KIND.key" Tags.SPAN_KIND_CLIENT - "bucket" name - defaultTags() - } - } + assertCouchbaseCall(it, 0, "Bucket.get", name) } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy index 040ae8097e5..50a836e5841 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy @@ -2,12 +2,8 @@ package util import com.couchbase.client.core.metrics.DefaultLatencyMetricsCollectorConfig import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig -import com.couchbase.client.java.CouchbaseAsyncCluster -import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.bucket.BucketType -import com.couchbase.client.java.cluster.AsyncClusterManager import com.couchbase.client.java.cluster.BucketSettings -import com.couchbase.client.java.cluster.ClusterManager import com.couchbase.client.java.cluster.DefaultBucketSettings import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.env.DefaultCouchbaseEnvironment @@ -17,20 +13,22 @@ import com.couchbase.mock.CouchbaseMock import com.couchbase.mock.http.query.QueryServer import datadog.opentracing.DDSpan import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.ListWriterAssert import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.utils.PortUtils import datadog.trace.api.Config import datadog.trace.api.DDSpanTypes +import groovy.transform.stc.ClosureParams +import groovy.transform.stc.SimpleType import io.opentracing.tag.Tags import spock.lang.Shared -import java.util.concurrent.RejectedExecutionException import java.util.concurrent.TimeUnit abstract class AbstractCouchbaseTest extends AgentTestRunner { - private static final USERNAME = "Administrator" - private static final PASSWORD = "password" + static final USERNAME = "Administrator" + static final PASSWORD = "password" @Shared private int port = PortUtils.randomOpenPort() @@ -58,53 +56,23 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { @Shared CouchbaseMock mock - @Shared - protected CouchbaseCluster couchbaseCluster - @Shared - protected CouchbaseAsyncCluster couchbaseAsyncCluster - @Shared - protected CouchbaseCluster memcacheCluster - @Shared - protected CouchbaseAsyncCluster memcacheAsyncCluster + @Shared protected CouchbaseEnvironment couchbaseEnvironment @Shared protected CouchbaseEnvironment memcacheEnvironment - @Shared - protected ClusterManager couchbaseManager - @Shared - protected AsyncClusterManager couchbaseAsyncManager - @Shared - protected ClusterManager memcacheManager - @Shared - protected AsyncClusterManager memcacheAsyncManager def setupSpec() { - mock = new CouchbaseMock("127.0.0.1", port, 1, 1) mock.httpServer.register("/query", new QueryServer()) mock.start() println "CouchbaseMock listening on localhost:$port" mock.createBucket(convert(bucketCouchbase)) - - couchbaseEnvironment = envBuilder(bucketCouchbase).build() - couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) - couchbaseAsyncCluster = CouchbaseAsyncCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) - couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) - couchbaseAsyncManager = couchbaseAsyncCluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() - mock.createBucket(convert(bucketMemcache)) + couchbaseEnvironment = envBuilder(bucketCouchbase).build() memcacheEnvironment = envBuilder(bucketMemcache).build() - memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) - memcacheAsyncCluster = CouchbaseAsyncCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) - memcacheManager = memcacheCluster.clusterManager(USERNAME, PASSWORD) - memcacheAsyncManager = memcacheAsyncCluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() - - // Cache buckets: - couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) - memcacheCluster.openBucket(bucketMemcache.name(), bucketMemcache.password()) // This setting should have no effect since decorator returns null for the instance. System.setProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "true") @@ -121,17 +89,6 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { } def cleanupSpec() { - try { - couchbaseCluster?.disconnect() - } catch (RejectedExecutionException e) { - // already closed by a test? - } - try { - memcacheCluster?.disconnect() - } catch (RejectedExecutionException e) { - // already closed by a test? - } - mock?.stop() System.clearProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE) @@ -159,6 +116,33 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { .socketConnectTimeout(timeout.intValue()) } + void sortAndAssertTraces( + final int size, + @ClosureParams(value = SimpleType, options = "datadog.trace.agent.test.asserts.ListWriterAssert") + @DelegatesTo(value = ListWriterAssert, strategy = Closure.DELEGATE_FIRST) + final Closure spec) { + + TEST_WRITER.waitForTraces(size) + + TEST_WRITER.each { + it.sort({ + a, b -> + boolean aIsCouchbaseOperation = a.operationName == "couchbase.call" + boolean bIsCouchbaseOperation = b.operationName == "couchbase.call" + + if (aIsCouchbaseOperation && !bIsCouchbaseOperation) { + return 1 + } else if (!aIsCouchbaseOperation && bIsCouchbaseOperation) { + return -1 + } + + return a.resourceName.compareTo(b.resourceName) + }) + } + + assertTraces(size, spec) + } + void assertCouchbaseCall(TraceAssert trace, int index, String name, String bucketName = null, Object parentSpan = null) { trace.span(index) { serviceName "couchbase" From e8d6928a8cf2baf8575b110a3d7051d96fe6f773 Mon Sep 17 00:00:00 2001 From: Laplie Anderson Date: Thu, 12 Sep 2019 21:48:36 -0400 Subject: [PATCH 5/5] Dont share environment. Slower but more deterministic --- .../groovy/CouchbaseAsyncClientTest.groovy | 31 ++++++++++--------- .../test/groovy/CouchbaseClientTest.groovy | 25 ++++++++------- .../CouchbaseSpringRepositoryTest.groovy | 6 ++-- .../CouchbaseSpringTemplateTest.groovy | 17 +++++++--- .../groovy/util/AbstractCouchbaseTest.groovy | 11 +------ 5 files changed, 49 insertions(+), 41 deletions(-) diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy index 0c79367f0ae..fe0bc662eee 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseAsyncClientTest.groovy @@ -2,6 +2,7 @@ import com.couchbase.client.java.AsyncCluster import com.couchbase.client.java.CouchbaseAsyncCluster import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.json.JsonObject +import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.query.N1qlQuery import spock.util.concurrent.BlockingVariable import util.AbstractCouchbaseTest @@ -31,13 +32,13 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { } cleanup: - cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() + cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() where: - environment | bucketSettings - couchbaseEnvironment | bucketCouchbase - memcacheEnvironment | bucketMemcache + bucketSettings << [bucketCouchbase, bucketMemcache] + environment = envBuilder(bucketSettings).build() cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) manager = cluster.clusterManager(USERNAME, PASSWORD).toBlocking().single() type = bucketSettings.type().name() @@ -71,13 +72,13 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { } cleanup: - cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() + cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() where: - environment | bucketSettings - couchbaseEnvironment | bucketCouchbase - memcacheEnvironment | bucketMemcache + bucketSettings << [bucketCouchbase, bucketMemcache] + environment = envBuilder(bucketSettings).build() cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } @@ -119,13 +120,13 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { } cleanup: - cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() + cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() where: - environment | bucketSettings - couchbaseEnvironment | bucketCouchbase - memcacheEnvironment | bucketMemcache + bucketSettings << [bucketCouchbase, bucketMemcache] + environment = envBuilder(bucketSettings).build() cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } @@ -133,7 +134,8 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { def "test query"() { setup: // Only couchbase buckets support queries. - AsyncCluster cluster = CouchbaseAsyncCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) + CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() + AsyncCluster cluster = CouchbaseAsyncCluster.create(environment, Arrays.asList("127.0.0.1")) def queryResult = new BlockingVariable() when: @@ -164,6 +166,7 @@ class CouchbaseAsyncClientTest extends AbstractCouchbaseTest { } cleanup: - cluster?.disconnect()?.timeout(5, TimeUnit.SECONDS)?.toBlocking()?.single() + cluster?.disconnect()?.timeout(10, TimeUnit.SECONDS)?.toBlocking()?.single() + environment.shutdown() } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy index 284f3083a06..62f901b6f1a 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/CouchbaseClientTest.groovy @@ -3,6 +3,7 @@ import com.couchbase.client.java.Cluster import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.document.JsonDocument import com.couchbase.client.java.document.json.JsonObject +import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.query.N1qlQuery import util.AbstractCouchbaseTest @@ -24,12 +25,12 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { cleanup: cluster?.disconnect() + environment.shutdown() where: - environment | bucketSettings - couchbaseEnvironment | bucketCouchbase - memcacheEnvironment | bucketMemcache + bucketSettings << [bucketCouchbase, bucketMemcache] + environment = envBuilder(bucketSettings).build() cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) manager = cluster.clusterManager(USERNAME, PASSWORD) type = bucketSettings.type().name() @@ -63,12 +64,12 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { cleanup: cluster?.disconnect() + environment.shutdown() where: - environment | bucketSettings - couchbaseEnvironment | bucketCouchbase - memcacheEnvironment | bucketMemcache + bucketSettings << [bucketCouchbase, bucketMemcache] + environment = envBuilder(bucketSettings).build() cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } @@ -108,20 +109,21 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { cleanup: cluster?.disconnect() + environment.shutdown() where: - environment | bucketSettings - couchbaseEnvironment | bucketCouchbase - memcacheEnvironment | bucketMemcache + bucketSettings << [bucketCouchbase, bucketMemcache] + environment = envBuilder(bucketSettings).build() cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) type = bucketSettings.type().name() } - + def "test query"() { setup: // Only couchbase buckets support queries. - Cluster cluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) + CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() + Cluster cluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) Bucket bkt = cluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()) when: @@ -146,5 +148,6 @@ class CouchbaseClientTest extends AbstractCouchbaseTest { cleanup: cluster?.disconnect() + environment.shutdown() } } diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy index e3b90d35bc8..19c210f3686 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringRepositoryTest.groovy @@ -2,6 +2,7 @@ package springdata import com.couchbase.client.java.Cluster import com.couchbase.client.java.CouchbaseCluster +import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.view.DefaultView import com.couchbase.client.java.view.DesignDocument import org.springframework.context.ConfigurableApplicationContext @@ -31,7 +32,8 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { DocRepository repo def setupSpec() { - Cluster couchbaseCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) + CouchbaseEnvironment environment = envBuilder(bucketCouchbase).build() + Cluster couchbaseCluster = CouchbaseCluster.create(environment, Arrays.asList("127.0.0.1")) // Create view for SpringRepository's findAll() couchbaseCluster.openBucket(bucketCouchbase.name(), bucketCouchbase.password()).bucketManager() @@ -46,7 +48,7 @@ class CouchbaseSpringRepositoryTest extends AbstractCouchbaseTest { '''.stripIndent() ))) ) - CouchbaseConfig.setEnvironment(couchbaseEnvironment) + CouchbaseConfig.setEnvironment(environment) CouchbaseConfig.setBucketSettings(bucketCouchbase) // Close all buckets and disconnect diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy index bd36700d527..5ec6e6072c3 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/springdata/CouchbaseSpringTemplateTest.groovy @@ -4,12 +4,11 @@ import com.couchbase.client.java.Bucket import com.couchbase.client.java.Cluster import com.couchbase.client.java.CouchbaseCluster import com.couchbase.client.java.cluster.ClusterManager +import com.couchbase.client.java.env.CouchbaseEnvironment import org.springframework.data.couchbase.core.CouchbaseTemplate import spock.lang.Shared import util.AbstractCouchbaseTest -import java.util.concurrent.TimeUnit - class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { @Shared @@ -20,8 +19,16 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { @Shared Cluster memcacheCluster + + @Shared + protected CouchbaseEnvironment couchbaseEnvironment + @Shared + protected CouchbaseEnvironment memcacheEnvironment def setupSpec() { + couchbaseEnvironment = envBuilder(bucketCouchbase).build() + memcacheEnvironment = envBuilder(bucketMemcache).build() + couchbaseCluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("127.0.0.1")) memcacheCluster = CouchbaseCluster.create(memcacheEnvironment, Arrays.asList("127.0.0.1")) ClusterManager couchbaseManager = couchbaseCluster.clusterManager(USERNAME, PASSWORD) @@ -35,8 +42,10 @@ class CouchbaseSpringTemplateTest extends AbstractCouchbaseTest { } def cleanupSpec() { - couchbaseCluster?.disconnect(5, TimeUnit.SECONDS) - memcacheCluster?.disconnect(5, TimeUnit.SECONDS) + couchbaseCluster?.disconnect() + memcacheCluster?.disconnect() + couchbaseEnvironment.shutdown() + memcacheEnvironment.shutdown() } def "test write #name"() { diff --git a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy index 50a836e5841..5f82218d947 100644 --- a/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy +++ b/dd-java-agent/instrumentation/couchbase-2.0/src/test/groovy/util/AbstractCouchbaseTest.groovy @@ -5,7 +5,6 @@ import com.couchbase.client.core.metrics.DefaultMetricsCollectorConfig import com.couchbase.client.java.bucket.BucketType import com.couchbase.client.java.cluster.BucketSettings import com.couchbase.client.java.cluster.DefaultBucketSettings -import com.couchbase.client.java.env.CouchbaseEnvironment import com.couchbase.client.java.env.DefaultCouchbaseEnvironment import com.couchbase.mock.Bucket import com.couchbase.mock.BucketConfiguration @@ -57,11 +56,6 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { @Shared CouchbaseMock mock - @Shared - protected CouchbaseEnvironment couchbaseEnvironment - @Shared - protected CouchbaseEnvironment memcacheEnvironment - def setupSpec() { mock = new CouchbaseMock("127.0.0.1", port, 1, 1) mock.httpServer.register("/query", new QueryServer()) @@ -71,9 +65,6 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { mock.createBucket(convert(bucketCouchbase)) mock.createBucket(convert(bucketMemcache)) - couchbaseEnvironment = envBuilder(bucketCouchbase).build() - memcacheEnvironment = envBuilder(bucketMemcache).build() - // This setting should have no effect since decorator returns null for the instance. System.setProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE, "true") } @@ -94,7 +85,7 @@ abstract class AbstractCouchbaseTest extends AgentTestRunner { System.clearProperty(Config.PREFIX + Config.DB_CLIENT_HOST_SPLIT_BY_INSTANCE) } - private DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) { + protected DefaultCouchbaseEnvironment.Builder envBuilder(BucketSettings bucketSettings) { // Couchbase seems to be really slow to start sometimes def timeout = TimeUnit.SECONDS.toMillis(20) return DefaultCouchbaseEnvironment.builder()