Skip to content

HTTP response schema collection and data classification #8840

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import datadog.trace.api.gateway.IGSpanInfo;
import datadog.trace.api.gateway.RequestContext;
import datadog.trace.api.gateway.RequestContextSlot;
import datadog.trace.api.http.StoredBodySupplier;
import datadog.trace.api.naming.SpanNaming;
import datadog.trace.bootstrap.ActiveSubsystems;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
Expand Down Expand Up @@ -435,7 +436,8 @@ private Flow<Void> callIGCallbackRequestHeaders(AgentSpan span, REQUEST_CARRIER
IGKeyClassifier.create(
requestContext,
cbp.getCallback(EVENTS.requestHeader()),
cbp.getCallback(EVENTS.requestHeaderDone()));
cbp.getCallback(EVENTS.requestHeaderDone()),
null);
if (null != igKeyClassifier) {
getter.forEachKey(carrier, igKeyClassifier);
return igKeyClassifier.done();
Expand Down Expand Up @@ -491,7 +493,8 @@ public <RESP> Flow<Void> callIGCallbackResponseAndHeaders(
IGKeyClassifier.create(
requestContext,
cbp.getCallback(EVENTS.responseHeader()),
cbp.getCallback(EVENTS.responseHeaderDone()));
cbp.getCallback(EVENTS.responseHeaderDone()),
cbp.getCallback(EVENTS.responseBodyDone()));
if (null != igKeyClassifier) {
contextVisitor.forEachKey(carrier, igKeyClassifier);
return igKeyClassifier.done();
Expand Down Expand Up @@ -575,7 +578,8 @@ protected static final class IGKeyClassifier implements AgentPropagation.KeyClas
public static IGKeyClassifier create(
RequestContext requestContext,
TriConsumer<RequestContext, String, String> headerCallback,
Function<RequestContext, Flow<Void>> doneCallback) {
Function<RequestContext, Flow<Void>> doneCallback,
BiFunction<RequestContext, StoredBodySupplier, Flow<Void>> bodyDoneCallback) {
if (null == requestContext || null == headerCallback) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public interface KnownAddresses {
Address<Object> RESPONSE_BODY_OBJECT = new Address<>("server.response.body");

/** First chars of HTTP response body */
Address<String> RESPONSE_BODY_RAW = new Address<>("server.response.body.raw");
Address<CharSequence> RESPONSE_BODY_RAW = new Address<>("server.response.body.raw");

/** Reponse headers excluding cookies */
Address<Map<String, List<String>>> RESPONSE_HEADERS_NO_COOKIES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public class AppSecRequestContext implements DataBundle, Closeable {
private String inferredClientIp;

private volatile StoredBodySupplier storedRequestBodySupplier;
private volatile StoredBodySupplier storedResponseBodySupplier;
private String dbType;

private int responseStatus;
Expand All @@ -106,6 +107,7 @@ public class AppSecRequestContext implements DataBundle, Closeable {
private boolean rawReqBodyPublished;
private boolean convertedReqBodyPublished;
private boolean respDataPublished;
private boolean rawResBodyPublished;
private boolean pathParamsPublished;
private volatile Map<String, String> derivatives;

Expand Down Expand Up @@ -451,6 +453,10 @@ void setStoredRequestBodySupplier(StoredBodySupplier storedRequestBodySupplier)
this.storedRequestBodySupplier = storedRequestBodySupplier;
}

void setStoredResponseBodySupplier(StoredBodySupplier storedResponseBodySupplier) {
this.storedResponseBodySupplier = storedResponseBodySupplier;
}

public String getDbType() {
return dbType;
}
Expand Down Expand Up @@ -503,10 +509,18 @@ public boolean isRespDataPublished() {
return respDataPublished;
}

public boolean isRawResBodyPublished() {
return rawResBodyPublished;
}

public void setRespDataPublished(boolean respDataPublished) {
this.respDataPublished = respDataPublished;
}

public void setRawResBodyPublished(boolean rawResBodyPublished) {
this.rawResBodyPublished = rawResBodyPublished;
}

/**
* Updates the current used usr.id
*
Expand Down Expand Up @@ -580,6 +594,15 @@ public CharSequence getStoredRequestBody() {
return storedRequestBodySupplier.get();
}

/** @return the portion of the body read so far, if any */
public CharSequence getStoredResponseBody() {
StoredBodySupplier storedResponseBodySupplier = this.storedResponseBodySupplier;
if (storedResponseBodySupplier == null) {
return null;
}
return storedResponseBodySupplier.get();
}

public void reportEvents(Collection<AppSecEvent> appSecEvents) {
for (AppSecEvent event : appSecEvents) {
StandardizedLogging.attackDetected(log, event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class GatewayBridge {
// subscriber cache
private volatile DataSubscriberInfo initialReqDataSubInfo;
private volatile DataSubscriberInfo rawRequestBodySubInfo;
private volatile DataSubscriberInfo rawResponseBodySubInfo;
private volatile DataSubscriberInfo requestBodySubInfo;
private volatile DataSubscriberInfo pathParamsSubInfo;
private volatile DataSubscriberInfo respDataSubInfo;
Expand Down Expand Up @@ -141,6 +142,8 @@ public void init() {
subscriptionService.registerCallback(EVENTS.responseStarted(), this::onResponseStarted);
subscriptionService.registerCallback(EVENTS.responseHeader(), this::onResponseHeader);
subscriptionService.registerCallback(EVENTS.responseHeaderDone(), this::onResponseHeaderDone);
subscriptionService.registerCallback(EVENTS.responseBodyStart(), this::onResponseBodyStart);
subscriptionService.registerCallback(EVENTS.responseBodyDone(), this::onResponseBodyDone);
subscriptionService.registerCallback(EVENTS.grpcServerMethod(), this::onGrpcServerMethod);
subscriptionService.registerCallback(
EVENTS.grpcServerRequestMessage(), this::onGrpcServerRequestMessage);
Expand Down Expand Up @@ -602,7 +605,7 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
}

CharSequence bodyContent = supplier.get();
if (bodyContent == null || bodyContent.length() == 0) {
if (bodyContent.length() == 0) {
return NoopFlow.INSTANCE;
}
DataBundle bundle = new SingletonDataBundle<>(KnownAddresses.REQUEST_BODY_RAW, bodyContent);
Expand All @@ -615,6 +618,38 @@ private Flow<Void> onRequestBodyDone(RequestContext ctx_, StoredBodySupplier sup
}
}

private Flow<Void> onResponseBodyDone(RequestContext ctx_, StoredBodySupplier supplier) {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null || ctx.isRawResBodyPublished()) {
return NoopFlow.INSTANCE;
}
ctx.setRawResBodyPublished(true);

while (true) {
DataSubscriberInfo subInfo = rawResponseBodySubInfo;
if (subInfo == null) {
subInfo = producerService.getDataSubscribers(KnownAddresses.RESPONSE_BODY_RAW);
rawResponseBodySubInfo = subInfo;
}
if (subInfo == null || subInfo.isEmpty()) {
return NoopFlow.INSTANCE;
}

CharSequence bodyContent = supplier.get();
if (bodyContent.length() == 0) {
return NoopFlow.INSTANCE;
}
DataBundle bundle =
new SingletonDataBundle<>(KnownAddresses.RESPONSE_BODY_OBJECT, bodyContent);
try {
GatewayContext gwCtx = new GatewayContext(false);
return producerService.publishDataEvent(subInfo, ctx, bundle, gwCtx);
} catch (ExpiredSubscriberInfoException e) {
rawResponseBodySubInfo = null;
}
}
}

private Flow<Void> onRequestPathParams(RequestContext ctx_, Map<String, ?> data) {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null || ctx.isPathParamsPublished()) {
Expand Down Expand Up @@ -651,6 +686,16 @@ private Void onRequestBodyStart(RequestContext ctx_, StoredBodySupplier supplier
return null;
}

private Void onResponseBodyStart(RequestContext ctx_, StoredBodySupplier supplier) {
AppSecRequestContext ctx = ctx_.getData(RequestContextSlot.APPSEC);
if (ctx == null) {
return null;
}

ctx.setStoredResponseBodySupplier(supplier);
return null;
}

private Flow<AppSecRequestContext> onRequestStarted() {
if (!AppSecSystem.isActive()) {
return RequestContextSupplier.EMPTY;
Expand Down Expand Up @@ -962,8 +1007,12 @@ private Flow<Void> maybePublishResponseData(AppSecRequestContext ctx) {

MapDataBundle bundle =
MapDataBundle.of(
KnownAddresses.RESPONSE_STATUS, String.valueOf(ctx.getResponseStatus()),
KnownAddresses.RESPONSE_HEADERS_NO_COOKIES, ctx.getResponseHeaders());
KnownAddresses.RESPONSE_STATUS,
String.valueOf(ctx.getResponseStatus()),
KnownAddresses.RESPONSE_HEADERS_NO_COOKIES,
ctx.getResponseHeaders(),
KnownAddresses.RESPONSE_BODY_OBJECT,
ctx.getStoredResponseBody());

while (true) {
DataSubscriberInfo subInfo = respDataSubInfo;
Expand Down Expand Up @@ -1058,6 +1107,9 @@ private static class IGAppSecEventDependencies {
KnownAddresses.REQUEST_BODY_RAW, l(EVENTS.requestBodyStart(), EVENTS.requestBodyDone()));
DATA_DEPENDENCIES.put(KnownAddresses.REQUEST_PATH_PARAMS, l(EVENTS.requestPathParams()));
DATA_DEPENDENCIES.put(KnownAddresses.REQUEST_BODY_OBJECT, l(EVENTS.requestBodyProcessed()));
DATA_DEPENDENCIES.put(
KnownAddresses.RESPONSE_BODY_RAW,
l(EVENTS.responseBodyStart(), EVENTS.responseBodyDone()));
}

private static Collection<datadog.trace.api.gateway.EventType<?>> l(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import com.datadog.appsec.report.AppSecEvent
import com.datadog.appsec.report.AppSecEventWrapper
import datadog.trace.api.ProductTraceSource
import datadog.trace.api.config.GeneralConfig
import static datadog.trace.api.config.IastConfig.IAST_DEDUPLICATION_ENABLED
import datadog.trace.api.function.TriConsumer
import datadog.trace.api.function.TriFunction
import datadog.trace.api.gateway.BlockResponseFunction
Expand Down Expand Up @@ -114,6 +113,8 @@ class GatewayBridgeSpecification extends DDSpecification {
BiFunction<RequestContext, String, Flow<Void>> shellCmdCB
BiFunction<RequestContext, String, Flow<Void>> userCB
TriFunction<RequestContext, LoginEvent, String, Flow<Void>> loginEventCB
BiFunction<RequestContext, StoredBodySupplier, Void> responseBodyStartCB
BiFunction<RequestContext, StoredBodySupplier, Flow<Void>> responseBodyDoneCB

WafMetricCollector wafMetricCollector = Mock(WafMetricCollector)

Expand Down Expand Up @@ -452,6 +453,8 @@ class GatewayBridgeSpecification extends DDSpecification {
1 * ig.registerCallback(EVENTS.responseStarted(), _) >> { responseStartedCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseHeader(), _) >> { respHeaderCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseHeaderDone(), _) >> { respHeadersDoneCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseBodyStart(), _) >> { responseBodyStartCB = it[1]; null }
1 * ig.registerCallback(EVENTS.responseBodyDone(), _) >> { responseBodyDoneCB = it[1]; null }
1 * ig.registerCallback(EVENTS.grpcServerMethod(), _) >> { grpcServerMethodCB = it[1]; null }
1 * ig.registerCallback(EVENTS.grpcServerRequestMessage(), _) >> { grpcServerRequestMessageCB = it[1]; null }
1 * ig.registerCallback(EVENTS.graphqlServerRequestMessage(), _) >> { graphqlServerRequestMessageCB = it[1]; null }
Expand Down Expand Up @@ -933,7 +936,7 @@ class GatewayBridgeSpecification extends DDSpecification {
}

@Override
def <T> T getOrCreateMetaStructTop(String key, Function<String, T> defaultValue) {
<T> T getOrCreateMetaStructTop(String key, Function<String, T> defaultValue) {
return null
}

Expand Down Expand Up @@ -991,7 +994,7 @@ class GatewayBridgeSpecification extends DDSpecification {
getTraceSegment() >> traceSegment
}
final spanInfo = Mock(AgentSpan) {
getTags() >> ['http.route':'/']
getTags() >> ['http.route': '/']
}

when:
Expand Down Expand Up @@ -1127,7 +1130,7 @@ class GatewayBridgeSpecification extends DDSpecification {
}
}

void "test onLoginFailure (#mode)"() {
void "test onLoginFailure"() {
setup:
eventDispatcher.getDataSubscribers(_) >> nonEmptyDsInfo

Expand Down Expand Up @@ -1222,4 +1225,95 @@ class GatewayBridgeSpecification extends DDSpecification {
1 * traceSegment.setTagTop(Tags.PROPAGATED_TRACE_SOURCE, ProductTraceSource.ASM)
}

void 'forwards response body start events and stores the supplier'() {
StoredBodySupplier supplier = Stub()

setup:
supplier.get() >> 'response content'

expect:
ctx.data.storedResponseBody == null

when:
responseBodyStartCB.apply(ctx, supplier)

then:
ctx.data.storedResponseBody == 'response content'
}

void 'forwards response body done events and distributes the body contents'() {
DataBundle bundle
GatewayContext gatewayContext
StoredBodySupplier supplier = Stub()

setup:
supplier.get() >> 'response body content'
eventDispatcher.getDataSubscribers({ KnownAddresses.RESPONSE_BODY_RAW in it }) >> nonEmptyDsInfo
eventDispatcher.publishDataEvent(nonEmptyDsInfo, ctx.data, _ as DataBundle, _ as GatewayContext) >>
{ bundle = it[2]; gatewayContext = it[3]; NoopFlow.INSTANCE }

when:
responseBodyDoneCB.apply(ctx, supplier)

then:
bundle.get(KnownAddresses.RESPONSE_BODY_OBJECT) == 'response body content'
gatewayContext.isTransient == false
gatewayContext.isRasp == false
}

void 'response body does not get published twice'() {
StoredBodySupplier supplier = Stub()
Flow flow

given:
supplier.get() >> 'response body content'

when:
ctx.data.setRawResBodyPublished(true)
flow = responseBodyDoneCB.apply(ctx, supplier)

then:
flow == NoopFlow.INSTANCE
0 * eventDispatcher.getDataSubscribers(KnownAddresses.RESPONSE_BODY_RAW)
}

void 'response body with empty content is not published'() {
StoredBodySupplier supplier = Stub()
Flow flow

given:
supplier.get() >> ''

when:
flow = responseBodyDoneCB.apply(ctx, supplier)

then:
flow == NoopFlow.INSTANCE
1 * eventDispatcher.getDataSubscribers({ KnownAddresses.RESPONSE_BODY_RAW in it }) >> nonEmptyDsInfo
0 * eventDispatcher.publishDataEvent(_, _, _, _)
}

void 'response data includes response body when available'() {
setup:
eventDispatcher.getDataSubscribers({ KnownAddresses.RESPONSE_STATUS in it }) >> nonEmptyDsInfo
ctx.data.responseStatus = 200
ctx.data.addResponseHeader('Content-Type', 'application/json')

StoredBodySupplier supplier = Stub()
supplier.get() >> '{"status":"success"}'
ctx.data.storedResponseBodySupplier = supplier

DataBundle bundle

when:
respHeadersDoneCB.apply(ctx)

then:
1 * eventDispatcher.publishDataEvent(nonEmptyDsInfo, ctx.data, _ as DataBundle, _ as GatewayContext) >>
{ dsInfo, appSecCtx, db, gwCtx -> bundle = db; NoopFlow.INSTANCE }
bundle.get(KnownAddresses.RESPONSE_STATUS) == '200'
bundle.get(KnownAddresses.RESPONSE_HEADERS_NO_COOKIES) == ['content-type': ['application/json']]
bundle.get(KnownAddresses.RESPONSE_BODY_OBJECT) == '{"status":"success"}'
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ class SpringBootSmokeTest extends AbstractAppSecServerSmokeTest {

List<String> command = new ArrayList<>()
command.add(javaPath())
// TODO delete when ure done testing
command.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
command.addAll(defaultJavaProperties)
command.addAll(defaultAppSecProperties)
command.addAll((String[]) ["-jar", springBootShadowJar, "--server.port=${httpPort}"])
Expand Down
Loading
Loading