Skip to content

Commit 061b18d

Browse files
committed
working writer
1 parent 8247b74 commit 061b18d

File tree

18 files changed

+655
-22
lines changed

18 files changed

+655
-22
lines changed

communication/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ dependencies {
44
implementation libs.slf4j
55

66
api project(':remote-config:remote-config-api')
7+
implementation project(':components:json')
78
implementation project(':remote-config:remote-config-core')
89
implementation project(':internal-api')
910
implementation project(':utils:container-utils')
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package datadog.communication.serialization.json;
2+
3+
import datadog.communication.serialization.EncodingCache;
4+
import datadog.communication.serialization.Mapper;
5+
import datadog.communication.serialization.WritableFormatter;
6+
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
7+
import java.nio.ByteBuffer;
8+
import java.util.Map;
9+
10+
public class JSONWritableFormatter implements WritableFormatter {
11+
12+
@Override
13+
public <T> boolean format(T message, Mapper<T> mapper) {
14+
return false;
15+
}
16+
17+
@Override
18+
public void flush() {
19+
20+
}
21+
22+
@Override
23+
public void writeNull() {
24+
25+
}
26+
27+
@Override
28+
public void writeBoolean(boolean value) {
29+
30+
}
31+
32+
@Override
33+
public void writeObject(Object value, EncodingCache encodingCache) {
34+
35+
}
36+
37+
@Override
38+
public void writeObjectString(Object value, EncodingCache encodingCache) {
39+
40+
}
41+
42+
@Override
43+
public void writeMap(Map<? extends CharSequence, ?> map, EncodingCache encodingCache) {
44+
45+
}
46+
47+
@Override
48+
public void writeString(CharSequence s, EncodingCache encodingCache) {
49+
50+
}
51+
52+
@Override
53+
public void writeUTF8(byte[] string, int offset, int length) {
54+
55+
}
56+
57+
@Override
58+
public void writeUTF8(byte[] string) {
59+
60+
}
61+
62+
@Override
63+
public void writeUTF8(UTF8BytesString string) {
64+
65+
}
66+
67+
@Override
68+
public void writeBinary(byte[] binary) {
69+
70+
}
71+
72+
@Override
73+
public void writeBinary(byte[] binary, int offset, int length) {
74+
75+
}
76+
77+
@Override
78+
public void startMap(int elementCount) {
79+
80+
}
81+
82+
@Override
83+
public void startStruct(int elementCount) {
84+
85+
}
86+
87+
@Override
88+
public void startArray(int elementCount) {
89+
90+
}
91+
92+
@Override
93+
public void writeBinary(ByteBuffer buffer) {
94+
95+
}
96+
97+
@Override
98+
public void writeInt(int value) {
99+
100+
}
101+
102+
@Override
103+
public void writeSignedInt(int value) {
104+
105+
}
106+
107+
@Override
108+
public void writeLong(long value) {
109+
110+
}
111+
112+
@Override
113+
public void writeUnsignedLong(long value) {
114+
115+
}
116+
117+
@Override
118+
public void writeSignedLong(long value) {
119+
120+
}
121+
122+
@Override
123+
public void writeFloat(float value) {
124+
125+
}
126+
127+
@Override
128+
public void writeDouble(double value) {
129+
130+
}
131+
}

dd-java-agent/agent-llmobs/src/main/java/datadog/trace/llmobs/domain/DDLLMObsSpan.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,17 @@
1111
import java.util.List;
1212
import java.util.Map;
1313
import javax.annotation.Nonnull;
14+
import org.slf4j.Logger;
15+
import org.slf4j.LoggerFactory;
1416

1517
public class DDLLMObsSpan implements LLMObsSpan {
1618

1719
private final AgentSpan span;
1820

1921
private boolean finished = false;
2022

23+
private static final Logger LOGGER = LoggerFactory.getLogger(DDLLMObsSpan.class);
24+
2125
public DDLLMObsSpan(
2226
@Nonnull String kind,
2327
String spanName,
@@ -71,6 +75,7 @@ public void annotateIO(
7175
if (finished) {
7276
return;
7377
}
78+
LOGGER.warn("ANNOTATE IN {} OUT {}", inputData, outputData);
7479
if (inputData != null && !inputData.isEmpty()) {
7580
this.span.setTag(LLMObsTags.LLMOBS_TAG_PREFIX + LLMObsTags.INPUT, inputData);
7681
}

dd-java-agent/instrumentation/wildfly-9/build.gradle

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ muzzle {
3333
pass {
3434
group = 'org.wildfly'
3535
module = 'wildfly-ee'
36-
versions = '[9.0.0.Final,)'
36+
versions = '[9.0.0.Final,35.0.0.Final]'
3737
excludeDependency 'org.jboss.xnio:*' // not related and causes issues with missing jar in maven repo
3838
}
3939
}
@@ -75,9 +75,10 @@ dependencies {
7575
wildflyLatestPoll group: 'org.wildfly', name: 'wildfly-dist', version: '+'
7676

7777
configurations.wildflyLatestPoll.resolve()
78-
def latestWildflyVersion = configurations.wildflyLatestPoll.resolvedConfiguration.getResolvedArtifacts().find {
79-
it.name == "wildfly-dist"
80-
}.moduleVersion.id.version
78+
// def latestWildflyVersion = configurations.wildflyLatestPoll.resolvedConfiguration.getResolvedArtifacts().find {
79+
// it.name == "wildfly-dist"
80+
// }.moduleVersion.id.version
81+
def latestWildflyVersion = "35.0.0.Final"
8182
wildflyLatestDepTest "wildfly:wildfly:$latestWildflyVersion@zip"
8283
latestDepForkedTest {
8384
configure {

dd-trace-core/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ dependencies {
6464
implementation project(':components:json')
6565
implementation project(':utils:container-utils')
6666
implementation project(':utils:socket-utils')
67+
68+
implementation group: 'org.msgpack', name: 'msgpack-core', version: '0.8.10'
69+
implementation group: 'org.msgpack', name: 'jackson-dataformat-msgpack', version: '0.8.10'
70+
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.10.0'
71+
6772
// for span exception debugging
6873
compileOnly project(':dd-java-agent:agent-debugger:debugger-bootstrap')
6974

dd-trace-core/src/main/java/datadog/trace/common/writer/DDAgentWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import java.util.concurrent.TimeUnit;
1818
import okhttp3.HttpUrl;
1919
import okhttp3.OkHttpClient;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
2022

2123
public class DDAgentWriter extends RemoteWriter {
2224

@@ -28,6 +30,8 @@ public static DDAgentWriterBuilder builder() {
2830

2931
public static class DDAgentWriterBuilder {
3032

33+
private static final Logger LOGGER = LoggerFactory.getLogger(DDAgentWriterBuilder.class);
34+
3135
String agentHost = DEFAULT_AGENT_HOST;
3236
int traceAgentPort = DEFAULT_TRACE_AGENT_PORT;
3337
String unixDomainSocket = null;
@@ -151,6 +155,8 @@ public DDAgentWriter build() {
151155
}
152156

153157
final DDAgentMapperDiscovery mapperDiscovery = new DDAgentMapperDiscovery(featureDiscovery);
158+
LOGGER.warn("ADDING MAPPER IN AGENT WRITER {} ", mapperDiscovery);
159+
154160
final PayloadDispatcher dispatcher =
155161
new PayloadDispatcherImpl(mapperDiscovery, agentApi, healthMetrics, monitoring);
156162
final TraceProcessingWorker traceProcessingWorker =

dd-trace-core/src/main/java/datadog/trace/common/writer/DDIntakeWriter.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import java.util.EnumMap;
1313
import java.util.Map;
1414
import java.util.concurrent.TimeUnit;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
1517

1618
public class DDIntakeWriter extends RemoteWriter {
1719

@@ -40,6 +42,8 @@ public static class DDIntakeWriterBuilder {
4042

4143
private SingleSpanSampler singleSpanSampler;
4244

45+
private static final Logger log = LoggerFactory.getLogger(DDIntakeWriterBuilder.class);
46+
4347
public DDIntakeWriterBuilder addTrack(final TrackType trackType, final RemoteApi intakeApi) {
4448
tracks.put(trackType, intakeApi);
4549
return this;
@@ -98,6 +102,7 @@ public DDIntakeWriterBuilder singleSpanSampler(SingleSpanSampler singleSpanSampl
98102
}
99103

100104
public DDIntakeWriter build() {
105+
log.debug("DDINTAKEWRITER TRACKS {}", tracks);
101106
if (tracks.isEmpty()) {
102107
throw new IllegalArgumentException("At least one track needs to be configured");
103108
}
@@ -111,7 +116,11 @@ public DDIntakeWriter build() {
111116
.map(this::createDispatcher)
112117
.toArray(PayloadDispatcher[]::new);
113118
dispatcher = new CompositePayloadDispatcher(dispatchers);
119+
for (PayloadDispatcher dispatcher2 : dispatchers) {
120+
log.debug("COMP DISPATCHER {}", dispatcher2);
121+
}
114122
}
123+
log.debug("DISPATCHER {}", dispatcher);
115124

116125
final TraceProcessingWorker traceProcessingWorker =
117126
new TraceProcessingWorker(

dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ public static Writer createWriter(
3939
final Sampler sampler,
4040
final SingleSpanSampler singleSpanSampler,
4141
final HealthMetrics healthMetrics) {
42-
return createWriter(
42+
Writer w = createWriter(
4343
config, commObjects, sampler, singleSpanSampler, healthMetrics, config.getWriterType());
44+
return w;
4445
}
4546

4647
public static Writer createWriter(
@@ -51,14 +52,20 @@ public static Writer createWriter(
5152
final HealthMetrics healthMetrics,
5253
String configuredType) {
5354

55+
log.debug("START CREATE WRITER {}", configuredType);
56+
5457
if (LOGGING_WRITER_TYPE.equals(configuredType)) {
58+
log.debug("STARTED WRITER LOGGING");
5559
return new LoggingWriter();
5660
} else if (PRINTING_WRITER_TYPE.equals(configuredType)) {
61+
log.debug("STARTED WRITER PRINTING");
5762
return new PrintingWriter(System.out, true);
5863
} else if (configuredType.startsWith(TRACE_STRUCTURE_WRITER_TYPE)) {
64+
log.debug("STARTED WRITER TRACE STRCT");
5965
return new TraceStructureWriter(
6066
Strings.replace(configuredType, TRACE_STRUCTURE_WRITER_TYPE, ""));
6167
} else if (configuredType.startsWith(MULTI_WRITER_TYPE)) {
68+
log.debug("STARTED WRITER MULTI");
6269
return new MultiWriter(
6370
config, commObjects, sampler, singleSpanSampler, healthMetrics, configuredType);
6471
}
@@ -82,7 +89,8 @@ public static Writer createWriter(
8289

8390
// The AgentWriter doesn't support the CI Visibility protocol. If CI Visibility is
8491
// enabled, check if we can use the IntakeWriter instead.
85-
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && config.isCiVisibilityEnabled()) {
92+
if (DD_AGENT_WRITER_TYPE.equals(configuredType) && (config.isCiVisibilityEnabled())) {
93+
log.info("SUPPORTS EVP PROXY {}", featuresDiscovery.supportsEvpProxy());
8694
if (featuresDiscovery.supportsEvpProxy() || config.isCiVisibilityAgentlessEnabled()) {
8795
configuredType = DD_INTAKE_WRITER_TYPE;
8896
} else {
@@ -116,6 +124,13 @@ public static Writer createWriter(
116124
builder.addTrack(TrackType.CITESTCOV, coverageApi);
117125
}
118126

127+
log.debug("BEFORE ADDING LLM OBSERVER");
128+
// if (config.isLlmObsEnabled() && config.isLlmObsAgentlessEnabled()) {
129+
final RemoteApi llmobsApi = createDDIntakeRemoteApi(config, commObjects, featuresDiscovery, TrackType.LLMOBS);
130+
builder.addTrack(TrackType.LLMOBS, llmobsApi);
131+
log.debug("ADDED LLM OBSERVER");
132+
133+
119134
remoteWriter = builder.build();
120135

121136
} else { // configuredType == DDAgentWriter
@@ -171,26 +186,34 @@ private static RemoteApi createDDIntakeRemoteApi(
171186
SharedCommunicationObjects commObjects,
172187
DDAgentFeaturesDiscovery featuresDiscovery,
173188
TrackType trackType) {
174-
if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled()) {
189+
// TODO make it so that it is agentless for the requested product and not both
190+
if (featuresDiscovery.supportsEvpProxy() && !config.isCiVisibilityAgentlessEnabled() && !config.isLlmObsAgentlessEnabled()) {
175191
return DDEvpProxyApi.builder()
176192
.httpClient(commObjects.okHttpClient)
177193
.agentUrl(commObjects.agentUrl)
178194
.evpProxyEndpoint(featuresDiscovery.getEvpProxyEndpoint())
179195
.trackType(trackType)
180196
.compressionEnabled(featuresDiscovery.supportsContentEncodingHeadersWithEvpProxy())
181197
.build();
182-
183198
} else {
184199
HttpUrl hostUrl = null;
200+
String llmObsAgentlessUrl = config.getLlMObsAgentlessUrl();
201+
log.debug("LLMOBS URL {}", llmObsAgentlessUrl);
202+
185203
if (config.getCiVisibilityAgentlessUrl() != null) {
186204
hostUrl = HttpUrl.get(config.getCiVisibilityAgentlessUrl());
187205
log.info("Using host URL '{}' to report CI Visibility traces in Agentless mode.", hostUrl);
206+
} else if (config.isLlmObsEnabled() && config.isLlmObsAgentlessEnabled() && llmObsAgentlessUrl != null && !llmObsAgentlessUrl.isEmpty()) {
207+
hostUrl = HttpUrl.get(llmObsAgentlessUrl);
208+
log.info("Using host URL '{}' to report LLM Obs traces in Agentless mode.", hostUrl);
188209
}
189-
return DDIntakeApi.builder()
210+
RemoteApi ddintake = DDIntakeApi.builder()
190211
.hostUrl(hostUrl)
191212
.apiKey(config.getApiKey())
192213
.trackType(trackType)
193214
.build();
215+
log.debug("CREATED DD INTAKE for track {} {}", trackType.name(), ddintake);
216+
return ddintake;
194217
}
195218
}
196219

dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/DDAgentApi.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ public void addResponseListener(final RemoteResponseListener listener) {
9090
public Response sendSerializedTraces(final Payload payload) {
9191
final int sizeInBytes = payload.sizeInBytes();
9292
String tracesEndpoint = featuresDiscovery.getTraceEndpoint();
93+
log.warn("SENDING AGENT PL {} {}", payload, tracesEndpoint);
94+
9395
if (null == tracesEndpoint) {
9496
featuresDiscovery.discoverIfOutdated();
9597
tracesEndpoint = featuresDiscovery.getTraceEndpoint();
@@ -124,9 +126,11 @@ public Response sendSerializedTraces(final Payload payload) {
124126
handleAgentChange(response.header(DATADOG_AGENT_STATE));
125127
if (response.code() != 200) {
126128
agentErrorCounter.incrementErrorCount(response.message(), payload.traceCount());
129+
log.error("FAILED TO SEND AGENT PL {}", response.message());
127130
countAndLogFailedSend(payload.traceCount(), sizeInBytes, response, null);
128131
return Response.failed(response.code());
129132
}
133+
log.info("SUCCESS SEND AGENT PL {}", response);
130134
countAndLogSuccessfulSend(payload.traceCount(), sizeInBytes);
131135
String responseString = null;
132136
try {
@@ -146,6 +150,7 @@ public Response sendSerializedTraces(final Payload payload) {
146150
}
147151
}
148152
} catch (final IOException e) {
153+
log.error("FAILED TO SEND AGENT PL", e);
149154
countAndLogFailedSend(payload.traceCount(), sizeInBytes, null, e);
150155
return Response.failed(e);
151156
}

dd-trace-core/src/main/java/datadog/trace/common/writer/ddintake/DDEvpProxyApi.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ private DDEvpProxyApi(
128128
public Response sendSerializedTraces(Payload payload) {
129129
final int sizeInBytes = payload.sizeInBytes();
130130

131+
log.debug("SENDING PL {} TO TRACK {}", payload, trackType);
131132
Request.Builder builder =
132133
new Request.Builder()
133134
.url(proxiedApiUrl)

0 commit comments

Comments
 (0)