diff --git a/cmd/metrics/metric.go b/cmd/metrics/metric.go index c47f9656..5a5693bd 100644 --- a/cmd/metrics/metric.go +++ b/cmd/metrics/metric.go @@ -36,7 +36,7 @@ type MetricFrame struct { } // ProcessEvents is responsible for producing metrics from raw perf events -func ProcessEvents(perfEvents [][]byte, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, processes []Process, previousTimestamp float64, metadata Metadata, outputDir string) (metricFrames []MetricFrame, timeStamp float64, err error) { +func ProcessEvents(perfEvents [][]byte, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, processes []Process, previousTimestamp float64, metadata Metadata) (metricFrames []MetricFrame, timeStamp float64, err error) { var eventFrames []EventFrame if eventFrames, err = GetEventFrames(perfEvents, eventGroupDefinitions, flagScope, flagGranularity, metadata); err != nil { // arrange the events into groups err = fmt.Errorf("failed to put perf events into groups: %v", err) diff --git a/cmd/metrics/metrics.go b/cmd/metrics/metrics.go index c0bbceff..d0dc6d08 100644 --- a/cmd/metrics/metrics.go +++ b/cmd/metrics/metrics.go @@ -1212,14 +1212,15 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec } // must manually terminate perf in cgroup scope when a timeout is specified and/or need to refresh cgroups startPerfTimestamp := time.Now() - var timeout int + var cgroupTimeout int if flagScope == scopeCgroup && (flagDuration != 0 || len(flagCidList) == 0) { if flagDuration > 0 && flagDuration < flagRefresh { - timeout = flagDuration + cgroupTimeout = flagDuration } else { - timeout = flagRefresh + cgroupTimeout = flagRefresh } } + // Start a goroutine to wait for and then process perf output // Use a timer to determine when we received an entire frame of events from perf // The timer will expire when no lines (events) have been received from perf for more than 100ms. This // works because perf writes the events to stderr in a burst every collection interval, e.g., 5 seconds. @@ -1230,22 +1231,17 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec frameCount := 0 stopAnonymousFuncChannel := make(chan bool) go func() { + stop := false for { select { case <-t1.C: // waits for timer to expire case <-stopAnonymousFuncChannel: // wait for signal to exit the goroutine - return + stop = true // exit the loop } - if len(outputLines) != 0 { - if flagWriteEventsToFile { - if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil { - err = fmt.Errorf("failed to write events to raw file: %v", err) - slog.Error(err.Error()) - return - } - } + if !stop && len(outputLines) != 0 { + // process the events var metricFrames []MetricFrame - if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata, outputDir); err != nil { + if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata); err != nil { slog.Warn(err.Error()) outputLines = [][]byte{} // empty it continue @@ -1254,57 +1250,58 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec frameCount += 1 metricFrames[i].FrameCount = frameCount } + // send the metrics frames out to be printed frameChannel <- metricFrames - outputLines = [][]byte{} // empty it + // write the events to a file + if flagWriteEventsToFile { + if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil { + err = fmt.Errorf("failed to write events to raw file: %v", err) + slog.Error(err.Error()) + return + } + } + // empty the outputLines + outputLines = [][]byte{} } - if timeout != 0 && int(time.Since(startPerfTimestamp).Seconds()) > timeout { - err = localCommand.Process.Signal(os.Interrupt) - if err != nil { - err = fmt.Errorf("failed to terminate perf: %v", err) - slog.Error(err.Error()) + // for cgroup scope, terminate perf if timeout is reached + if flagScope == scopeCgroup { + if stop || (cgroupTimeout != 0 && int(time.Since(startPerfTimestamp).Seconds()) > cgroupTimeout) { + err = localCommand.Process.Signal(os.Interrupt) + if err != nil { + err = fmt.Errorf("failed to terminate perf: %v", err) + slog.Error(err.Error()) + } } } + if stop { + break + } } + // signal that the goroutine is done + stopAnonymousFuncChannel <- true }() - // read perf output + // receive perf output done := false for !done { select { - case err := <-scriptErrorChannel: + case err := <-scriptErrorChannel: // if there is an error running perf, it comes here if err != nil { slog.Error("error from perf", slog.String("error", err.Error())) } - done = true - case exitCode := <-exitcodeChannel: + done = true // exit the loop + case exitCode := <-exitcodeChannel: // when perf exits, the exit code comes to this channel slog.Debug("perf exited", slog.Int("exit code", exitCode)) - done = true - case line := <-stderrChannel: + done = true // exit the loop + case line := <-stderrChannel: // perf output comes in on this channel, one line at a time t1.Stop() t1.Reset(100 * time.Millisecond) // 100ms is somewhat arbitrary, but seems to work + // accumulate the lines, they will be processed in the goroutine when the timer expires outputLines = append(outputLines, []byte(line)) } } t1.Stop() // send signal to exit the goroutine - defer func() { stopAnonymousFuncChannel <- true }() - // process any remaining events - if len(outputLines) != 0 { - if flagWriteEventsToFile { - if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil { - err = fmt.Errorf("failed to write events to raw file: %v", err) - slog.Error(err.Error()) - return - } - } - var metricFrames []MetricFrame - if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata, outputDir); err != nil { - slog.Error(err.Error()) - return - } - for i := range metricFrames { - frameCount += 1 - metricFrames[i].FrameCount = frameCount - } - frameChannel <- metricFrames - } + stopAnonymousFuncChannel <- true + // wait for the goroutine to exit + <-stopAnonymousFuncChannel }