diff --git a/.circleci/config.continue.yml.j2 b/.circleci/config.continue.yml.j2 index b0d6de96190..0702fce232d 100644 --- a/.circleci/config.continue.yml.j2 +++ b/.circleci/config.continue.yml.j2 @@ -36,7 +36,7 @@ instrumentation_modules: &instrumentation_modules "dd-java-agent/instrumentation debugger_modules: &debugger_modules "dd-java-agent/agent-debugger|dd-java-agent/agent-bootstrap|dd-java-agent/agent-builder|internal-api|communication|dd-trace-core" profiling_modules: &profiling_modules "dd-java-agent/agent-profiling" -default_system_tests_commit: &default_system_tests_commit 69a5e874384dd256e2e3f42fc1c95807a67efbe6 +default_system_tests_commit: &default_system_tests_commit 1ef00a34ad1f83ae999887e510ef1ea1c27b151b parameters: nightly: diff --git a/dd-java-agent/appsec/build.gradle b/dd-java-agent/appsec/build.gradle index 38f999b62aa..6d9856e93d6 100644 --- a/dd-java-agent/appsec/build.gradle +++ b/dd-java-agent/appsec/build.gradle @@ -82,6 +82,7 @@ ext { 'com.datadog.appsec.config.AppSecFeatures.ApiSecurity', 'com.datadog.appsec.config.AppSecFeatures.AutoUserInstrum', 'com.datadog.appsec.event.ReplaceableEventProducerService', + 'com.datadog.appsec.api.security.ApiSecuritySampler.NoOp', ] excludedClassesBranchCoverage = [ 'com.datadog.appsec.gateway.GatewayBridge', diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java index 9d86b681cc1..a8d18c2f515 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/AppSecSystem.java @@ -1,6 +1,8 @@ package com.datadog.appsec; -import com.datadog.appsec.api.security.ApiSecurityRequestSampler; +import com.datadog.appsec.api.security.ApiSecuritySampler; +import com.datadog.appsec.api.security.ApiSecuritySamplerImpl; +import com.datadog.appsec.api.security.AppSecSpanPostProcessor; import com.datadog.appsec.blocking.BlockingServiceImpl; import com.datadog.appsec.config.AppSecConfigService; import com.datadog.appsec.config.AppSecConfigServiceImpl; @@ -21,6 +23,7 @@ import datadog.trace.api.telemetry.ProductChange; import datadog.trace.api.telemetry.ProductChangeCollector; import datadog.trace.bootstrap.ActiveSubsystems; +import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -66,7 +69,17 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s EventDispatcher eventDispatcher = new EventDispatcher(); REPLACEABLE_EVENT_PRODUCER.replaceEventProducerService(eventDispatcher); - ApiSecurityRequestSampler requestSampler = new ApiSecurityRequestSampler(config); + ApiSecuritySampler requestSampler; + if (Config.get().isApiSecurityEnabled()) { + requestSampler = new ApiSecuritySamplerImpl(); + // When DD_API_SECURITY_ENABLED=true, ths post-processor is set even when AppSec is inactive. + // This should be low overhead since the post-processor exits early if there's no AppSec + // context. + SpanPostProcessor.Holder.INSTANCE = + new AppSecSpanPostProcessor(requestSampler, REPLACEABLE_EVENT_PRODUCER); + } else { + requestSampler = new ApiSecuritySampler.NoOp(); + } ConfigurationPoller configurationPoller = sco.configurationPoller(config); // may throw and abort startup diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecurityRequestSampler.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecurityRequestSampler.java deleted file mode 100644 index 38b5c631a4e..00000000000 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecurityRequestSampler.java +++ /dev/null @@ -1,56 +0,0 @@ -package com.datadog.appsec.api.security; - -import datadog.trace.api.Config; -import java.util.concurrent.atomic.AtomicLong; - -public class ApiSecurityRequestSampler { - - private volatile int sampling; - private final AtomicLong cumulativeCounter = new AtomicLong(); - - public ApiSecurityRequestSampler(final Config config) { - sampling = computeSamplingParameter(config.getApiSecurityRequestSampleRate()); - } - - /** - * Sets the new sampling parameter - * - * @return {@code true} if the value changed - */ - public boolean setSampling(final float newSamplingFloat) { - int newSampling = computeSamplingParameter(newSamplingFloat); - if (newSampling != sampling) { - sampling = newSampling; - cumulativeCounter.set(0); // Reset current sampling counter - return true; - } - return false; - } - - public int getSampling() { - return sampling; - } - - public boolean sampleRequest() { - long prevValue = cumulativeCounter.getAndAdd(sampling); - long newValue = prevValue + sampling; - if (newValue / 100 == prevValue / 100 + 1) { - // Sample request - return true; - } - // Skipped by sampling - return false; - } - - static int computeSamplingParameter(final float pct) { - if (pct >= 1) { - return 100; - } - if (pct < 0) { - // Api security can only be disabled by setting the sampling to zero, so we set it to 100%. - // TODO: We probably want a warning here. - return 100; - } - return (int) (pct * 100); - } -} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecuritySampler.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecuritySampler.java new file mode 100644 index 00000000000..4412a5d6303 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecuritySampler.java @@ -0,0 +1,35 @@ +package com.datadog.appsec.api.security; + +import com.datadog.appsec.gateway.AppSecRequestContext; +import javax.annotation.Nonnull; + +public interface ApiSecuritySampler { + /** + * Prepare a request context for later sampling decision. This method should be called at request + * end, and is thread-safe. If a request can potentially be sampled, this method will return true. + * If this method returns true, the caller MUST call {@link #releaseOne()} once the context is not + * needed anymore. + */ + boolean preSampleRequest(final @Nonnull AppSecRequestContext ctx); + + /** Get the final sampling decision. This method is NOT required to be thread-safe. */ + boolean sampleRequest(AppSecRequestContext ctx); + + /** Release one permit for the sampler. This must be called after processing a span. */ + void releaseOne(); + + final class NoOp implements ApiSecuritySampler { + @Override + public boolean preSampleRequest(@Nonnull AppSecRequestContext ctx) { + return false; + } + + @Override + public boolean sampleRequest(AppSecRequestContext ctx) { + return false; + } + + @Override + public void releaseOne() {} + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecuritySamplerImpl.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecuritySamplerImpl.java new file mode 100644 index 00000000000..c51bd46ef44 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/ApiSecuritySamplerImpl.java @@ -0,0 +1,168 @@ +package com.datadog.appsec.api.security; + +import com.datadog.appsec.gateway.AppSecRequestContext; +import datadog.trace.api.Config; +import datadog.trace.api.time.SystemTimeSource; +import datadog.trace.api.time.TimeSource; +import java.util.Deque; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Semaphore; +import javax.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ApiSecuritySamplerImpl implements ApiSecuritySampler { + + private static final Logger log = LoggerFactory.getLogger(ApiSecuritySamplerImpl.class); + + /** + * A maximum number of request contexts we'll keep open past the end of request at any given time. + * This will avoid excessive memory usage in case of a high number of concurrent requests, and + * should also prevent memory leaks. + */ + private static final int MAX_POST_PROCESSING_TASKS = 4; + /** Maximum number of entries in the access map. */ + private static final int MAX_SIZE = 4096; + /** Mapping from endpoint hash to last access timestamp in millis. */ + private final ConcurrentHashMap accessMap; + /** Deque of endpoint hashes ordered by access time. Oldest is always first. */ + private final Deque accessDeque; + + private final long expirationTimeInMs; + private final int capacity; + private final TimeSource timeSource; + private final Semaphore counter = new Semaphore(MAX_POST_PROCESSING_TASKS); + + public ApiSecuritySamplerImpl() { + this( + MAX_SIZE, + (long) (Config.get().getApiSecuritySampleDelay() * 1_000), + SystemTimeSource.INSTANCE); + } + + public ApiSecuritySamplerImpl( + int capacity, long expirationTimeInMs, @Nonnull TimeSource timeSource) { + this.capacity = capacity; + this.expirationTimeInMs = expirationTimeInMs; + this.accessMap = new ConcurrentHashMap<>(); + this.accessDeque = new ConcurrentLinkedDeque<>(); + this.timeSource = timeSource; + } + + @Override + public boolean preSampleRequest(final @Nonnull AppSecRequestContext ctx) { + final String route = ctx.getRoute(); + if (route == null) { + return false; + } + final String method = ctx.getMethod(); + if (method == null) { + return false; + } + final int statusCode = ctx.getResponseStatus(); + if (statusCode <= 0) { + return false; + } + long hash = computeApiHash(route, method, statusCode); + ctx.setApiSecurityEndpointHash(hash); + if (!isApiAccessExpired(hash)) { + return false; + } + if (counter.tryAcquire()) { + log.debug("API security sampling is required for this request (presampled)"); + ctx.setKeepOpenForApiSecurityPostProcessing(true); + return true; + } + return false; + } + + /** Get the final sampling decision. This method is NOT thread-safe. */ + @Override + public boolean sampleRequest(AppSecRequestContext ctx) { + if (ctx == null) { + return false; + } + final Long hash = ctx.getApiSecurityEndpointHash(); + if (hash == null) { + // This should never happen, it should have been short-circuited before. + return false; + } + return updateApiAccessIfExpired(hash); + } + + @Override + public void releaseOne() { + counter.release(); + } + + private boolean updateApiAccessIfExpired(final long hash) { + final long currentTime = timeSource.getCurrentTimeMillis(); + + Long lastAccess = accessMap.get(hash); + if (lastAccess != null && currentTime - lastAccess < expirationTimeInMs) { + return false; + } + + if (accessMap.put(hash, currentTime) == null) { + accessDeque.addLast(hash); + // If we added a new entry, we perform purging. + cleanupExpiredEntries(currentTime); + } else { + // This is now the most recently accessed entry. + accessDeque.remove(hash); + accessDeque.addLast(hash); + } + + return true; + } + + private boolean isApiAccessExpired(final long hash) { + final long currentTime = timeSource.getCurrentTimeMillis(); + final Long lastAccess = accessMap.get(hash); + return lastAccess == null || currentTime - lastAccess >= expirationTimeInMs; + } + + private void cleanupExpiredEntries(final long currentTime) { + // Purge all expired entries. + while (!accessDeque.isEmpty()) { + final Long oldestHash = accessDeque.peekFirst(); + if (oldestHash == null) { + // Should never happen + continue; + } + + final Long lastAccessTime = accessMap.get(oldestHash); + if (lastAccessTime == null) { + // Should never happen + continue; + } + + if (currentTime - lastAccessTime < expirationTimeInMs) { + // The oldest hash is up-to-date, so stop here. + break; + } + + accessDeque.pollFirst(); + accessMap.remove(oldestHash); + } + + // If we went over capacity, remove the oldest entries until we are within the limit. + // This should never be more than 1. + final int toRemove = accessMap.size() - this.capacity; + for (int i = 0; i < toRemove; i++) { + Long oldestHash = accessDeque.pollFirst(); + if (oldestHash != null) { + accessMap.remove(oldestHash); + } + } + } + + private long computeApiHash(final String route, final String method, final int statusCode) { + long result = 17; + result = 31 * result + route.hashCode(); + result = 31 * result + method.hashCode(); + result = 31 * result + statusCode; + return result; + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/AppSecSpanPostProcessor.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/AppSecSpanPostProcessor.java new file mode 100644 index 00000000000..cb87b572137 --- /dev/null +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/api/security/AppSecSpanPostProcessor.java @@ -0,0 +1,92 @@ +package com.datadog.appsec.api.security; + +import com.datadog.appsec.event.EventProducerService; +import com.datadog.appsec.event.ExpiredSubscriberInfoException; +import com.datadog.appsec.event.data.DataBundle; +import com.datadog.appsec.event.data.KnownAddresses; +import com.datadog.appsec.event.data.SingletonDataBundle; +import com.datadog.appsec.gateway.AppSecRequestContext; +import com.datadog.appsec.gateway.GatewayContext; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.api.internal.TraceSegment; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor; +import java.util.Collections; +import java.util.function.BooleanSupplier; +import javax.annotation.Nonnull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AppSecSpanPostProcessor implements SpanPostProcessor { + + private static final Logger log = LoggerFactory.getLogger(AppSecSpanPostProcessor.class); + private final ApiSecuritySampler sampler; + private final EventProducerService producerService; + + public AppSecSpanPostProcessor(ApiSecuritySampler sampler, EventProducerService producerService) { + this.sampler = sampler; + this.producerService = producerService; + } + + @Override + public void process(@Nonnull AgentSpan span, @Nonnull BooleanSupplier timeoutCheck) { + final RequestContext ctx_ = span.getRequestContext(); + if (ctx_ == null) { + return; + } + final AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); + if (ctx == null) { + return; + } + + if (!ctx.isKeepOpenForApiSecurityPostProcessing()) { + return; + } + + try { + if (timeoutCheck.getAsBoolean()) { + log.debug("Timeout detected, skipping API security post-processing"); + return; + } + if (!sampler.sampleRequest(ctx)) { + log.debug("Request not sampled, skipping API security post-processing"); + return; + } + log.debug("Request sampled, processing API security post-processing"); + extractSchemas(ctx, ctx_.getTraceSegment()); + } finally { + ctx.setKeepOpenForApiSecurityPostProcessing(false); + try { + // XXX: Close the additive first. This is not strictly needed, but it'll prevent getting it + // detected as a + // missed request-ended event. + ctx.closeAdditive(); + ctx.close(); + } catch (Exception e) { + log.debug("Error closing AppSecRequestContext", e); + } + sampler.releaseOne(); + } + } + + private void extractSchemas(final AppSecRequestContext ctx, final TraceSegment traceSegment) { + final EventProducerService.DataSubscriberInfo sub = + producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR); + if (sub == null || sub.isEmpty()) { + log.debug("No subscribers for schema extraction"); + return; + } + + final DataBundle bundle = + new SingletonDataBundle<>( + KnownAddresses.WAF_CONTEXT_PROCESSOR, Collections.singletonMap("extract-schema", true)); + try { + GatewayContext gwCtx = new GatewayContext(false); + producerService.publishDataEvent(sub, ctx, bundle, gwCtx); + ctx.commitDerivatives(traceSegment); + } catch (ExpiredSubscriberInfoException e) { + log.debug("Subscriber info expired", e); + } + } +} diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/config/AppSecConfigServiceImpl.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/config/AppSecConfigServiceImpl.java index 069733a03fd..0ebf33bb137 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/config/AppSecConfigServiceImpl.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/config/AppSecConfigServiceImpl.java @@ -2,7 +2,6 @@ import static com.datadog.appsec.util.StandardizedLogging.RulesInvalidReason.INVALID_JSON_FILE; import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_ACTIVATION; -import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE; import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_AUTO_USER_INSTRUM_MODE; import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_CUSTOM_BLOCKING_RESPONSE; import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_CUSTOM_RULES; @@ -200,7 +199,6 @@ private void subscribeAsmFeatures() { log.debug("Will not subscribe report CAPABILITY_ASM_ACTIVATION (AppSec explicitly enabled)"); } this.configurationPoller.addCapabilities(CAPABILITY_ASM_AUTO_USER_INSTRUM_MODE); - this.configurationPoller.addCapabilities(CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE); } private void distributeSubConfigurations( @@ -362,7 +360,6 @@ public void close() { | CAPABILITY_ASM_CUSTOM_RULES | CAPABILITY_ASM_CUSTOM_BLOCKING_RESPONSE | CAPABILITY_ASM_TRUSTED_IPS - | CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE | CAPABILITY_ASM_RASP_SQLI | CAPABILITY_ASM_RASP_SSRF | CAPABILITY_ASM_RASP_LFI diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java index 34536773602..ad43e6bfec3 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/AppSecRequestContext.java @@ -10,8 +10,6 @@ import datadog.trace.api.Config; import datadog.trace.api.http.StoredBodySupplier; import datadog.trace.api.internal.TraceSegment; -import datadog.trace.bootstrap.instrumentation.api.AgentSpan; -import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.util.stacktrace.StackTraceEvent; import io.sqreen.powerwaf.Additive; import io.sqreen.powerwaf.PowerwafContext; @@ -92,6 +90,7 @@ public class AppSecRequestContext implements DataBundle, Closeable { private String scheme; private String method; private String savedRawURI; + private String route; private final Map> requestHeaders = new LinkedHashMap<>(); private final Map> responseHeaders = new LinkedHashMap<>(); private volatile Map> collectedCookies; @@ -143,6 +142,9 @@ public class AppSecRequestContext implements DataBundle, Closeable { // Used to detect missing request-end event at close. private volatile boolean requestEndCalled; + private volatile boolean keepOpenForApiSecurityPostProcessing; + private volatile Long apiSecurityEndpointHash; + private static final AtomicIntegerFieldUpdater WAF_TIMEOUTS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(AppSecRequestContext.class, "wafTimeouts"); private static final AtomicIntegerFieldUpdater RASP_TIMEOUTS_UPDATER = @@ -362,7 +364,7 @@ void setScheme(String scheme) { this.scheme = scheme; } - String getMethod() { + public String getMethod() { return method; } @@ -382,6 +384,30 @@ void setRawURI(String savedRawURI) { this.savedRawURI = savedRawURI; } + public String getRoute() { + return route; + } + + public void setRoute(String route) { + this.route = route; + } + + public void setKeepOpenForApiSecurityPostProcessing(final boolean flag) { + this.keepOpenForApiSecurityPostProcessing = flag; + } + + public boolean isKeepOpenForApiSecurityPostProcessing() { + return this.keepOpenForApiSecurityPostProcessing; + } + + public void setApiSecurityEndpointHash(long hash) { + this.apiSecurityEndpointHash = hash; + } + + public Long getApiSecurityEndpointHash() { + return this.apiSecurityEndpointHash; + } + void addRequestHeader(String name, String value) { if (finishedRequestHeaders) { throw new IllegalStateException("Request headers were said to be finished before"); @@ -567,37 +593,34 @@ public String getSessionId() { return sessionId; } + /** + * Close the context and release all resources. This method is idempotent and can be called + * multiple times. For each root span, this method is always called from + * CoreTracer#onRootSpanPublished. + */ @Override public void close() { - final AgentSpan span = AgentTracer.activeSpan(); - close(span != null && span.isRequiresPostProcessing()); - } - - /* end interface for GatewayBridge */ - - /* Should be accessible from the modules */ - - public void close(boolean requiresPostProcessing) { if (!requestEndCalled) { log.debug(SEND_TELEMETRY, "Request end event was not called before close"); } - if (additive != null) { - log.debug( - SEND_TELEMETRY, "WAF object had not been closed (probably missed request-end event)"); - closeAdditive(); - } - derivatives = null; - - // check if we might need to further post process data related to the span in order to not free - // related data - if (requiresPostProcessing) { - return; + // For API Security, we sometimes keep contexts open for late processing. In that case, this + // flag needs to be + // later reset by the API Security post-processor and close must be called again. + if (!keepOpenForApiSecurityPostProcessing) { + if (additive != null) { + log.debug( + SEND_TELEMETRY, "WAF object had not been closed (probably missed request-end event)"); + closeAdditive(); + } + collectedCookies = null; + requestHeaders.clear(); + responseHeaders.clear(); + persistentData.clear(); + if (derivatives != null) { + derivatives.clear(); + derivatives = null; + } } - - collectedCookies = null; - requestHeaders.clear(); - responseHeaders.clear(); - persistentData.clear(); } /** @return the portion of the body read so far, if any */ @@ -663,6 +686,7 @@ List getStackTraces() { } public void reportDerivatives(Map data) { + log.debug("Reporting derivatives: {}", data); if (data == null || data.isEmpty()) return; if (derivatives == null) { @@ -672,11 +696,13 @@ public void reportDerivatives(Map data) { } } - boolean commitDerivatives(TraceSegment traceSegment) { + public boolean commitDerivatives(TraceSegment traceSegment) { + log.debug("Committing derivatives: {} for {}", derivatives, traceSegment); if (traceSegment == null || derivatives == null) { return false; } derivatives.forEach(traceSegment::setTagTop); + log.debug("Committed derivatives: {} for {}", derivatives, traceSegment); derivatives = null; return true; } diff --git a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java index 54665d12ca9..95831f90c53 100644 --- a/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java +++ b/dd-java-agent/appsec/src/main/java/com/datadog/appsec/gateway/GatewayBridge.java @@ -7,7 +7,7 @@ import static com.datadog.appsec.gateway.AppSecRequestContext.RESPONSE_HEADERS_ALLOW_LIST; import com.datadog.appsec.AppSecSystem; -import com.datadog.appsec.api.security.ApiSecurityRequestSampler; +import com.datadog.appsec.api.security.ApiSecuritySampler; import com.datadog.appsec.config.TraceSegmentPostProcessor; import com.datadog.appsec.event.EventProducerService; import com.datadog.appsec.event.EventProducerService.DataSubscriberInfo; @@ -20,7 +20,6 @@ import com.datadog.appsec.event.data.SingletonDataBundle; import com.datadog.appsec.report.AppSecEvent; import com.datadog.appsec.report.AppSecEventWrapper; -import datadog.trace.api.Config; import datadog.trace.api.ProductTraceSource; import datadog.trace.api.gateway.Events; import datadog.trace.api.gateway.Flow; @@ -84,7 +83,7 @@ public class GatewayBridge { private final SubscriptionService subscriptionService; private final EventProducerService producerService; - private final ApiSecurityRequestSampler requestSampler; + private final ApiSecuritySampler requestSampler; private final List traceSegmentPostProcessors; // subscriber cache @@ -110,7 +109,7 @@ public class GatewayBridge { public GatewayBridge( SubscriptionService subscriptionService, EventProducerService producerService, - ApiSecurityRequestSampler requestSampler, + ApiSecuritySampler requestSampler, List traceSegmentPostProcessors) { this.subscriptionService = subscriptionService; this.producerService = producerService; @@ -661,12 +660,14 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) { } ctx.setRequestEndCalled(); - maybeExtractSchemas(ctx); - - // WAF call - ctx.closeAdditive(); - TraceSegment traceSeg = ctx_.getTraceSegment(); + Map tags = spanInfo.getTags(); + + if (maybeSampleForApiSecurity(ctx, spanInfo, tags)) { + ctx.setKeepOpenForApiSecurityPostProcessing(true); + } else { + ctx.closeAdditive(); + } // AppSec report metric and events for web span only if (traceSeg != null) { @@ -688,7 +689,7 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) { traceSeg.setTagTop("network.client.ip", ctx.getPeerAddress()); // Reflect client_ip as actor.ip for backward compatibility - Object clientIp = spanInfo.getTags().get(Tags.HTTP_CLIENT_IP); + Object clientIp = tags.get(Tags.HTTP_CLIENT_IP); if (clientIp != null) { traceSeg.setTagTop("actor.ip", clientIp); } @@ -728,10 +729,21 @@ private NoopFlow onRequestEnded(RequestContext ctx_, IGSpanInfo spanInfo) { } } - ctx.close(spanInfo.isRequiresPostProcessing()); + ctx.close(); return NoopFlow.INSTANCE; } + private boolean maybeSampleForApiSecurity( + AppSecRequestContext ctx, IGSpanInfo spanInfo, Map tags) { + log.debug("Checking API Security for end of request handler on span: {}", spanInfo.getSpanId()); + // API Security sampling requires http.route tag. + final Object route = tags.get(Tags.HTTP_ROUTE); + if (route != null) { + ctx.setRoute(route.toString()); + } + return requestSampler.preSampleRequest(ctx); + } + private Flow onRequestHeadersDone(RequestContext ctx_) { AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC); if (ctx == null || ctx.isReqDataPublished()) { @@ -952,40 +964,6 @@ private Flow maybePublishResponseData(AppSecRequestContext ctx) { } } - private void maybeExtractSchemas(AppSecRequestContext ctx) { - boolean extractSchema = false; - if (Config.get().isApiSecurityEnabled() && requestSampler != null) { - extractSchema = requestSampler.sampleRequest(); - } - - if (!extractSchema) { - return; - } - - while (true) { - DataSubscriberInfo subInfo = requestEndSubInfo; - if (subInfo == null) { - subInfo = producerService.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR); - requestEndSubInfo = subInfo; - } - if (subInfo == null || subInfo.isEmpty()) { - return; - } - - DataBundle bundle = - new SingletonDataBundle<>( - KnownAddresses.WAF_CONTEXT_PROCESSOR, - Collections.singletonMap("extract-schema", true)); - try { - GatewayContext gwCtx = new GatewayContext(false); - producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx); - return; - } catch (ExpiredSubscriberInfoException e) { - requestEndSubInfo = null; - } - } - } - private static Map> parseQueryStringParams( String queryString, Charset uriEncoding) { if (queryString == null) { diff --git a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/ApiSecurityRequestSamplerTest.groovy b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/ApiSecurityRequestSamplerTest.groovy deleted file mode 100644 index ff256bc21fb..00000000000 --- a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/ApiSecurityRequestSamplerTest.groovy +++ /dev/null @@ -1,55 +0,0 @@ -package com.datadog.appsec.api.security - -import datadog.trace.api.Config -import datadog.trace.test.util.DDSpecification -import spock.lang.Shared - -class ApiSecurityRequestSamplerTest extends DDSpecification { - - @Shared - static final float DEFAULT_SAMPLE_RATE = Config.get().getApiSecurityRequestSampleRate() - - void 'Api Security Request Sample Rate'() { - given: - def config = Spy(Config.get()) - config.getApiSecurityRequestSampleRate() >> sampleRate - def sampler = new ApiSecurityRequestSampler(config) - - when: - def numOfRequest = expectedSampledRequests.size() - def results = new int[numOfRequest] - for (int i = 0; i < numOfRequest; i++) { - results[i] = sampler.sampleRequest() ? 1 : 0 - } - - then: - results == expectedSampledRequests as int[] - - where: - sampleRate | expectedSampledRequests - DEFAULT_SAMPLE_RATE | [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0] // Default sample rate - 10% - 0.0 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] - 0.1 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0] - 0.25 | [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1] - 0.33 | [0, 0, 0, 1, 0, 0, 1, 0, 0, 1, 0, 0, 1] - 0.5 | [0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1] - 0.75 | [0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1] - 0.9 | [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0] - 0.99 | [0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] - 1.0 | [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] - 1.25 | [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] // Wrong sample rate - use 100% - -0.5 | [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1] // Wrong sample rate - use 100% - } - - void 'update sample rate'() { - given: - def config = Spy(Config.get()) - def sampler = new ApiSecurityRequestSampler(config) - - when: - sampler.setSampling(0.2) - - then: - sampler.sampling == 20 - } -} diff --git a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/ApiSecuritySamplerTest.groovy b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/ApiSecuritySamplerTest.groovy new file mode 100644 index 00000000000..a4ef9984786 --- /dev/null +++ b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/ApiSecuritySamplerTest.groovy @@ -0,0 +1,219 @@ +package com.datadog.appsec.api.security + +import com.datadog.appsec.gateway.AppSecRequestContext +import datadog.trace.api.time.ControllableTimeSource +import datadog.trace.test.util.DDSpecification + +class ApiSecuritySamplerTest extends DDSpecification { + + void 'happy path with single request'() { + given: + final ctx = createContext('route1', 'GET', 200) + final sampler = new ApiSecuritySamplerImpl() + + when: + final preSampled = sampler.preSampleRequest(ctx) + + then: + preSampled + + when: + ctx.setKeepOpenForApiSecurityPostProcessing(true) + final sampled = sampler.sampleRequest(ctx) + + then: + sampled + } + + void 'second request is not sampled for the same endpoint'() { + given: + AppSecRequestContext ctx1 = createContext('route1', 'GET', 200) + AppSecRequestContext ctx2 = createContext('route1', 'GET', 200) + final sampler = new ApiSecuritySamplerImpl() + + when: + final preSampled1 = sampler.preSampleRequest(ctx1) + ctx1.setKeepOpenForApiSecurityPostProcessing(true) + final sampled1 = sampler.sampleRequest(ctx1) + sampler.releaseOne() + + then: + preSampled1 + sampled1 + + when: + final preSampled2 = sampler.preSampleRequest(ctx2) + + then: + !preSampled2 + } + + void 'preSampleRequest with maximum concurrent contexts'() { + given: + final ctx1 = Spy(createContext('route2', 'GET', 200)) + final ctx2 = Spy(createContext('route3', 'GET', 200)) + final sampler = new ApiSecuritySamplerImpl() + assert sampler.MAX_POST_PROCESSING_TASKS > 0 + + when: 'exhaust the maximum number of concurrent contexts' + final List preSampled1 = (1..sampler.MAX_POST_PROCESSING_TASKS).collect { + sampler.preSampleRequest(createContext('route1', 'GET', 200 + it)) + } + + then: + preSampled1.every { it } + + and: 'try to sample one more' + final preSampled2 = sampler.preSampleRequest(ctx1) + + then: + !preSampled2 + + when: 'release one context' + sampler.releaseOne() + + and: 'next can be sampled' + final preSampled3 = sampler.preSampleRequest(ctx2) + + then: + preSampled3 + } + + void 'preSampleRequest with null route'() { + given: + def ctx = createContext(null, 'GET', 200) + def sampler = new ApiSecuritySamplerImpl() + + when: + def preSampled = sampler.preSampleRequest(ctx) + + then: + !preSampled + } + + void 'preSampleRequest with null method'() { + given: + def ctx = createContext('route1', null, 200) + def sampler = new ApiSecuritySamplerImpl() + + when: + def preSampled = sampler.preSampleRequest(ctx) + + then: + !preSampled + } + + void 'preSampleRequest with 0 status code'() { + given: + def ctx = createContext('route1', 'GET', 0) + def sampler = new ApiSecuritySamplerImpl() + + when: + def preSampled = sampler.preSampleRequest(ctx) + + then: + !preSampled + } + + void 'sampleRequest with null context throws'() { + given: + def sampler = new ApiSecuritySamplerImpl() + + when: + sampler.preSampleRequest(null) + + then: + thrown(NullPointerException) + } + + void 'sampleRequest without prior preSampleRequest never works'() { + given: + def sampler = new ApiSecuritySamplerImpl() + def ctx = createContext('route1', 'GET', 200) + + when: + def sampled = sampler.sampleRequest(ctx) + + then: + !sampled + } + + void 'sampleRequest honors expiration'() { + given: + def ctx = createContext('route1', 'GET', 200) + ctx.setApiSecurityEndpointHash(42L) + ctx.setKeepOpenForApiSecurityPostProcessing(true) + final timeSource = new ControllableTimeSource() + timeSource.set(0) + final long expirationTimeInMs = 10L + final long expirationTimeInNs = expirationTimeInMs * 1_000_000 + def sampler = new ApiSecuritySamplerImpl(10, expirationTimeInMs, timeSource) + + when: + def sampled = sampler.sampleRequest(ctx) + + then: + sampled + + when: + sampled = sampler.sampleRequest(ctx) + + then: 'second request is not sampled' + !sampled + + when: 'expiration time has passed' + timeSource.advance(expirationTimeInNs) + sampled = sampler.sampleRequest(ctx) + + then: 'request is sampled again' + sampled + } + + void 'internal accessMap never goes beyond capacity'() { + given: + final timeSource = new ControllableTimeSource() + timeSource.set(0) + final long expirationTimeInMs = 10_000 + final int maxCapacity = 10 + ApiSecuritySamplerImpl sampler = new ApiSecuritySamplerImpl(10, expirationTimeInMs, timeSource) + + expect: + for (int i = 0; i < maxCapacity * 10; i++) { + timeSource.advance(1_000_000) + final ctx = createContext('route1', 'GET', 200 + 1) + ctx.setApiSecurityEndpointHash(i as long) + ctx.setKeepOpenForApiSecurityPostProcessing(true) + assert sampler.sampleRequest(ctx) + assert sampler.accessMap.size() <= maxCapacity + } + } + + void 'expired entries are purged from internal accessMap'() { + given: + final timeSource = new ControllableTimeSource() + timeSource.set(0) + final long expirationTimeInMs = 10_000 + final int maxCapacity = 10 + ApiSecuritySamplerImpl sampler = new ApiSecuritySamplerImpl(10, expirationTimeInMs, timeSource) + + expect: + for (int i = 0; i < maxCapacity * 10; i++) { + final ctx = createContext('route1', 'GET', 200 + 1) + ctx.setApiSecurityEndpointHash(i as long) + ctx.setKeepOpenForApiSecurityPostProcessing(true) + assert sampler.sampleRequest(ctx) + assert sampler.accessMap.size() <= 2 + if (i % 2) { + timeSource.advance(expirationTimeInMs * 1_000_000) + } + } + } + + private static AppSecRequestContext createContext(final String route, final String method, int statusCode) { + final AppSecRequestContext ctx = new AppSecRequestContext() + ctx.setRoute(route) + ctx.setMethod(method) + ctx.setResponseStatus(statusCode) + ctx + } +} diff --git a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/AppSecSpanPostProcessorTest.groovy b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/AppSecSpanPostProcessorTest.groovy new file mode 100644 index 00000000000..e2287682bcd --- /dev/null +++ b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/api/security/AppSecSpanPostProcessorTest.groovy @@ -0,0 +1,251 @@ +package com.datadog.appsec.api.security + +import com.datadog.appsec.event.EventProducerService +import com.datadog.appsec.event.ExpiredSubscriberInfoException +import com.datadog.appsec.event.data.KnownAddresses +import com.datadog.appsec.gateway.AppSecRequestContext +import datadog.trace.api.gateway.RequestContext +import datadog.trace.api.internal.TraceSegment +import datadog.trace.bootstrap.instrumentation.api.AgentSpan +import datadog.trace.test.util.DDSpecification + +class AppSecSpanPostProcessorTest extends DDSpecification { + + void 'schema extracted on happy path'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def subInfo = Mock(EventProducerService.DataSubscriberInfo) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def traceSegment = Mock(TraceSegment) + def ctx = Mock(AppSecRequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> ctx + 1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true + 1 * sampler.sampleRequest(_) >> true + 1 * reqCtx.getTraceSegment() >> traceSegment + 1 * producer.getDataSubscribers(KnownAddresses.WAF_CONTEXT_PROCESSOR) >> subInfo + 1 * subInfo.isEmpty() >> false + 1 * producer.publishDataEvent(_, ctx, _, _) + 1 * ctx.commitDerivatives(traceSegment) + 1 * ctx.setKeepOpenForApiSecurityPostProcessing(false) + 1 * ctx.closeAdditive() + 1 * ctx.close() + 1 * sampler.releaseOne() + 0 * _ + } + + void 'no schema extracted if sampling is false'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def ctx = Mock(AppSecRequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> ctx + 1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true + 1 * sampler.sampleRequest(_) >> false + 1 * ctx.setKeepOpenForApiSecurityPostProcessing(false) + 1 * ctx.closeAdditive() + 1 * ctx.close() + 1 * sampler.releaseOne() + 0 * _ + } + + void 'permit is released even if request context close throws'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def traceSegment = Mock(TraceSegment) + def ctx = Mock(AppSecRequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> ctx + 1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true + 1 * sampler.sampleRequest(_) >> true + 1 * reqCtx.getTraceSegment() >> traceSegment + 1 * producer.getDataSubscribers(_) >> null + 1 * ctx.setKeepOpenForApiSecurityPostProcessing(false) + 1 * ctx.closeAdditive() + 1 * ctx.close() >> { throw new RuntimeException() } + 1 * sampler.releaseOne() + 0 * _ + } + + void 'context is cleaned up on timeout'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def ctx = Mock(AppSecRequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { true }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> ctx + 1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true + 1 * ctx.setKeepOpenForApiSecurityPostProcessing(false) + 1 * ctx.closeAdditive() + 1 * ctx.close() + 1 * sampler.releaseOne() + 0 * _ + } + + void 'process null request context does nothing'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def span = Mock(AgentSpan) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> null + 0 * _ + } + + void 'process null appsec request context does nothing'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> null + 0 * _ + } + + void 'process already closed context does nothing'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def ctx = Mock(AppSecRequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> ctx + 1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> false + 0 * _ + } + + void 'process throws on null span'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(null, { false }) + + then: + thrown(NullPointerException) + 0 * _ + } + + void 'empty event subscription does not break the process'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def subInfo = Mock(EventProducerService.DataSubscriberInfo) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def traceSegment = Mock(TraceSegment) + def ctx = Mock(AppSecRequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> ctx + 1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true + 1 * sampler.sampleRequest(_) >> true + 1 * reqCtx.getTraceSegment() >> traceSegment + 1 * producer.getDataSubscribers(_) >> subInfo + 1 * subInfo.isEmpty() >> true + 1 * ctx.setKeepOpenForApiSecurityPostProcessing(false) + 1 * ctx.closeAdditive() + 1 * ctx.close() + 1 * sampler.releaseOne() + 0 * _ + } + + void 'expired event subscription does not break the process'() { + given: + def sampler = Mock(ApiSecuritySamplerImpl) + def producer = Mock(EventProducerService) + def subInfo = Mock(EventProducerService.DataSubscriberInfo) + def span = Mock(AgentSpan) + def reqCtx = Mock(RequestContext) + def traceSegment = Mock(TraceSegment) + def ctx = Mock(AppSecRequestContext) + def processor = new AppSecSpanPostProcessor(sampler, producer) + + when: + processor.process(span, { false }) + + then: + noExceptionThrown() + 1 * span.getRequestContext() >> reqCtx + 1 * reqCtx.getData(_) >> ctx + 1 * ctx.isKeepOpenForApiSecurityPostProcessing() >> true + 1 * sampler.sampleRequest(_) >> true + 1 * reqCtx.getTraceSegment() >> traceSegment + 1 * producer.getDataSubscribers(_) >> subInfo + 1 * subInfo.isEmpty() >> false + 1 * producer.publishDataEvent(_, ctx, _, _) >> { throw new ExpiredSubscriberInfoException() } + 1 * ctx.setKeepOpenForApiSecurityPostProcessing(false) + 1 * ctx.closeAdditive() + 1 * ctx.close() + 1 * sampler.releaseOne() + 0 * _ + } +} diff --git a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/config/AppSecConfigServiceImplSpecification.groovy b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/config/AppSecConfigServiceImplSpecification.groovy index 33f07f611b5..56fd63f89f8 100644 --- a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/config/AppSecConfigServiceImplSpecification.groovy +++ b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/config/AppSecConfigServiceImplSpecification.groovy @@ -14,7 +14,6 @@ import datadog.trace.test.util.DDSpecification import java.nio.file.Files import java.nio.file.Path -import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_ACTIVATION import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_AUTO_USER_INSTRUM_MODE import static datadog.remoteconfig.Capabilities.CAPABILITY_ASM_CUSTOM_BLOCKING_RESPONSE @@ -258,7 +257,6 @@ class AppSecConfigServiceImplSpecification extends DDSpecification { } 1 * poller.addConfigurationEndListener(_) >> { listeners.savedConfEndListener = it[0] } 1 * poller.addCapabilities(CAPABILITY_ASM_ACTIVATION) - 1 * poller.addCapabilities(CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE) 1 * poller.addCapabilities(CAPABILITY_ASM_AUTO_USER_INSTRUM_MODE) 1 * poller.addCapabilities(CAPABILITY_ASM_DD_RULES | CAPABILITY_ASM_IP_BLOCKING @@ -411,7 +409,6 @@ class AppSecConfigServiceImplSpecification extends DDSpecification { } 1 * poller.addConfigurationEndListener(_) >> { listeners.savedConfEndListener = it[0] } 1 * poller.addCapabilities(CAPABILITY_ASM_ACTIVATION) - 1 * poller.addCapabilities(CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE) 1 * poller.addCapabilities(CAPABILITY_ASM_AUTO_USER_INSTRUM_MODE) 1 * poller.addCapabilities(CAPABILITY_ASM_DD_RULES | CAPABILITY_ASM_IP_BLOCKING @@ -495,7 +492,6 @@ class AppSecConfigServiceImplSpecification extends DDSpecification { | CAPABILITY_ASM_CUSTOM_RULES | CAPABILITY_ASM_CUSTOM_BLOCKING_RESPONSE | CAPABILITY_ASM_TRUSTED_IPS - | CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE | CAPABILITY_ASM_RASP_SQLI | CAPABILITY_ASM_RASP_SSRF | CAPABILITY_ASM_RASP_CMDI @@ -596,7 +592,6 @@ class AppSecConfigServiceImplSpecification extends DDSpecification { listeners.savedFeaturesListener = it[2] } 1 * poller.addConfigurationEndListener(_) >> { listeners.savedConfEndListener = it[0] } - 1 * poller.addCapabilities(CAPABILITY_ASM_API_SECURITY_SAMPLE_RATE) 1 * poller.addCapabilities(CAPABILITY_ASM_AUTO_USER_INSTRUM_MODE) 1 * poller.addCapabilities(CAPABILITY_ASM_DD_RULES | CAPABILITY_ASM_IP_BLOCKING diff --git a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/AppSecRequestContextSpecification.groovy b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/AppSecRequestContextSpecification.groovy index 92d22cb6113..a29d68d5715 100644 --- a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/AppSecRequestContextSpecification.groovy +++ b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/AppSecRequestContextSpecification.groovy @@ -251,7 +251,6 @@ class AppSecRequestContextSpecification extends DDSpecification { void 'test that internal data is cleared on close'() { setup: final ctx = new AppSecRequestContext() - final fullCleanup = !postProcessing when: ctx.requestHeaders.put('Accept', ['*']) @@ -260,19 +259,17 @@ class AppSecRequestContextSpecification extends DDSpecification { ctx.persistentData.put(KnownAddresses.REQUEST_METHOD, 'GET') ctx.derivatives = ['a': 'b'] ctx.additive = createAdditive() - ctx.close(postProcessing) + ctx.close() then: ctx.additive == null ctx.derivatives == null + ctx.additive == null - ctx.requestHeaders.isEmpty() == fullCleanup - ctx.responseHeaders.isEmpty() == fullCleanup - ctx.cookies.isEmpty() == fullCleanup - ctx.persistentData.isEmpty() == fullCleanup - - where: - postProcessing << [true, false] + ctx.requestHeaders.isEmpty() + ctx.responseHeaders.isEmpty() + ctx.cookies.isEmpty() + ctx.persistentData.isEmpty() } def "test increase and get WafTimeouts"() { diff --git a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy index c13156d461b..c2453c52bf2 100644 --- a/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy +++ b/dd-java-agent/appsec/src/test/groovy/com/datadog/appsec/gateway/GatewayBridgeSpecification.groovy @@ -1,6 +1,7 @@ package com.datadog.appsec.gateway import com.datadog.appsec.AppSecSystem +import com.datadog.appsec.api.security.ApiSecuritySamplerImpl import com.datadog.appsec.config.TraceSegmentPostProcessor import com.datadog.appsec.event.EventDispatcher import com.datadog.appsec.event.EventProducerService @@ -75,7 +76,8 @@ class GatewayBridgeSpecification extends DDSpecification { } TraceSegmentPostProcessor pp = Mock() - GatewayBridge bridge = new GatewayBridge(ig, eventDispatcher, null, [pp]) + ApiSecuritySamplerImpl requestSampler = Mock(ApiSecuritySamplerImpl) + GatewayBridge bridge = new GatewayBridge(ig, eventDispatcher, requestSampler, [pp]) Supplier> requestStartedCB BiFunction> requestEndedCB @@ -159,7 +161,7 @@ class GatewayBridgeSpecification extends DDSpecification { 1 * spanInfo.getTags() >> ['http.client_ip': '1.1.1.1'] 1 * mockAppSecCtx.transferCollectedEvents() >> [event] 1 * mockAppSecCtx.peerAddress >> '2001::1' - 1 * mockAppSecCtx.close(false) + 1 * mockAppSecCtx.close() 1 * traceSegment.setTagTop("_dd.appsec.enabled", 1) 1 * traceSegment.setTagTop("_dd.runtime_family", "jvm") 1 * traceSegment.setTagTop('appsec.event', true) @@ -167,7 +169,6 @@ class GatewayBridgeSpecification extends DDSpecification { 1 * traceSegment.setTagTop('http.request.headers.accept', 'header_value') 1 * traceSegment.setTagTop('http.response.headers.content-type', 'text/html; charset=UTF-8') 1 * traceSegment.setTagTop('network.client.ip', '2001::1') - 1 * mockAppSecCtx.closeAdditive() flow.result == null flow.action == Flow.Action.Noop.INSTANCE } @@ -969,7 +970,9 @@ class GatewayBridgeSpecification extends DDSpecification { getData(RequestContextSlot.APPSEC) >> mockAppSecCtx getTraceSegment() >> traceSegment } - final spanInfo = Mock(AgentSpan) + final spanInfo = Mock(AgentSpan) { + getTags() >> ['http.route':'/'] + } when: requestEndedCB.apply(mockCtx, spanInfo) diff --git a/dd-smoke-tests/appsec/springboot/src/main/java/datadog/smoketest/appsec/springboot/controller/WebController.java b/dd-smoke-tests/appsec/springboot/src/main/java/datadog/smoketest/appsec/springboot/controller/WebController.java index 8529a048fbd..a71fa52b689 100644 --- a/dd-smoke-tests/appsec/springboot/src/main/java/datadog/smoketest/appsec/springboot/controller/WebController.java +++ b/dd-smoke-tests/appsec/springboot/src/main/java/datadog/smoketest/appsec/springboot/controller/WebController.java @@ -205,6 +205,11 @@ public String shiCmdParamsAndFile( return "EXECUTED"; } + @GetMapping("/api_security/sampling/{status_code}") + public ResponseEntity apiSecuritySampling(@PathVariable("status_code") int statusCode) { + return ResponseEntity.status(statusCode).body("EXECUTED"); + } + private void withProcess(final Operation op) { Process process = null; try { diff --git a/dd-smoke-tests/appsec/springboot/src/test/groovy/datadog/smoketest/appsec/SpringBootSmokeTest.groovy b/dd-smoke-tests/appsec/springboot/src/test/groovy/datadog/smoketest/appsec/SpringBootSmokeTest.groovy index e7aaea58024..42768f998b3 100644 --- a/dd-smoke-tests/appsec/springboot/src/test/groovy/datadog/smoketest/appsec/SpringBootSmokeTest.groovy +++ b/dd-smoke-tests/appsec/springboot/src/test/groovy/datadog/smoketest/appsec/SpringBootSmokeTest.groovy @@ -6,12 +6,18 @@ import okhttp3.FormBody import okhttp3.MediaType import okhttp3.Request import okhttp3.RequestBody +import okhttp3.Response import spock.lang.Shared import java.nio.charset.StandardCharsets class SpringBootSmokeTest extends AbstractAppSecServerSmokeTest { + @Override + def logLevel() { + 'DEBUG' + } + @Shared String buildDir = new File(System.getProperty("datadog.smoketest.builddir")).absolutePath @Shared @@ -650,7 +656,36 @@ class SpringBootSmokeTest extends AbstractAppSecServerSmokeTest { 'cmd' | ['$(cat /etc/passwd 1>&2 ; echo .)'] | null 'cmdWithParams' | ['$(cat /etc/passwd 1>&2 ; echo .)'] | ['param'] 'cmdParamsAndFile' | ['$(cat /etc/passwd 1>&2 ; echo .)'] | ['param'] + } + void 'API Security samples only one request per endpoint'() { + given: + def url = "http://localhost:${httpPort}/api_security/sampling/200?test=value" + def client = OkHttpUtils.clientBuilder().build() + def request = new Request.Builder() + .url(url) + .addHeader('X-My-Header', "value") + .get() + .build() + + when: + List responses = (1..3).collect { + client.newCall(request).execute() + } + + then: + responses.each { + assert it.code() == 200 + } + waitForTraceCount(3) + def spans = rootSpans.toList().toSorted { it.span.duration } + spans.size() == 3 + def sampledSpans = spans.findAll { it.meta.keySet().any { it.startsWith('_dd.appsec.s.req.') } } + sampledSpans.size() == 1 + def span = sampledSpans[0] + span.meta.containsKey('_dd.appsec.s.req.query') + span.meta.containsKey('_dd.appsec.s.req.params') + span.meta.containsKey('_dd.appsec.s.req.headers') } } diff --git a/dd-smoke-tests/appsec/src/main/groovy/datadog/smoketest/appsec/AbstractAppSecServerSmokeTest.groovy b/dd-smoke-tests/appsec/src/main/groovy/datadog/smoketest/appsec/AbstractAppSecServerSmokeTest.groovy index ef240f03752..0d8eb04f393 100644 --- a/dd-smoke-tests/appsec/src/main/groovy/datadog/smoketest/appsec/AbstractAppSecServerSmokeTest.groovy +++ b/dd-smoke-tests/appsec/src/main/groovy/datadog/smoketest/appsec/AbstractAppSecServerSmokeTest.groovy @@ -44,6 +44,10 @@ abstract class AbstractAppSecServerSmokeTest extends AbstractServerSmokeTest { protected String[] defaultAppSecProperties = [ "-Ddd.appsec.enabled=${System.getProperty('smoke_test.appsec.enabled') ?: 'true'}", "-Ddd.profiling.enabled=false", + // TODO: Remove once this is the default value + "-Ddd.api-security.enabled=true", + "-Ddd.appsec.waf.timeout=300000", + "-DPOWERWAF_EXIT_ON_LEAK=true", // disable AppSec rate limit "-Ddd.appsec.trace.rate.limit=-1" ] + (System.getProperty('smoke_test.appsec.enabled') == 'inactive' ? diff --git a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java index 5e620d344e1..172b8f2051b 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/ConfigDefaults.java @@ -114,7 +114,7 @@ public final class ConfigDefaults { static final boolean DEFAULT_APPSEC_WAF_METRICS = true; static final int DEFAULT_APPSEC_WAF_TIMEOUT = 100000; // 0.1 s static final boolean DEFAULT_API_SECURITY_ENABLED = false; - static final float DEFAULT_API_SECURITY_REQUEST_SAMPLE_RATE = 0.1f; // 10 % + static final float DEFAULT_API_SECURITY_SAMPLE_DELAY = 30.0f; static final boolean DEFAULT_APPSEC_RASP_ENABLED = true; static final boolean DEFAULT_APPSEC_STACK_TRACE_ENABLED = true; static final int DEFAULT_APPSEC_MAX_STACK_TRACES = 2; diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/AppSecConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/AppSecConfig.java index 454ef0a5425..a7f71b6f664 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/AppSecConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/AppSecConfig.java @@ -26,7 +26,7 @@ public final class AppSecConfig { public static final String API_SECURITY_ENABLED = "api-security.enabled"; public static final String API_SECURITY_ENABLED_EXPERIMENTAL = "experimental.api-security.enabled"; - public static final String API_SECURITY_REQUEST_SAMPLE_RATE = "api-security.request.sample.rate"; + public static final String API_SECURITY_SAMPLE_DELAY = "api-security.sample.delay"; public static final String APPSEC_SCA_ENABLED = "appsec.sca.enabled"; public static final String APPSEC_RASP_ENABLED = "appsec.rasp.enabled"; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index 153bcca6565..50b9aba2cdb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -162,8 +162,7 @@ public DDAgentWriter build() { null == prioritization ? FAST_LANE : prioritization, flushIntervalMilliseconds, TimeUnit.MILLISECONDS, - singleSpanSampler, - null); + singleSpanSampler); return new DDAgentWriter( traceProcessingWorker, diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java index 614865a6b5e..f298a5e8f6a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java @@ -122,8 +122,7 @@ public DDIntakeWriter build() { prioritization, flushIntervalMilliseconds, TimeUnit.MILLISECONDS, - singleSpanSampler, - null); + singleSpanSampler); return new DDIntakeWriter( traceProcessingWorker, diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java index d2ed59f3f8a..6cd0ecdaed2 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/TraceProcessingWorker.java @@ -7,6 +7,7 @@ import datadog.communication.ddagent.DroppingPolicy; import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor; import datadog.trace.common.sampling.SingleSpanSampler; import datadog.trace.common.writer.ddagent.FlushEvent; import datadog.trace.common.writer.ddagent.Prioritization; @@ -14,8 +15,6 @@ import datadog.trace.core.CoreSpan; import datadog.trace.core.DDSpan; import datadog.trace.core.monitor.HealthMetrics; -import datadog.trace.core.postprocessor.SpanPostProcessor; -import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -53,8 +52,7 @@ public TraceProcessingWorker( final Prioritization prioritization, final long flushInterval, final TimeUnit timeUnit, - final SingleSpanSampler singleSpanSampler, - final SpanPostProcessor spanPostProcessor) { + final SingleSpanSampler singleSpanSampler) { this.capacity = capacity; this.primaryQueue = createQueue(capacity); this.secondaryQueue = createQueue(capacity); @@ -75,13 +73,7 @@ public TraceProcessingWorker( this.serializingHandler = new TraceSerializingHandler( - primaryQueue, - secondaryQueue, - healthMetrics, - dispatcher, - flushInterval, - timeUnit, - spanPostProcessor); + primaryQueue, secondaryQueue, healthMetrics, dispatcher, flushInterval, timeUnit); this.serializerThread = newAgentThread(TRACE_PROCESSOR, serializingHandler); } @@ -142,7 +134,6 @@ public static class TraceSerializingHandler implements Runnable { private final boolean doTimeFlush; private final PayloadDispatcher payloadDispatcher; private long lastTicks; - private final SpanPostProcessor spanPostProcessor; public TraceSerializingHandler( final MpscBlockingConsumerArrayQueue primaryQueue, @@ -150,8 +141,7 @@ public TraceSerializingHandler( final HealthMetrics healthMetrics, final PayloadDispatcher payloadDispatcher, final long flushInterval, - final TimeUnit timeUnit, - final SpanPostProcessor spanPostProcessor) { + final TimeUnit timeUnit) { this.primaryQueue = primaryQueue; this.secondaryQueue = secondaryQueue; this.healthMetrics = healthMetrics; @@ -163,7 +153,6 @@ public TraceSerializingHandler( } else { this.ticksRequiredToFlush = Long.MAX_VALUE; } - this.spanPostProcessor = spanPostProcessor; } @Override @@ -262,36 +251,26 @@ private void maybeTracePostProcessing(List trace) { return; } - // Filter spans that need post-processing - List spansToPostProcess = null; - for (DDSpan span : trace) { - if (span.isRequiresPostProcessing()) { - if (spansToPostProcess == null) { - spansToPostProcess = new ArrayList<>(); - } - spansToPostProcess.add(span); - } - } - - if (spansToPostProcess == null) { - return; - } - + final SpanPostProcessor postProcessor = SpanPostProcessor.Holder.INSTANCE; try { - long timeout = Config.get().getTracePostProcessingTimeout(); - long deadline = System.currentTimeMillis() + timeout; - BooleanSupplier timeoutCheck = () -> System.currentTimeMillis() > deadline; - - for (DDSpan span : spansToPostProcess) { - if (!spanPostProcessor.process(span, timeoutCheck)) { - log.debug("Span post-processing interrupted due to timeout."); - break; - } + final long timeout = Config.get().getTracePostProcessingTimeout(); + final long deadline = System.currentTimeMillis() + timeout; + final boolean[] timedOut = {false}; + final BooleanSupplier timeoutCheck = + () -> { + if (timedOut[0]) { + return true; + } + if (System.currentTimeMillis() > deadline) { + timedOut[0] = true; + } + return timedOut[0]; + }; + for (DDSpan span : trace) { + postProcessor.process(span, timeoutCheck); } } catch (Throwable e) { - if (log.isDebugEnabled()) { - log.debug("Error while trace post-processing", e); - } + log.debug("Error while trace post-processing", e); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 771d22e3758..19f3852a0a6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -825,16 +825,6 @@ public void addLink(AgentSpanLink link) { } } - @Override - public boolean isRequiresPostProcessing() { - return context.isRequiresPostProcessing(); - } - - @Override - public void setRequiresPostProcessing(boolean requiresPostProcessing) { - context.setRequiresPostProcessing(requiresPostProcessing); - } - // to be accessible in Spock spies, which the field wouldn't otherwise be public long getStartTimeNano() { return startTimeNano; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index e6980e2b41d..70b96f2ff55 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -142,7 +142,6 @@ public class DDSpanContext private final boolean injectBaggageAsTags; private volatile int encodedOperationName; private volatile int encodedResourceName; - private volatile boolean requiresPostProcessing; private volatile CharSequence lastParentId; private final boolean isRemote; @@ -1044,14 +1043,6 @@ public void setMetaStructCurrent(String field, Object value) { setMetaStruct(field, value); } - public void setRequiresPostProcessing(boolean postProcessing) { - this.requiresPostProcessing = postProcessing; - } - - public boolean isRequiresPostProcessing() { - return requiresPostProcessing; - } - public CharSequence getLastParentId() { return lastParentId; } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java b/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java deleted file mode 100644 index 8751959456f..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/core/postprocessor/SpanPostProcessor.java +++ /dev/null @@ -1,26 +0,0 @@ -package datadog.trace.core.postprocessor; - -import datadog.trace.core.DDSpan; -import java.util.function.BooleanSupplier; - -/** - * Span Post-processing with a timeout check capability. - * - *

