Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/database/sql/internal/dsn.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func isValidHostnameStart(s string) bool {
}
// Should contain hostname-like patterns
return strings.Contains(s, ".") || strings.Contains(s, ":") ||
strings.Contains(s, "/") || s == strings.TrimSpace(s)
strings.Contains(s, "/") || s == strings.TrimSpace(s)
}

// sanitizeMySQLPasswords sanitizes passwords in MySQL DSN format (user:pass@tcp...).
Expand Down
69 changes: 54 additions & 15 deletions ddtrace/tracer/civisibility_payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@ package tracer

import (
"bytes"
"sync/atomic"
"time"

"github.com/tinylib/msgp/msgp"

"github.com/DataDog/dd-trace-go/v2/internal/civisibility/constants"
"github.com/DataDog/dd-trace-go/v2/internal/civisibility/utils"
"github.com/DataDog/dd-trace-go/v2/internal/civisibility/utils/telemetry"
"github.com/DataDog/dd-trace-go/v2/internal/globalconfig"
"github.com/DataDog/dd-trace-go/v2/internal/log"
"github.com/DataDog/dd-trace-go/v2/internal/version"
"github.com/tinylib/msgp/msgp"
)

// ciVisibilityPayload represents a payload specifically designed for CI Visibility events.
// It embeds the generic payload structure and adds methods to handle CI Visibility specific data.
// It uses the generic payload interface and adds methods to handle CI Visibility specific data.
type ciVisibilityPayload struct {
*payloadV04
payload payload
serializationTime time.Duration
}

