Default NAP security-violation logs to be gzipped individually via custom processor #1125

Open · wants to merge 3 commits into base: main
2 changes: 2 additions & 0 deletions internal/collector/factories.go
@@ -7,6 +7,7 @@ package collector

import (
"github.com/nginx/agent/v3/internal/collector/containermetricsreceiver"
"github.com/nginx/agent/v3/internal/collector/logsgzipprocessor"
nginxreceiver "github.com/nginx/agent/v3/internal/collector/nginxossreceiver"
"github.com/nginx/agent/v3/internal/collector/nginxplusreceiver"

@@ -104,6 +105,7 @@ func createProcessorFactories() map[component.Type]processor.Factory {
redactionprocessor.NewFactory(),
resourceprocessor.NewFactory(),
transformprocessor.NewFactory(),
logsgzipprocessor.NewFactory(),
}

processors := make(map[component.Type]processor.Factory)
2 changes: 1 addition & 1 deletion internal/collector/factories_test.go
@@ -19,7 +19,7 @@ func TestOTelComponentFactoriesDefault(t *testing.T) {
assert.NotNil(t, factories, "factories should not be nil")

assert.Len(t, factories.Receivers, 6)
assert.Len(t, factories.Processors, 8)
assert.Len(t, factories.Processors, 9)
assert.Len(t, factories.Exporters, 4)
assert.Len(t, factories.Extensions, 3)
assert.Empty(t, factories.Connectors)
81 changes: 81 additions & 0 deletions internal/collector/logsgzipprocessor/README.md
@@ -0,0 +1,81 @@
# Logs gzip processor

The Logs gzip processor compresses each input log record body with gzip, updating the log record in place.

For metrics and traces, this processor is a pass-through, since it does not implement the related interfaces.
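
Since the compressed payload is written back into the record body as bytes, a downstream consumer can recover the original text with a standard gzip reader. Here is a minimal sketch, assuming the record has already passed through this processor; the `decompressBody` helper is illustrative only, not part of this PR:

```go
package logsgzipexample

import (
	"bytes"
	"compress/gzip"
	"io"

	"go.opentelemetry.io/collector/pdata/plog"
)

// decompressBody is an illustrative helper (not part of this PR): it
// reads the gzipped bytes that the processor stored in the record body
// and returns the original log line.
func decompressBody(record plog.LogRecord) (string, error) {
	// The processor writes the compressed payload back with FromRaw([]byte),
	// so the body type is ValueTypeBytes here.
	r, err := gzip.NewReader(bytes.NewReader(record.Body().Bytes().AsRaw()))
	if err != nil {
		return "", err
	}
	defer r.Close()

	original, err := io.ReadAll(r)
	if err != nil {
		return "", err
	}

	return string(original), nil
}
```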

## Configuration

No configuration needed.
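The processor registers under the component type `logsgzip` (see `NewFactory` in `processor.go`), so referencing that name in a collector logs pipeline is sufficient to enable it.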

## Benchmarking

We benchmarked the serial and concurrent (more representative of real pipeline load) operation of this processor, with and without the `sync.Pool`. Here are the results:

```
Concurrent Run: Without Sync Pool
goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor_Concurrent-12 24 45279866 ns/op 817791582 B/op 24727 allocs/op
PASS
ok github.com/nginx/agent/v3/internal/collector/logsgzipprocessor 1.939s

Concurrent Run: With Sync Pool

goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor_Concurrent-12 147 9383213 ns/op 10948640 B/op 7820 allocs/op
PASS
ok github.com/nginx/agent/v3/internal/collector/logsgzipprocessor 2.026s

————

Serial Run: Without Sync Pool

goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor/SmallRecords-12 100 12048268 ns/op 81898890 B/op 2537 allocs/op
BenchmarkGzipProcessor/MediumRecords-12 100 13143269 ns/op 82027307 B/op 2541 allocs/op
BenchmarkGzipProcessor/LargeRecords-12 91 15912399 ns/op 83198992 B/op 2580 allocs/op
BenchmarkGzipProcessor/ManySmallRecords-12 2 807707542 ns/op 8143237656 B/op 243348 allocs/op


Serial Run: With Sync Pool

goos: darwin
goarch: arm64
pkg: github.com/nginx/agent/v3/internal/collector/logsgzipprocessor
cpu: Apple M2 Pro
BenchmarkGzipProcessor/SmallRecords-12 205 7304839 ns/op 1027942 B/op 783 allocs/op
BenchmarkGzipProcessor/MediumRecords-12 182 7336266 ns/op 1078050 B/op 784 allocs/op
BenchmarkGzipProcessor/LargeRecords-12 132 9646940 ns/op 2057059 B/op 815 allocs/op
BenchmarkGzipProcessor/ManySmallRecords-12 5 239726258 ns/op 6883977 B/op 73679 allocs/op
PASS
```


To run this benchmark yourself with the `sync.Pool` implementation, run the tests in `processor_benchmark_test.go` as-is.
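For example, `go test -bench=. -benchmem ./internal/collector/logsgzipprocessor/` runs both the serial and concurrent benchmarks with allocation reporting.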

To benchmark without `sync.Pool`, comment out the existing `gzipCompress` function in `processor.go`, use the following code block in its place, and rerun `processor_benchmark_test.go`:

```go
func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {
var buf bytes.Buffer
w := gzip.NewWriter(&buf)
_, err := w.Write(data)
if err != nil {
return nil, err
}
if err = w.Close(); err != nil {
return nil, err
}

return buf.Bytes(), nil
}
```
182 changes: 182 additions & 0 deletions internal/collector/logsgzipprocessor/processor.go
@@ -0,0 +1,182 @@
// Copyright (c) F5, Inc.
//
// This source code is licensed under the Apache License, Version 2.0 license found in the
// LICENSE file in the root directory of this source tree.
package logsgzipprocessor

import (
"bytes"
"compress/gzip"
"context"
"fmt"
"io"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
"go.uber.org/multierr"
"go.uber.org/zap"
)

// nolint: ireturn
func NewFactory() processor.Factory {
return processor.NewFactory(
component.MustNewType("logsgzip"),
func() component.Config {
return &struct{}{}
},
processor.WithLogs(createLogsGzipProcessor, component.StabilityLevelBeta),
)
}

// nolint: ireturn
func createLogsGzipProcessor(_ context.Context,
settings processor.Settings,
cfg component.Config,
logs consumer.Logs,
) (processor.Logs, error) {
logger := settings.Logger
logger.Info("Creating logs gzip processor")

return newLogsGzipProcessor(logs, settings), nil
}

// logsGzipProcessor is a custom-processor implementation for compressing individual log records into
// gzip format. This can be used to reduce the size of log records and improve performance when processing
// large log volumes. This processor will be used by default for agent interacting with NGINX One
// console (https://docs.nginx.com/nginx-one/about/).
type logsGzipProcessor struct {
nextConsumer consumer.Logs
// We use sync.Pool to efficiently manage and reuse gzip.Writer instances within this processor.
// Otherwise, creating a new compressor for every log record would result in frequent memory allocations
// and increased garbage collection overhead, especially under high-throughput workload like this one.
// By pooling these objects, we minimize allocation churn, reduce GC pressure, and improve overall performance.
pool *sync.Pool
settings processor.Settings
}

// GzipWriter captures the subset of *gzip.Writer behavior used by this
// processor, so the pooled writer can be type-asserted against an interface.
type GzipWriter interface {
Write(p []byte) (int, error)
Close() error
Reset(w io.Writer)
}

func newLogsGzipProcessor(logs consumer.Logs, settings processor.Settings) *logsGzipProcessor {
return &logsGzipProcessor{
nextConsumer: logs,
pool: &sync.Pool{
New: func() any {
return gzip.NewWriter(nil)
},
},
settings: settings,
}
}

func (p *logsGzipProcessor) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
var errs error
resourceLogs := ld.ResourceLogs()
for i := range resourceLogs.Len() {
scopeLogs := resourceLogs.At(i).ScopeLogs()
for j := range scopeLogs.Len() {
err := p.processLogRecords(scopeLogs.At(j).LogRecords())
if err != nil {
errs = multierr.Append(errs, err)
}
}
}
if errs != nil {
return fmt.Errorf("failed processing log records: %w", errs)
}

return p.nextConsumer.ConsumeLogs(ctx, ld)
}

func (p *logsGzipProcessor) processLogRecords(logRecords plog.LogRecordSlice) error {
var errs error
// Filter out unsupported data types in the log before processing
logRecords.RemoveIf(func(lr plog.LogRecord) bool {
body := lr.Body()
// Keep only STRING or BYTES types
if body.Type() != pcommon.ValueTypeStr &&
body.Type() != pcommon.ValueTypeBytes {
p.settings.Logger.Debug("Skipping log record with unsupported body type", zap.Any("type", body.Type()))
return true
}

return false
})
// Process remaining valid records
for k := range logRecords.Len() {
record := logRecords.At(k)
body := record.Body()
var data []byte
//nolint:exhaustive // Already filtered out other types with RemoveIf
switch body.Type() {
case pcommon.ValueTypeStr:
data = []byte(body.Str())
case pcommon.ValueTypeBytes:
data = body.Bytes().AsRaw()
}
gzipped, err := p.gzipCompress(data)
if err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to compress log record: %w", err))

continue
}
err = record.Body().FromRaw(gzipped)
if err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to set gzipped data to log record body: %w", err))

continue
}
}

return errs
}

func (p *logsGzipProcessor) gzipCompress(data []byte) ([]byte, error) {
var buf bytes.Buffer
var err error
wIface := p.pool.Get()
w, ok := wIface.(GzipWriter)
if !ok {
return nil, fmt.Errorf("writer of type %T not supported", wIface)
}
w.Reset(&buf)
defer func() {
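// This deferred Close is a safeguard for early returns; gzip.Writer.Close
// is a no-op on an already-closed writer, so it does not conflict with the
// explicit Close below.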
if err = w.Close(); err != nil {
p.settings.Logger.Error("Failed to close gzip writer", zap.Error(err))
}
p.pool.Put(w)
}()

_, err = w.Write(data)
if err != nil {
return nil, err
}
if err = w.Close(); err != nil {
return nil, err
}

return buf.Bytes(), nil
}

func (p *logsGzipProcessor) Capabilities() consumer.Capabilities {
return consumer.Capabilities{
MutatesData: true,
}
}

func (p *logsGzipProcessor) Start(ctx context.Context, _ component.Host) error {
p.settings.Logger.Info("Starting logs gzip processor")
return nil
}

func (p *logsGzipProcessor) Shutdown(ctx context.Context) error {
p.settings.Logger.Info("Shutting down logs gzip processor")
return nil
}
89 changes: 89 additions & 0 deletions internal/collector/logsgzipprocessor/processor_benchmark_test.go
@@ -0,0 +1,89 @@
// Copyright (c) F5, Inc.
//
// This source code is licensed under the Apache License, Version 2.0 license found in the
// LICENSE file in the root directory of this source tree.
package logsgzipprocessor

import (
"context"
"crypto/rand"
"math/big"
"testing"

"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/processor"
)

// Helper to generate logs with variable size and content
func generateLogs(numRecords, recordSize int) plog.Logs {
logs := plog.NewLogs()
rl := logs.ResourceLogs().AppendEmpty()
sl := rl.ScopeLogs().AppendEmpty()
for i := 0; i < numRecords; i++ {
lr := sl.LogRecords().AppendEmpty()
content, _ := randomString(recordSize)
lr.Body().SetStr(content)
}

return logs
}

func randomString(n int) (string, error) {
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
b := make([]byte, n)
lettersSize := big.NewInt(int64(len(letters)))
for i := range b {
num, err := rand.Int(rand.Reader, lettersSize)
if err != nil {
return "", err
}
b[i] = letters[num.Int64()]
}

return string(b), nil
}

func BenchmarkGzipProcessor(b *testing.B) {
benchmarks := []struct {
name string
numRecords int
recordSize int
}{
{"SmallRecords", 100, 50},
{"MediumRecords", 100, 500},
{"LargeRecords", 100, 5000},
{"ManySmallRecords", 10000, 50},
}

for _, bm := range benchmarks {
b.Run(bm.name, func(b *testing.B) {
b.ReportAllocs()
consumer := &consumertest.LogsSink{}
p := newLogsGzipProcessor(consumer, processor.Settings{})
logs := generateLogs(bm.numRecords, bm.recordSize)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = p.ConsumeLogs(context.Background(), logs)
}
})
}
}

// Benchmark with concurrency, to simulate real pipeline load
func BenchmarkGzipProcessor_Concurrent(b *testing.B) {
// nolint:unused // documents the intended number of parallel workers
const workers = 8
logs := generateLogs(1000, 1000)
consumer := &consumertest.LogsSink{}
p := newLogsGzipProcessor(consumer, processor.Settings{})

b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = p.ConsumeLogs(context.Background(), logs)
}
})
}