Skip to content

New API Security sampling algorithm #8178

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Mar 27, 2025
Merged
2 changes: 1 addition & 1 deletion .circleci/config.continue.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ instrumentation_modules: &instrumentation_modules "dd-java-agent/instrumentation
debugger_modules: &debugger_modules "dd-java-agent/agent-debugger|dd-java-agent/agent-bootstrap|dd-java-agent/agent-builder|internal-api|communication|dd-trace-core"
profiling_modules: &profiling_modules "dd-java-agent/agent-profiling"

default_system_tests_commit: &default_system_tests_commit 69a5e874384dd256e2e3f42fc1c95807a67efbe6
default_system_tests_commit: &default_system_tests_commit 1ef00a34ad1f83ae999887e510ef1ea1c27b151b

parameters:
nightly:
Expand Down
1 change: 1 addition & 0 deletions dd-java-agent/appsec/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ ext {
'com.datadog.appsec.config.AppSecFeatures.ApiSecurity',
'com.datadog.appsec.config.AppSecFeatures.AutoUserInstrum',
'com.datadog.appsec.event.ReplaceableEventProducerService',
'com.datadog.appsec.api.security.ApiSecuritySampler.NoOp',
]
excludedClassesBranchCoverage = [
'com.datadog.appsec.gateway.GatewayBridge',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.datadog.appsec;

import com.datadog.appsec.api.security.ApiSecurityRequestSampler;
import com.datadog.appsec.api.security.ApiSecuritySampler;
import com.datadog.appsec.api.security.ApiSecuritySamplerImpl;
import com.datadog.appsec.api.security.AppSecSpanPostProcessor;
import com.datadog.appsec.blocking.BlockingServiceImpl;
import com.datadog.appsec.config.AppSecConfigService;
import com.datadog.appsec.config.AppSecConfigServiceImpl;
Expand All @@ -21,6 +23,7 @@
import datadog.trace.api.telemetry.ProductChange;
import datadog.trace.api.telemetry.ProductChangeCollector;
import datadog.trace.bootstrap.ActiveSubsystems;
import datadog.trace.bootstrap.instrumentation.api.SpanPostProcessor;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -66,7 +69,17 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
EventDispatcher eventDispatcher = new EventDispatcher();
REPLACEABLE_EVENT_PRODUCER.replaceEventProducerService(eventDispatcher);

ApiSecurityRequestSampler requestSampler = new ApiSecurityRequestSampler(config);
ApiSecuritySampler requestSampler;
if (Config.get().isApiSecurityEnabled()) {
requestSampler = new ApiSecuritySamplerImpl();
// When DD_API_SECURITY_ENABLED=true, ths post-processor is set even when AppSec is inactive.
// This should be low overhead since the post-processor exits early if there's no AppSec
// context.
SpanPostProcessor.Holder.INSTANCE =
new AppSecSpanPostProcessor(requestSampler, REPLACEABLE_EVENT_PRODUCER);
} else {
requestSampler = new ApiSecuritySampler.NoOp();
}

ConfigurationPoller configurationPoller = sco.configurationPoller(config);
// may throw and abort startup
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.datadog.appsec.api.security;

import com.datadog.appsec.gateway.AppSecRequestContext;
import javax.annotation.Nonnull;

public interface ApiSecuritySampler {
/**
* Prepare a request context for later sampling decision. This method should be called at request
* end, and is thread-safe. If a request can potentially be sampled, this method will return true.
* If this method returns true, the caller MUST call {@link #releaseOne()} once the context is not
* needed anymore.
*/
boolean preSampleRequest(final @Nonnull AppSecRequestContext ctx);

/** Get the final sampling decision. This method is NOT required to be thread-safe. */
boolean sampleRequest(AppSecRequestContext ctx);

/** Release one permit for the sampler. This must be called after processing a span. */
void releaseOne();

final class NoOp implements ApiSecuritySampler {
@Override
public boolean preSampleRequest(@Nonnull AppSecRequestContext ctx) {
return false;
}

@Override
public boolean sampleRequest(AppSecRequestContext ctx) {
return false;
}

@Override
public void releaseOne() {}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package com.datadog.appsec.api.security;

import com.datadog.appsec.gateway.AppSecRequestContext;
import datadog.trace.api.Config;
import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.api.time.TimeSource;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ApiSecuritySamplerImpl implements ApiSecuritySampler {

private static final Logger log = LoggerFactory.getLogger(ApiSecuritySamplerImpl.class);

/**
* A maximum number of request contexts we'll keep open past the end of request at any given time.
* This will avoid excessive memory usage in case of a high number of concurrent requests, and
* should also prevent memory leaks.
*/
private static final int MAX_POST_PROCESSING_TASKS = 4;
/** Maximum number of entries in the access map. */
private static final int MAX_SIZE = 4096;
/** Mapping from endpoint hash to last access timestamp in millis. */
private final ConcurrentHashMap<Long, Long> accessMap;
/** Deque of endpoint hashes ordered by access time. Oldest is always first. */
private final Deque<Long> accessDeque;

private final long expirationTimeInMs;
private final int capacity;
private final TimeSource timeSource;
private final Semaphore counter = new Semaphore(MAX_POST_PROCESSING_TASKS);

public ApiSecuritySamplerImpl() {
this(
MAX_SIZE,
(long) (Config.get().getApiSecuritySampleDelay() * 1_000),
SystemTimeSource.INSTANCE);
}

public ApiSecuritySamplerImpl(
int capacity, long expirationTimeInMs, @Nonnull TimeSource timeSource) {
this.capacity = capacity;
this.expirationTimeInMs = expirationTimeInMs;
this.accessMap = new ConcurrentHashMap<>();
this.accessDeque = new ConcurrentLinkedDeque<>();
this.timeSource = timeSource;
}

@Override
public boolean preSampleRequest(final @Nonnull AppSecRequestContext ctx) {
final String route = ctx.getRoute();
if (route == null) {
return false;
}
final String method = ctx.getMethod();
if (method == null) {
return false;
}
final int statusCode = ctx.getResponseStatus();
if (statusCode <= 0) {
return false;
}
long hash = computeApiHash(route, method, statusCode);
ctx.setApiSecurityEndpointHash(hash);
if (!isApiAccessExpired(hash)) {
return false;
}
if (counter.tryAcquire()) {
log.debug("API security sampling is required for this request (presampled)");
ctx.setKeepOpenForApiSecurityPostProcessing(true);
return true;
}
return false;
}

/** Get the final sampling decision. This method is NOT thread-safe. */
@Override
public boolean sampleRequest(AppSecRequestContext ctx) {
if (ctx == null) {
return false;
}
final Long hash = ctx.getApiSecurityEndpointHash();
if (hash == null) {
// This should never happen, it should have been short-circuited before.
return false;
}
return updateApiAccessIfExpired(hash);
}

@Override
public void releaseOne() {
counter.release();
}

private boolean updateApiAccessIfExpired(final long hash) {
final long currentTime = timeSource.getCurrentTimeMillis();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Same remark about current time millis as in #8178 (comment), currentTimeMillis is not monotonic, and is subject to DST, NTP syncs, leap seconds. nanoTime might be better suited.


Long lastAccess = accessMap.get(hash);
if (lastAccess != null && currentTime - lastAccess < expirationTimeInMs) {
return false;
}

if (accessMap.put(hash, currentTime) == null) {
accessDeque.addLast(hash);
// If we added a new entry, we perform purging.
cleanupExpiredEntries(currentTime);
} else {
// This is now the most recently accessed entry.
accessDeque.remove(hash);
accessDeque.addLast(hash);
}

return true;
}

private boolean isApiAccessExpired(final long hash) {
final long currentTime = timeSource.getCurrentTimeMillis();
final Long lastAccess = accessMap.get(hash);
return lastAccess == null || currentTime - lastAccess >= expirationTimeInMs;
}

private void cleanupExpiredEntries(final long currentTime) {
// Purge all expired entries.
while (!accessDeque.isEmpty()) {
final Long oldestHash = accessDeque.peekFirst();
if (oldestHash == null) {
// Should never happen
continue;
}

final Long lastAccessTime = accessMap.get(oldestHash);
if (lastAccessTime == null) {
// Should never happen
continue;
}

if (currentTime - lastAccessTime < expirationTimeInMs) {
// The oldest hash is up-to-date, so stop here.
break;
}

accessDeque.pollFirst();
accessMap.remove(oldestHash);
}

// If we went over capacity, remove the oldest entries until we are within the limit.
// This should never be more than 1.
final int toRemove = accessMap.size() - this.capacity;
for (int i = 0; i < toRemove; i++) {
Long oldestHash = accessDeque.pollFirst();
if (oldestHash != null) {
accessMap.remove(oldestHash);
}
}
}

private long computeApiHash(final String route, final String method, final int statusCode) {
long result = 17;
result = 31 * result + route.hashCode();
result = 31 * result + method.hashCode();
result = 31 * result + statusCode;
return result;
}
}
Loading