Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/metrics/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type MetricFrame struct {
}

// ProcessEvents is responsible for producing metrics from raw perf events
func ProcessEvents(perfEvents [][]byte, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, processes []Process, previousTimestamp float64, metadata Metadata, outputDir string) (metricFrames []MetricFrame, timeStamp float64, err error) {
func ProcessEvents(perfEvents [][]byte, eventGroupDefinitions []GroupDefinition, metricDefinitions []MetricDefinition, processes []Process, previousTimestamp float64, metadata Metadata) (metricFrames []MetricFrame, timeStamp float64, err error) {
var eventFrames []EventFrame
if eventFrames, err = GetEventFrames(perfEvents, eventGroupDefinitions, flagScope, flagGranularity, metadata); err != nil { // arrange the events into groups
err = fmt.Errorf("failed to put perf events into groups: %v", err)
Expand Down
89 changes: 43 additions & 46 deletions cmd/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1212,14 +1212,15 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
}
// must manually terminate perf in cgroup scope when a timeout is specified and/or need to refresh cgroups
startPerfTimestamp := time.Now()
var timeout int
var cgroupTimeout int
if flagScope == scopeCgroup && (flagDuration != 0 || len(flagCidList) == 0) {
if flagDuration > 0 && flagDuration < flagRefresh {
timeout = flagDuration
cgroupTimeout = flagDuration
} else {
timeout = flagRefresh
cgroupTimeout = flagRefresh
}
}
// Start a goroutine to wait for and then process perf output
// Use a timer to determine when we received an entire frame of events from perf
// The timer will expire when no lines (events) have been received from perf for more than 100ms. This
// works because perf writes the events to stderr in a burst every collection interval, e.g., 5 seconds.
Expand All @@ -1230,22 +1231,17 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
frameCount := 0
stopAnonymousFuncChannel := make(chan bool)
go func() {
stop := false
for {
select {
case <-t1.C: // waits for timer to expire
case <-stopAnonymousFuncChannel: // wait for signal to exit the goroutine
return
stop = true // exit the loop
}
if len(outputLines) != 0 {
if flagWriteEventsToFile {
if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil {
err = fmt.Errorf("failed to write events to raw file: %v", err)
slog.Error(err.Error())
return
}
}
if !stop && len(outputLines) != 0 {
// process the events
var metricFrames []MetricFrame
if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata, outputDir); err != nil {
if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata); err != nil {
slog.Warn(err.Error())
outputLines = [][]byte{} // empty it
continue
Expand All @@ -1254,57 +1250,58 @@ func runPerf(myTarget target.Target, noRoot bool, processes []Process, cmd *exec
frameCount += 1
metricFrames[i].FrameCount = frameCount
}
// send the metrics frames out to be printed
frameChannel <- metricFrames
outputLines = [][]byte{} // empty it
// write the events to a file
if flagWriteEventsToFile {
if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil {
err = fmt.Errorf("failed to write events to raw file: %v", err)
slog.Error(err.Error())
return
}
}
// empty the outputLines
outputLines = [][]byte{}
}
if timeout != 0 && int(time.Since(startPerfTimestamp).Seconds()) > timeout {
err = localCommand.Process.Signal(os.Interrupt)
if err != nil {
err = fmt.Errorf("failed to terminate perf: %v", err)
slog.Error(err.Error())
// for cgroup scope, terminate perf if timeout is reached
if flagScope == scopeCgroup {
if stop || (cgroupTimeout != 0 && int(time.Since(startPerfTimestamp).Seconds()) > cgroupTimeout) {
err = localCommand.Process.Signal(os.Interrupt)
if err != nil {
err = fmt.Errorf("failed to terminate perf: %v", err)
slog.Error(err.Error())
}
}
}
if stop {
break
}
}
// signal that the goroutine is done
stopAnonymousFuncChannel <- true
}()
// read perf output
// receive perf output
done := false
for !done {
select {
case err := <-scriptErrorChannel:
case err := <-scriptErrorChannel: // if there is an error running perf, it comes here
if err != nil {
slog.Error("error from perf", slog.String("error", err.Error()))
}
done = true
case exitCode := <-exitcodeChannel:
done = true // exit the loop
case exitCode := <-exitcodeChannel: // when perf exits, the exit code comes to this channel
slog.Debug("perf exited", slog.Int("exit code", exitCode))
done = true
case line := <-stderrChannel:
done = true // exit the loop
case line := <-stderrChannel: // perf output comes in on this channel, one line at a time
t1.Stop()
t1.Reset(100 * time.Millisecond) // 100ms is somewhat arbitrary, but seems to work
// accumulate the lines, they will be processed in the goroutine when the timer expires
outputLines = append(outputLines, []byte(line))
}
}
t1.Stop()
// send signal to exit the goroutine
defer func() { stopAnonymousFuncChannel <- true }()
// process any remaining events
if len(outputLines) != 0 {
if flagWriteEventsToFile {
if err = writeEventsToFile(outputDir+"/"+myTarget.GetName()+"_"+"events.json", outputLines); err != nil {
err = fmt.Errorf("failed to write events to raw file: %v", err)
slog.Error(err.Error())
return
}
}
var metricFrames []MetricFrame
if metricFrames, frameTimestamp, err = ProcessEvents(outputLines, eventGroupDefinitions, metricDefinitions, processes, frameTimestamp, metadata, outputDir); err != nil {
slog.Error(err.Error())
return
}
for i := range metricFrames {
frameCount += 1
metricFrames[i].FrameCount = frameCount
}
frameChannel <- metricFrames
}
stopAnonymousFuncChannel <- true
// wait for the goroutine to exit
<-stopAnonymousFuncChannel
}