Expand All @@ -36,18 +36,17 @@ type ciVisibilityPayload struct {
// Returns:
//
// An error if encoding the event fails.
func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
p.buf.Grow(event.Msgsize())
func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) (size int, err error) {
p.payload.grow(event.Msgsize())
startTime := time.Now()
defer func() {
p.serializationTime += time.Since(startTime)
}()
if err := msgp.Encode(&p.buf, event); err != nil {
return err
if err := msgp.Encode(p.payload, event); err != nil {
return 0, err
}
atomic.AddUint32(&p.count, 1)
p.updateHeader()
return nil
p.payload.recordItem() // This already calls updateHeader() internally.
return p.size(), nil
}

// newCiVisibilityPayload creates a new instance of civisibilitypayload.
Expand All @@ -57,7 +56,7 @@ func (p *ciVisibilityPayload) push(event *ciVisibilityEvent) error {
// A pointer to a newly initialized civisibilitypayload instance.
func newCiVisibilityPayload() *ciVisibilityPayload {
log.Debug("ciVisibilityPayload: creating payload instance")
return &ciVisibilityPayload{newPayload(), 0}
return &ciVisibilityPayload{payload: newPayload(traceProtocolV04), serializationTime: 0}
}

// getBuffer retrieves the complete body of the CI Visibility payload, including metadata.
Expand All @@ -73,11 +72,11 @@ func newCiVisibilityPayload() *ciVisibilityPayload {
// An error if reading from the buffer or encoding the payload fails.
func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {
startTime := time.Now()
log.Debug("ciVisibilityPayload: .getBuffer (count: %d)", p.itemCount())
log.Debug("ciVisibilityPayload: .getBuffer (count: %d)", p.payload.stats().itemCount)

// Create a buffer to read the current payload
payloadBuf := new(bytes.Buffer)
if _, err := payloadBuf.ReadFrom(p.payloadV04); err != nil {
if _, err := payloadBuf.ReadFrom(p.payload); err != nil {
return nil, err
}

Expand All @@ -90,7 +89,7 @@ func (p *ciVisibilityPayload) getBuffer(config *config) (*bytes.Buffer, error) {
return nil, err
}

telemetry.EndpointPayloadEventsCount(telemetry.TestCycleEndpointType, float64(p.itemCount()))
telemetry.EndpointPayloadEventsCount(telemetry.TestCycleEndpointType, float64(p.payload.stats().itemCount))
telemetry.EndpointPayloadBytes(telemetry.TestCycleEndpointType, float64(encodedBuf.Len()))
telemetry.EndpointEventsSerializationMs(telemetry.TestCycleEndpointType, float64((p.serializationTime + time.Since(startTime)).Milliseconds()))
return encodedBuf, nil
Expand Down Expand Up @@ -150,3 +149,43 @@ func (p *ciVisibilityPayload) writeEnvelope(env string, events []byte) *ciTestCy

return visibilityPayload
}

// stats returns the current stats of the payload.
func (p *ciVisibilityPayload) stats() payloadStats {
return p.payload.stats()
}

// size returns the payload size in bytes (for backward compatibility).
func (p *ciVisibilityPayload) size() int {
return p.payload.size()
}

// itemCount returns the number of items available in the stream (for backward compatibility).
func (p *ciVisibilityPayload) itemCount() int {
return p.payload.itemCount()
}

// protocol returns the protocol version of the payload.
func (p *ciVisibilityPayload) protocol() float64 {
return p.payload.protocol()
}

// clear empties the payload buffers.
func (p *ciVisibilityPayload) clear() {
p.payload.clear()
}

// reset sets up the payload to be read a second time.
func (p *ciVisibilityPayload) reset() {
p.payload.reset()
}

// Read implements io.Reader by reading from the underlying payload.
func (p *ciVisibilityPayload) Read(b []byte) (n int, err error) {
return p.payload.Read(b)
}

// Close implements io.Closer by closing the underlying payload.
func (p *ciVisibilityPayload) Close() error {
return p.payload.Close()
}
13 changes: 5 additions & 8 deletions ddtrace/tracer/civisibility_payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"io"
"strconv"
"strings"
"sync/atomic"
"testing"

"github.com/DataDog/dd-trace-go/v2/internal/civisibility/constants"
Expand Down Expand Up @@ -55,8 +54,9 @@ func TestCiVisibilityPayloadIntegrity(t *testing.T) {
want.Reset()
err := msgp.Encode(want, allEvents)
assert.NoError(err)
assert.Equal(want.Len(), p.size())
assert.Equal(p.itemCount(), len(allEvents))
stats := p.stats()
assert.Equal(want.Len(), stats.size)
assert.Equal(len(allEvents), stats.itemCount)

got, err := io.ReadAll(p)
assert.NoError(err)
Expand Down Expand Up @@ -152,15 +152,12 @@ func benchmarkCiVisibilityPayloadThroughput(count int) func(*testing.B) {
b.ReportAllocs()
b.ResetTimer()
reset := func() {
p.header = make([]byte, 8)
p.off = 8
atomic.StoreUint32(&p.count, 0)
p.buf.Reset()
p = newCiVisibilityPayload()
}
for i := 0; i < b.N; i++ {
reset()
for _, event := range events {
for p.size() < payloadMaxLimit {
for p.stats().size < payloadMaxLimit {
p.push(event)
}
}
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/civisibility_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ func newCiVisibilityTransport(config *config) *ciVisibilityTransport {
// Returns:
//
// An io.ReadCloser for reading the response body, and an error if the operation fails.
func (t *ciVisibilityTransport) send(p *payloadV04) (body io.ReadCloser, err error) {
ciVisibilityPayload := &ciVisibilityPayload{p, 0}
func (t *ciVisibilityTransport) send(p payload) (body io.ReadCloser, err error) {
ciVisibilityPayload := &ciVisibilityPayload{payload: p, serializationTime: 0}
buffer, bufferErr := ciVisibilityPayload.getBuffer(t.config)
if bufferErr != nil {
return nil, fmt.Errorf("cannot create buffer payload: %v", bufferErr)
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/civisibility_transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,12 +102,12 @@ func runTransportTest(t *testing.T, agentless, shouldSetAPIKey bool) {
p := newCiVisibilityPayload()
for _, t := range tc.payload {
for _, span := range t {
err := p.push(getCiVisibilityEvent(span))
_, err := p.push(getCiVisibilityEvent(span))
assert.NoError(err)
}
}

_, err := transport.send(p.payloadV04)
_, err := transport.send(p.payload)
assert.NoError(err)
}
assert.Equal(hits, len(testCases))
Expand Down
12 changes: 7 additions & 5 deletions ddtrace/tracer/civisibility_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,11 @@ func (w *ciVisibilityTraceWriter) add(trace []*Span) {
telemetry.EventsEnqueueForSerialization()
for _, s := range trace {
cvEvent := getCiVisibilityEvent(s)
if err := w.payload.push(cvEvent); err != nil {
size, err := w.payload.push(cvEvent)
if err != nil {
log.Error("ciVisibilityTraceWriter: Error encoding msgpack: %s", err.Error())
}
if w.payload.size() > agentlessPayloadSizeLimit {
if size > agentlessPayloadSizeLimit {
w.flush()
}
}
Expand All @@ -82,7 +83,7 @@ func (w *ciVisibilityTraceWriter) stop() {
// flush sends the current payload to the transport. It ensures that the payload is reset
// and the resources are freed after the flush operation is completed.
func (w *ciVisibilityTraceWriter) flush() {
if w.payload.itemCount() == 0 {
if w.payload.stats().itemCount == 0 {
return
}

Expand Down Expand Up @@ -113,9 +114,10 @@ func (w *ciVisibilityTraceWriter) flush() {
telemetry.EndpointPayloadRequests(telemetry.TestCycleEndpointType, requestCompressedType)

for attempt := 0; attempt <= w.config.sendRetries; attempt++ {
size, count = p.size(), p.itemCount()
stats := p.stats()
size, count = stats.size, stats.itemCount
log.Debug("ciVisibilityTraceWriter: sending payload: size: %d events: %d\n", size, count)
_, err = w.config.transport.send(p.payloadV04)
_, err = w.config.transport.send(p.payload)
if err == nil {
log.Debug("ciVisibilityTraceWriter: sent events after %d attempts", attempt+1)
return
Expand Down
4 changes: 2 additions & 2 deletions ddtrace/tracer/civisibility_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ type failingCiVisibilityTransport struct {
assert *assert.Assertions
}

func (t *failingCiVisibilityTransport) send(p *payloadV04) (io.ReadCloser, error) {
func (t *failingCiVisibilityTransport) send(p payload) (io.ReadCloser, error) {
t.sendAttempts++

ciVisibilityPayload := &ciVisibilityPayload{p, 0}
ciVisibilityPayload := &ciVisibilityPayload{payload: p, serializationTime: 0}

var events ciVisibilityEvents
err := msgp.Decode(ciVisibilityPayload, &events)
Expand Down
7 changes: 4 additions & 3 deletions ddtrace/tracer/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"golang.org/x/mod/semver"

pb "github.com/DataDog/datadog-agent/pkg/proto/pbgo/trace"
"github.com/tinylib/msgp/msgp"

"github.com/DataDog/dd-trace-go/v2/ddtrace/ext"
"github.com/DataDog/dd-trace-go/v2/internal"
appsecconfig "github.com/DataDog/dd-trace-go/v2/internal/appsec/config"
Expand All @@ -40,7 +42,6 @@ import (
"github.com/DataDog/dd-trace-go/v2/internal/telemetry"
"github.com/DataDog/dd-trace-go/v2/internal/traceprof"
"github.com/DataDog/dd-trace-go/v2/internal/version"
"github.com/tinylib/msgp/msgp"

"github.com/DataDog/datadog-go/v5/statsd"
)
Expand Down Expand Up @@ -1505,7 +1506,7 @@ func (t *dummyTransport) ObfuscationVersion() int {
return t.obfVersion
}

func (t *dummyTransport) send(p *payloadV04) (io.ReadCloser, error) {
func (t *dummyTransport) send(p payload) (io.ReadCloser, error) {
traces, err := decode(p)
if err != nil {
return nil, err
Expand All @@ -1521,7 +1522,7 @@ func (t *dummyTransport) endpoint() string {
return "http://localhost:9/v0.4/traces"
}

func decode(p *payloadV04) (spanLists, error) {
func decode(p payloadReader) (spanLists, error) {
var traces spanLists
err := msgp.Decode(p, &traces)
return traces, err
Expand Down
Loading
Loading