Implementations of this interface should carry out post-processing of spans while supporting - * interruption when a specified time limit is exceeded. The method {@code process} must check the - * state of {@code timeoutCheck} while processing span. If {@code timeoutCheck.getAsBoolean()} - * returns {@code true}, processing should be immediately halted, and the method should return - * {@code false}. If post-processing completes successfully before the timeout, the method should - * return {@code true}. - */ -public interface SpanPostProcessor { - /** - * Post-processes a span. - * - * @param span The span to be post-processed. - * @param timeoutCheck A timeout check returning {@code true} if the allotted time has elapsed. - * @return {@code true} if the span was successfully processed; {@code false} in case of a - * timeout. - */ - boolean process(DDSpan span, BooleanSupplier timeoutCheck); -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy index eeef4121580..2a37a69ec64 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceProcessingWorkerTest.groovy @@ -7,7 +7,7 @@ import datadog.trace.core.DDSpan import datadog.trace.core.DDSpanContext import datadog.trace.core.PendingTrace import datadog.trace.core.monitor.HealthMetrics -import datadog.trace.core.postprocessor.SpanPostProcessor +import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor import datadog.trace.test.util.DDSpecification import spock.util.concurrent.PollingConditions @@ -44,7 +44,6 @@ class TraceProcessingWorkerTest extends DDSpecification { FAST_LANE, 1, TimeUnit.NANOSECONDS, - null, null ) // stop heartbeats from being throttled @@ -71,7 +70,6 @@ class TraceProcessingWorkerTest extends DDSpecification { FAST_LANE, 1, TimeUnit.NANOSECONDS, - null, null ) // stop heartbeats from being throttled def timeConditions = new PollingConditions(timeout: 1, initialDelay: 1, factor: 1.25) @@ -97,7 +95,7 @@ class TraceProcessingWorkerTest extends DDSpecification { false }, FAST_LANE, - 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen + 100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen when: "there is pending work it is completed before a flush" // processing this span will throw an exception, but it should be caught @@ -136,7 +134,7 @@ class TraceProcessingWorkerTest extends DDSpecification { throwingDispatcher, { false }, FAST_LANE, - 100, TimeUnit.SECONDS, null, null) // prevent heartbeats from helping the flush happen + 100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen worker.start() when: "a trace is processed but can't be passed on" @@ -163,9 +161,7 @@ class TraceProcessingWorkerTest extends DDSpecification { } HealthMetrics healthMetrics = Mock(HealthMetrics) - // Span 1 - should be post-processed def span1 = DDSpan.create("test", 0, Mock(DDSpanContext) { - isRequiresPostProcessing() >> true getTraceCollector() >> Mock(PendingTrace) { getCurrentTimeNano() >> 0 } @@ -174,14 +170,13 @@ class TraceProcessingWorkerTest extends DDSpecification { // Span 2 - should NOT be post-processed def span2 = DDSpan.create("test", 0, Mock(DDSpanContext) { - isRequiresPostProcessing() >> false getTraceCollector() >> Mock(PendingTrace) { getCurrentTimeNano() >> 0 } }, []) def processedSpan2 = false - SpanPostProcessor spanPostProcessor = Mock(SpanPostProcessor) { + SpanPostProcessor.Holder.INSTANCE = Mock(SpanPostProcessor) { process(span1, _) >> { processedSpan1 = true } process(span2, _) >> { processedSpan2 = true } } @@ -189,7 +184,7 @@ class TraceProcessingWorkerTest extends DDSpecification { TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false - }, FAST_LANE, 100, TimeUnit.SECONDS, null, spanPostProcessor) + }, FAST_LANE, 100, TimeUnit.SECONDS, null) worker.start() when: "traces are submitted" @@ -198,11 +193,12 @@ class TraceProcessingWorkerTest extends DDSpecification { then: "traces are passed through unless rejected on submission" conditions.eventually { - assert processedSpan1 == true - assert processedSpan2 == false + assert processedSpan1 + assert processedSpan2 } cleanup: + SpanPostProcessor.Holder.INSTANCE = SpanPostProcessor.Holder.NOOP worker.close() } @@ -217,7 +213,7 @@ class TraceProcessingWorkerTest extends DDSpecification { TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false - }, FAST_LANE, 100, TimeUnit.SECONDS, null, null) + }, FAST_LANE, 100, TimeUnit.SECONDS, null) // prevent heartbeats from helping the flush happen worker.start() @@ -268,7 +264,7 @@ class TraceProcessingWorkerTest extends DDSpecification { TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false - }, FAST_LANE, 100, TimeUnit.SECONDS, null, null) + }, FAST_LANE, 100, TimeUnit.SECONDS, null) worker.start() worker.close() int queueSize = 0 @@ -305,7 +301,7 @@ class TraceProcessingWorkerTest extends DDSpecification { return false } } - TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null) + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { true }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler) worker.start() when: "traces are submitted" @@ -381,7 +377,7 @@ class TraceProcessingWorkerTest extends DDSpecification { return false } } - TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler, null) + TraceProcessingWorker worker = new TraceProcessingWorker(10, healthMetrics, countingDispatcher, { false }, FAST_LANE, 100, TimeUnit.SECONDS, singleSpanSampler) worker.start() when: "traces are submitted" diff --git a/internal-api/build.gradle b/internal-api/build.gradle index 9d0107ae991..0cd1ad54a37 100644 --- a/internal-api/build.gradle +++ b/internal-api/build.gradle @@ -213,7 +213,10 @@ excludedClassesCoverage += [ 'datadog.trace.util.stacktrace.StackTraceFrame', 'datadog.trace.api.iast.VulnerabilityMarks', 'datadog.trace.api.iast.securitycontrol.SecurityControlHelper', - 'datadog.trace.api.iast.securitycontrol.SecurityControl' + 'datadog.trace.api.iast.securitycontrol.SecurityControl', + // Trivial holder and no-op + 'datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor.Holder', + 'datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor.NoOpSpanPostProcessor', ] excludedClassesBranchCoverage = [ 'datadog.trace.api.ProductActivationConfig', diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 6a0745c6745..eb4091e1156 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -292,7 +292,7 @@ public static String getHostName() { private final int appSecMaxStackTraces; private final int appSecMaxStackTraceDepth; private final boolean apiSecurityEnabled; - private final float apiSecurityRequestSampleRate; + private final float apiSecuritySampleDelay; private final IastDetectionMode iastDetectionMode; private final int iastMaxConcurrentRequests; @@ -1385,9 +1385,8 @@ PROFILING_DATADOG_PROFILER_ENABLED, isDatadogProfilerSafeInCurrentEnvironment()) apiSecurityEnabled = configProvider.getBoolean( API_SECURITY_ENABLED, DEFAULT_API_SECURITY_ENABLED, API_SECURITY_ENABLED_EXPERIMENTAL); - apiSecurityRequestSampleRate = - configProvider.getFloat( - API_SECURITY_REQUEST_SAMPLE_RATE, DEFAULT_API_SECURITY_REQUEST_SAMPLE_RATE); + apiSecuritySampleDelay = + configProvider.getFloat(API_SECURITY_SAMPLE_DELAY, DEFAULT_API_SECURITY_SAMPLE_DELAY); iastDebugEnabled = configProvider.getBoolean(IAST_DEBUG_ENABLED, DEFAULT_IAST_DEBUG_ENABLED); @@ -2763,8 +2762,8 @@ public boolean isApiSecurityEnabled() { return apiSecurityEnabled; } - public float getApiSecurityRequestSampleRate() { - return apiSecurityRequestSampleRate; + public float getApiSecuritySampleDelay() { + return apiSecuritySampleDelay; } public ProductActivation getIastActivation() { @@ -4799,8 +4798,6 @@ public String toString() { + appSecHttpBlockedTemplateJson + ", apiSecurityEnabled=" + apiSecurityEnabled - + ", apiSecurityRequestSampleRate=" - + apiSecurityRequestSampleRate + ", cwsEnabled=" + cwsEnabled + ", cwsTlsRefresh=" diff --git a/internal-api/src/main/java/datadog/trace/api/gateway/IGSpanInfo.java b/internal-api/src/main/java/datadog/trace/api/gateway/IGSpanInfo.java index 28ddbe27ad3..60bca47f184 100644 --- a/internal-api/src/main/java/datadog/trace/api/gateway/IGSpanInfo.java +++ b/internal-api/src/main/java/datadog/trace/api/gateway/IGSpanInfo.java @@ -16,8 +16,4 @@ public interface IGSpanInfo { void setRequestBlockingAction(Flow.Action.RequestBlockingAction rba); Flow.Action.RequestBlockingAction getRequestBlockingAction(); - - boolean isRequiresPostProcessing(); - - void setRequiresPostProcessing(boolean requiresPostProcessing); } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ExtractedSpan.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ExtractedSpan.java index 84b9d55ecd1..59bf633b834 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ExtractedSpan.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ExtractedSpan.java @@ -159,11 +159,6 @@ public RequestBlockingAction getRequestBlockingAction() { return null; } - @Override - public boolean isRequiresPostProcessing() { - return false; - } - @Override public String toString() { return "ExtractedSpan{spanContext=" + this.spanContext + '}'; diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ImmutableSpan.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ImmutableSpan.java index 620d51b57a1..c7a63cdfccf 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ImmutableSpan.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/ImmutableSpan.java @@ -187,7 +187,4 @@ public AgentSpan setMetaStruct(String field, Object value) { @Override public void setRequestBlockingAction(RequestBlockingAction rba) {} - - @Override - public void setRequiresPostProcessing(boolean requiresPostProcessing) {} } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/NoopSpan.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/NoopSpan.java index 5d476c39b20..a4ea00f23de 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/NoopSpan.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/NoopSpan.java @@ -139,9 +139,4 @@ public TraceConfig traceConfig() { public boolean isOutbound() { return false; } - - @Override - public boolean isRequiresPostProcessing() { - return false; - } } diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/SpanPostProcessor.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/SpanPostProcessor.java new file mode 100644 index 00000000000..137ddedba1d --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/SpanPostProcessor.java @@ -0,0 +1,37 @@ +package datadog.trace.bootstrap.instrumentation.api; + +import java.util.function.BooleanSupplier; +import javax.annotation.Nonnull; + +/** + * Applies post-processing of spans before serialization. + * + *

Post-processing runs in TraceProcessingWorker thread. This provides the following properties: + *

  • Runs in a single thread. Post-processing for each span runs sequentially. + *
  • Runs after the request end, and does not block the main thread. + *
  • Runs at a point where the sampler decision is already available. + */ +public interface SpanPostProcessor { + + /** + * Post-processes a span, if needed. + * + *

    Implementations should use {@code timeoutCheck}, and if true, they should halt processing as + * much as possible. This method is guaranteed to be called even if post-processing of previous + * spans have timed out. + */ + void process(@Nonnull AgentSpan span, @Nonnull BooleanSupplier timeoutCheck); + + class Holder { + public static final SpanPostProcessor NOOP = new NoOpSpanPostProcessor(); + + // XXX: At the moment, a single post-processor can be registered, and only AppSec defines one. + // If other products add their own, we'll need to refactor this to support multiple processors. + public static volatile SpanPostProcessor INSTANCE = NOOP; + } + + class NoOpSpanPostProcessor implements SpanPostProcessor { + @Override + public void process(@Nonnull AgentSpan span, @Nonnull BooleanSupplier timeoutCheck) {} + } +}