diff --git a/cmd/localstack/xraydaemon.go b/cmd/localstack/xraydaemon.go index 686c2a6..f004503 100644 --- a/cmd/localstack/xraydaemon.go +++ b/cmd/localstack/xraydaemon.go @@ -47,6 +47,7 @@ type Daemon struct { receiverCount int processorCount int receiveBufferSize int + enableTelemetry bool // Boolean channel, set to true if error is received reading from Socket. done chan bool @@ -134,7 +135,12 @@ func initDaemon(config *cfg.Config, enableTelemetry bool) *Daemon { log.Infof("Using region: %v", aws.StringValue(awsConfig.Region)) if enableTelemetry { + // Telemetry can be quite verbose, for example 10+ PutTelemetryRecords requests for a single invocation telemetry.Init(awsConfig, session, config.ResourceARN, noMetadata) + } else { + // Telemetry cannot be nil because it is used internally in the X-Ray daemon, for example in batchprocessor.go + // We assume that SegmentReceived is never invoked internally in X-Ray because it enables postTelemetry. + telemetry.T = telemetry.GetTestTelemetry() } // If calculated number of buffer is lower than our default, use calculated one. Otherwise, use default value. @@ -152,6 +158,7 @@ func initDaemon(config *cfg.Config, enableTelemetry bool) *Daemon { receiverCount: parameterConfig.ReceiverRoutines, processorCount: processorCount, receiveBufferSize: receiveBufferSize, + enableTelemetry: enableTelemetry, done: make(chan bool), std: std, pool: bufferPool, @@ -180,12 +187,12 @@ func (d *Daemon) close() { // Signal routines to finish // This will push telemetry and customer segments in parallel d.std.Close() - if telemetry.T != nil { + if d.enableTelemetry { telemetry.T.Quit <- true } <-d.processor.Done - if telemetry.T != nil { + if d.enableTelemetry { <-telemetry.T.Done } @@ -231,7 +238,7 @@ func (d *Daemon) poll() { fallbackPointerUsed = true } rlen := d.read(bufPointer) - if rlen > 0 && telemetry.T != nil { + if rlen > 0 && d.enableTelemetry { telemetry.T.SegmentReceived(1) } if rlen == 0 { @@ -242,7 +249,7 @@ func (d *Daemon) poll() { } if fallbackPointerUsed { log.Warn("Segment dropped. Consider increasing memory limit") - if telemetry.T != nil { + if d.enableTelemetry { telemetry.T.SegmentSpillover(1) } continue @@ -257,7 +264,7 @@ func (d *Daemon) poll() { if len(slices[1]) == 0 { log.Warnf("Missing header or segment: %s", string(slices[0])) d.pool.Return(bufPointer) - if telemetry.T != nil { + if d.enableTelemetry { telemetry.T.SegmentRejected(1) } continue @@ -273,7 +280,7 @@ func (d *Daemon) poll() { default: log.Warnf("Invalid header: %s", string(header)) d.pool.Return(bufPointer) - if telemetry.T != nil { + if d.enableTelemetry { telemetry.T.SegmentRejected(1) } continue