diff --git a/cmd/localstack/main.go b/cmd/localstack/main.go
index 8a4187a..baafc37 100644
--- a/cmd/localstack/main.go
+++ b/cmd/localstack/main.go
@@ -15,17 +15,18 @@ import (
 )
 
 type LsOpts struct {
-	InteropPort       string
-	RuntimeEndpoint   string
-	RuntimeId         string
-	InitTracingPort   string
-	User              string
-	CodeArchives      string
-	HotReloadingPaths []string
-	EnableDnsServer   string
-	LocalstackIP      string
-	InitLogLevel      string
-	EdgePort          string
+	InteropPort         string
+	RuntimeEndpoint     string
+	RuntimeId           string
+	InitTracingPort     string
+	User                string
+	CodeArchives        string
+	HotReloadingPaths   []string
+	EnableDnsServer     string
+	LocalstackIP        string
+	InitLogLevel        string
+	EdgePort            string
+	EnableXRayTelemetry string
 }
 
 func GetEnvOrDie(env string) string {
@@ -48,10 +49,11 @@ func InitLsOpts() *LsOpts {
 		InitLogLevel:    GetenvWithDefault("LOCALSTACK_INIT_LOG_LEVEL", "debug"),
 		EdgePort:        GetenvWithDefault("EDGE_PORT", "4566"),
 		// optional or empty
-		CodeArchives:      os.Getenv("LOCALSTACK_CODE_ARCHIVES"),
-		HotReloadingPaths: strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","),
-		EnableDnsServer:   os.Getenv("LOCALSTACK_ENABLE_DNS_SERVER"),
-		LocalstackIP:      os.Getenv("LOCALSTACK_HOSTNAME"),
+		CodeArchives:        os.Getenv("LOCALSTACK_CODE_ARCHIVES"),
+		HotReloadingPaths:   strings.Split(GetenvWithDefault("LOCALSTACK_HOT_RELOADING_PATHS", ""), ","),
+		EnableDnsServer:     os.Getenv("LOCALSTACK_ENABLE_DNS_SERVER"),
+		EnableXRayTelemetry: os.Getenv("LOCALSTACK_ENABLE_XRAY_TELEMETRY"),
+		LocalstackIP:        os.Getenv("LOCALSTACK_HOSTNAME"),
 	}
 }
 
@@ -67,6 +69,7 @@ func UnsetLsEnvs() {
 		"LOCALSTACK_CODE_ARCHIVES",
 		"LOCALSTACK_HOT_RELOADING_PATHS",
 		"LOCALSTACK_ENABLE_DNS_SERVER",
+		"LOCALSTACK_ENABLE_XRAY_TELEMETRY",
 		"LOCALSTACK_INIT_LOG_LEVEL",
 		// Docker container ID
 		"HOSTNAME",
@@ -146,7 +149,7 @@ func main() {
 
 	// xray daemon
 	xrayConfig := initConfig("http://" + lsOpts.LocalstackIP + ":" + lsOpts.EdgePort)
-	d := initDaemon(xrayConfig)
+	d := initDaemon(xrayConfig, lsOpts.EnableXRayTelemetry == "1")
 	sandbox.AddShutdownFunc(func() {
 		log.Debugln("Shutting down xray daemon")
 		d.stop()
diff --git a/cmd/localstack/xraydaemon.go b/cmd/localstack/xraydaemon.go
index 8d2b768..686c2a6 100644
--- a/cmd/localstack/xraydaemon.go
+++ b/cmd/localstack/xraydaemon.go
@@ -85,7 +85,7 @@ func initConfig(endpoint string) *cfg.Config {
 	return xrayConfig
 }
 
-func initDaemon(config *cfg.Config) *Daemon {
+func initDaemon(config *cfg.Config, enableTelemetry bool) *Daemon {
 	if logFile != "" {
 		var fileWriter io.Writer
 		if *config.Logging.LogRotation {
@@ -133,8 +133,9 @@ func initDaemon(config *cfg.Config) *Daemon {
 	awsConfig, session := conn.GetAWSConfigSession(&conn.Conn{}, config, config.RoleARN, config.Region, noMetadata)
 	log.Infof("Using region: %v", aws.StringValue(awsConfig.Region))
 
-	log.Debugf("ARN of the AWS resource running the daemon: %v", config.ResourceARN)
-	telemetry.Init(awsConfig, session, config.ResourceARN, noMetadata)
+	if enableTelemetry {
+		telemetry.Init(awsConfig, session, config.ResourceARN, noMetadata)
+	}
 
 	// If calculated number of buffer is lower than our default, use calculated one. Otherwise, use default value.
 	parameterConfig.Processor.BatchSize = util.GetMinIntValue(parameterConfig.Processor.BatchSize, buffers)
@@ -179,10 +180,14 @@ func (d *Daemon) close() {
 	// Signal routines to finish
 	// This will push telemetry and customer segments in parallel
 	d.std.Close()
-	telemetry.T.Quit <- true
+	if telemetry.T != nil {
+		telemetry.T.Quit <- true
+	}
 	<-d.processor.Done
-	<-telemetry.T.Done
+	if telemetry.T != nil {
+		<-telemetry.T.Done
+	}
 
 	log.Debugf("Trace segment: received: %d, truncated: %d, processed: %d", atomic.LoadUint64(&d.count), d.std.TruncatedCount(), d.processor.ProcessedCount())
 	log.Debugf("Shutdown finished. Current epoch in nanoseconds: %v", time.Now().UnixNano())
@@ -226,7 +231,7 @@ func (d *Daemon) poll() {
 			fallbackPointerUsed = true
 		}
 		rlen := d.read(bufPointer)
-		if rlen > 0 {
+		if rlen > 0 && telemetry.T != nil {
 			telemetry.T.SegmentReceived(1)
 		}
 		if rlen == 0 {
@@ -237,7 +242,9 @@ func (d *Daemon) poll() {
 		}
 		if fallbackPointerUsed {
 			log.Warn("Segment dropped. Consider increasing memory limit")
-			telemetry.T.SegmentSpillover(1)
+			if telemetry.T != nil {
+				telemetry.T.SegmentSpillover(1)
+			}
 			continue
 		} else if rlen == -1 {
 			return
@@ -250,7 +257,9 @@ func (d *Daemon) poll() {
 		if len(slices[1]) == 0 {
 			log.Warnf("Missing header or segment: %s", string(slices[0]))
 			d.pool.Return(bufPointer)
-			telemetry.T.SegmentRejected(1)
+			if telemetry.T != nil {
+				telemetry.T.SegmentRejected(1)
+			}
 			continue
 		}
 
@@ -264,7 +273,9 @@ func (d *Daemon) poll() {
 		default:
 			log.Warnf("Invalid header: %s", string(header))
 			d.pool.Return(bufPointer)
-			telemetry.T.SegmentRejected(1)
+			if telemetry.T != nil {
+				telemetry.T.SegmentRejected(1)
+			}
 			continue
 		}
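
Note: because telemetry.Init is now only called when LOCALSTACK_ENABLE_XRAY_TELEMETRY is "1", the package-level handle telemetry.T stays nil otherwise, which is why every use site in the diff gains a nil guard. Below is a minimal, self-contained sketch of that pattern; the type and function names are illustrative assumptions, not the daemon's real API.

// Sketch of the "nil handle when disabled" pattern applied by this change.
package main

import (
	"fmt"
	"os"
)

// telemetryClient stands in for the daemon's telemetry recorder (assumed shape).
type telemetryClient struct {
	received int64
	Quit     chan bool
}

func (t *telemetryClient) SegmentReceived(n int64) { t.received += n }

// T mirrors telemetry.T: it is only initialized when telemetry is enabled,
// and remains nil otherwise.
var T *telemetryClient

func initTelemetry(enabled bool) {
	if enabled {
		T = &telemetryClient{Quit: make(chan bool, 1)}
	}
}

func main() {
	// Mirrors main.go: the flag is read from the environment and compared to "1".
	initTelemetry(os.Getenv("LOCALSTACK_ENABLE_XRAY_TELEMETRY") == "1")
	if T != nil { // every call site needs this guard, as in poll() and close()
		T.SegmentReceived(1)
	}
	fmt.Println("telemetry enabled:", T != nil)
}

The trade-off of this approach is that the guard must be repeated at every call site (as the diff does in poll() and close()); a no-op client assigned when the flag is off would avoid the repeated checks, but the diff deliberately keeps the upstream daemon code closer to its original shape.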