This repository was archived by the owner on Jan 23, 2025. It is now read-only.

Limit # of requests per service #13

Merged · 4 commits · Mar 10, 2023
17 changes: 8 additions & 9 deletions assertsprocessor/factory.go
@@ -77,15 +77,14 @@ func newProcessor(logger *zap.Logger, ctx context.Context, config component.Conf
}

traceSampler := sampler{
logger: logger,
config: pConfig,
thresholdHelper: &thresholdsHelper,
topTracesMap: &sync.Map{},
healthySamplingState: &sync.Map{},
traceFlushTicker: clock.FromContext(ctx).NewTicker(time.Minute),
nextConsumer: nextConsumer,
requestRegexps: regexps,
stop: make(chan bool),
logger: logger,
config: pConfig,
thresholdHelper: &thresholdsHelper,
topTracesByService: &sync.Map{},
traceFlushTicker: clock.FromContext(ctx).NewTicker(time.Minute),
nextConsumer: nextConsumer,
requestRegexps: regexps,
stop: make(chan bool),
}

p := &assertsProcessorImpl{
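Note on the factory change: newProcessor now seeds a single topTracesByService map (populated lazily, one entry per service) instead of the flat topTracesMap plus a separate healthySamplingState. The lazy population relies on sync.Map.LoadOrStore; a minimal, self-contained sketch of that idiom follows — the perServiceState type is illustrative, standing in for the real *serviceQueues:

    package main

    import (
        "fmt"
        "sync"
    )

    // perServiceState is a stand-in for the processor's *serviceQueues value.
    type perServiceState struct {
        requests int
    }

    func main() {
        var topTracesByService sync.Map

        // LoadOrStore returns the existing value if the key is present, otherwise it
        // stores and returns the supplied one, so concurrent callers share one entry.
        entry, loaded := topTracesByService.LoadOrStore("namespace#service", &perServiceState{})
        state := entry.(*perServiceState)
        state.requests++
        fmt.Println(loaded, state.requests) // false 1 on the first call
    }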
2 changes: 1 addition & 1 deletion assertsprocessor/factory_test.go
@@ -77,6 +77,6 @@ func TestDefaultConfig(t *testing.T) {
// assert.NotNil(t, _assertsProcessor.sampler.stop)
// assert.Equal(t, nextConsumer, _assertsProcessor.sampler.nextConsumer)
// assert.Equal(t, _assertsProcessor.thresholdsHelper, _assertsProcessor.sampler.thresholdHelper)
// assert.NotNil(t, _assertsProcessor.sampler.topTracesMap)
// assert.NotNil(t, _assertsProcessor.sampler.topTracesByService)
// assert.NotNil(t, _assertsProcessor.sampler.traceFlushTicker)
//}
3 changes: 2 additions & 1 deletion assertsprocessor/priority_queue.go
@@ -22,13 +22,14 @@ type PriorityQueue []*Item
type TraceQueue struct {
priorityQueue PriorityQueue
maxSize int
mutex sync.Mutex
mutex *sync.Mutex
}

func NewTraceQueue(maxSize int) *TraceQueue {
traceQueue := TraceQueue{
priorityQueue: make(PriorityQueue, 0),
maxSize: maxSize,
mutex: &sync.Mutex{},
}
return &traceQueue
}
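Why the mutex becomes a pointer: with mutex *sync.Mutex, copies of a TraceQueue share one lock instead of silently getting independent mutexes, which is what go vet's copylocks check warns about when a struct containing a sync.Mutex value is copied. A small, repo-independent illustration:

    package main

    import (
        "fmt"
        "sync"
    )

    type byValue struct{ mu sync.Mutex }    // copying a byValue copies the Mutex; `go vet` flags such copies (copylocks)
    type byPointer struct{ mu *sync.Mutex } // copies share the same underlying Mutex

    func main() {
        a := byPointer{mu: &sync.Mutex{}}
        b := a                    // b.mu and a.mu point at the same lock
        fmt.Println(a.mu == b.mu) // true: both copies synchronize on one mutex
    }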
37 changes: 17 additions & 20 deletions assertsprocessor/processor_test.go
@@ -54,15 +54,14 @@ func TestStart(t *testing.T) {
attributeValueRegExps: &map[string]*regexp.Regexp{},
},
sampler: &sampler{
logger: testLogger,
config: &testConfig,
nextConsumer: dConsumer,
topTracesMap: &sync.Map{},
healthySamplingState: &sync.Map{},
stop: make(chan bool),
traceFlushTicker: clock.FromContext(ctx).NewTicker(time.Minute),
thresholdHelper: &_th,
requestRegexps: &map[string]*regexp.Regexp{},
logger: testLogger,
config: &testConfig,
nextConsumer: dConsumer,
topTracesByService: &sync.Map{},
stop: make(chan bool),
traceFlushTicker: clock.FromContext(ctx).NewTicker(time.Minute),
thresholdHelper: &_th,
requestRegexps: &map[string]*regexp.Regexp{},
},
}
assert.Nil(t, p.Start(ctx, nil))
@@ -96,8 +95,7 @@ func TestStart(t *testing.T) {
// logger: testLogger,
// config: &testConfig,
// nextConsumer: dConsumer,
// topTracesMap: &sync.Map{},
// healthySamplingState: &sync.Map{},
// topTracesByService: &sync.Map{},
// stop: make(chan bool),
// traceFlushTicker: clock.FromContext(ctx).NewTicker(time.Minute),
// thresholdHelper: &_th,
@@ -130,15 +128,14 @@ func TestConsumeTraces(t *testing.T) {
attributeValueRegExps: &map[string]*regexp.Regexp{},
},
sampler: &sampler{
logger: testLogger,
config: &testConfig,
nextConsumer: dConsumer,
topTracesMap: &sync.Map{},
healthySamplingState: &sync.Map{},
stop: make(chan bool),
traceFlushTicker: clock.FromContext(ctx).NewTicker(time.Minute),
thresholdHelper: &_th,
requestRegexps: &map[string]*regexp.Regexp{},
logger: testLogger,
config: &testConfig,
nextConsumer: dConsumer,
topTracesByService: &sync.Map{},
stop: make(chan bool),
traceFlushTicker: clock.FromContext(ctx).NewTicker(time.Minute),
thresholdHelper: &_th,
requestRegexps: &map[string]*regexp.Regexp{},
},
}

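The test changes above only mirror the field rename; the new behavior — a cap on how many per-request queues a service can hold — is enforced by serviceQueues.getRequestState, which is not part of this diff. A hypothetical test sketch for the assertsprocessor package, assuming NewServiceQueues(config) and getRequestState(request) behave as their call sites in sampler.go (below) suggest, and using RequestLimit as a purely illustrative name for the per-service cap:

    func TestRequestLimitPerService(t *testing.T) {
        // RequestLimit is an assumed name; the real config knob for the cap may differ.
        cfg := &Config{RequestLimit: 2}
        sq := NewServiceQueues(cfg)

        assert.NotNil(t, sq.getRequestState("/first"))
        assert.NotNil(t, sq.getRequestState("/second"))
        // Past the per-service limit we expect no new queue, matching the
        // "If there are too many requests, we may not get a queue" comment in sampler.go.
        assert.Nil(t, sq.getRequestState("/third"))
    }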
141 changes: 75 additions & 66 deletions assertsprocessor/sampler.go
@@ -11,29 +11,33 @@ import (
"sync"
)

type traceQueues struct {
slowQueue *TraceQueue
errorQueue *TraceQueue
type traceSampler struct {
slowQueue *TraceQueue
errorQueue *TraceQueue
samplingState *periodicSamplingState
}

func (tQ *traceQueues) errorTraceCount() int {
return len(tQ.errorQueue.priorityQueue)
func (tS *traceSampler) errorTraceCount() int {
return len(tS.errorQueue.priorityQueue)
}

func (tQ *traceQueues) slowTraceCount() int {
return len(tQ.slowQueue.priorityQueue)
func (tS *traceSampler) slowTraceCount() int {
return len(tS.slowQueue.priorityQueue)
}

func (ts *traceSampler) sample(config *Config) bool {
return ts.samplingState.sample(config.NormalSamplingFrequencyMinutes)
}
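traceSampler.sample above delegates to periodicSamplingState.sample, whose implementation is not shown in this diff; the removed code further down only reveals its lastSampleTime and rwMutex fields. A rough sketch of such a periodic gate (imports sync and time; the time arithmetic is an assumption, not the repository's actual logic):

    // Sketch only: field names come from the removed code, everything else is assumed.
    type periodicSamplingState struct {
        lastSampleTime int64 // unix seconds of the last accepted sample
        rwMutex        *sync.RWMutex
    }

    // sample returns true at most once per everyNMinutes.
    func (p *periodicSamplingState) sample(everyNMinutes int) bool {
        p.rwMutex.Lock()
        defer p.rwMutex.Unlock()
        now := time.Now().Unix()
        if now-p.lastSampleTime >= int64(everyNMinutes)*60 {
            p.lastSampleTime = now
            return true
        }
        return false
    }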

type sampler struct {
logger *zap.Logger
config *Config
thresholdHelper *thresholdHelper
topTracesMap *sync.Map
healthySamplingState *sync.Map
traceFlushTicker *clock.Ticker
nextConsumer consumer.Traces
stop chan bool
requestRegexps *map[string]*regexp.Regexp
logger *zap.Logger
config *Config
thresholdHelper *thresholdHelper
topTracesByService *sync.Map
traceFlushTicker *clock.Ticker
nextConsumer consumer.Traces
stop chan bool
requestRegexps *map[string]*regexp.Regexp
}

type traceSummary struct {
@@ -57,40 +61,45 @@ func (s *sampler) stopProcessing() {
func (s *sampler) sampleTrace(ctx context.Context,
trace ptrace.Traces, traceId string, spanSet *resourceSpanGroup) {
summary := s.getSummary(traceId, spanSet)
if summary == nil {
return
}
item := Item{
trace: &trace,
ctx: &ctx,
latency: summary.latency,
}
if summary.hasError || summary.isSlow {
// Get the trace queue for the entity and request
pq := s.getTraceQueues(summary.requestKey)
if summary.hasError {
s.logger.Debug("Capturing error trace",
zap.String("traceId", summary.slowestRootSpan.TraceID().String()),
zap.Float64("latency", summary.latency))
pq.errorQueue.push(&item)
} else {
s.logger.Debug("Capturing slow trace",
zap.String("traceId", summary.slowestRootSpan.TraceID().String()),
zap.Float64("latency", summary.latency))
pq.slowQueue.push(&item)
perService, _ := s.topTracesByService.LoadOrStore(summary.requestKey.entityKey.AsString(), NewServiceQueues(s.config))
requestState := perService.(*serviceQueues).getRequestState(summary.requestKey.request)

// If there are too many requests, we may not get a queue due to constraints
if requestState != nil {
if summary.hasError {
s.logger.Debug("Capturing error trace",
zap.String("traceId", traceId),
zap.Float64("latency", summary.latency))
requestState.errorQueue.push(&item)
} else {
s.logger.Debug("Capturing slow trace",
zap.String("traceId", traceId),
zap.Float64("latency", summary.latency))
requestState.slowQueue.push(&item)
}
}
} else if len(spanSet.rootSpans) > 0 && summary.requestKey.AsString() != "" {
// Capture healthy samples based on configured sampling rate
state, _ := s.healthySamplingState.LoadOrStore(summary.requestKey.AsString(), &periodicSamplingState{
lastSampleTime: 0,
rwMutex: &sync.RWMutex{},
})
samplingState := state.(*periodicSamplingState)
if samplingState.sample(s.config.NormalSamplingFrequencyMinutes) {
entry, _ := s.topTracesByService.LoadOrStore(summary.requestKey.entityKey.AsString(), NewServiceQueues(s.config))
perService := entry.(*serviceQueues)
requestState := perService.getRequestState(summary.requestKey.request)
if requestState != nil && requestState.sample(s.config) {
s.logger.Debug("Capturing normal trace",
zap.String("traceId", summary.slowestRootSpan.TraceID().String()),
zap.String("traceId", traceId),
zap.Float64("latency", summary.latency))
pq := s.getTraceQueues(summary.requestKey)

// Push to the latency queue to prioritize the healthy sample too
pq.slowQueue.push(&item)
requestState.slowQueue.push(&item)
}
}
}
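Both branches of sampleTrace now go through the same two-level lookup: topTracesByService maps a service entity key to a serviceQueues value, and getRequestState hands back the per-request traceSampler, or nil once the service has reached its request cap. serviceQueues is defined elsewhere in this PR; the sketch below (imports sync and sync/atomic) is only meant to be consistent with how it is called here — the counter and the LimitPerService field name are assumptions, while LimitPerRequestPerService is the existing per-request queue size used by the removed getTraceQueues:

    // Sketch, not the PR's actual implementation.
    type serviceQueues struct {
        config        *Config
        requestStates *sync.Map // request name -> *traceSampler
        requestCount  int32     // assumed counter backing the per-service cap
    }

    func NewServiceQueues(config *Config) *serviceQueues {
        return &serviceQueues{config: config, requestStates: &sync.Map{}}
    }

    // getRequestState lazily creates one traceSampler per request and returns nil
    // once the (assumed) per-service request limit is reached.
    func (sq *serviceQueues) getRequestState(request string) *traceSampler {
        if existing, ok := sq.requestStates.Load(request); ok {
            return existing.(*traceSampler)
        }
        if int(atomic.LoadInt32(&sq.requestCount)) >= sq.config.LimitPerService { // placeholder field name
            return nil
        }
        newState := &traceSampler{
            slowQueue:     NewTraceQueue(sq.config.LimitPerRequestPerService),
            errorQueue:    NewTraceQueue(sq.config.LimitPerRequestPerService),
            samplingState: &periodicSamplingState{rwMutex: &sync.RWMutex{}},
        }
        actual, loaded := sq.requestStates.LoadOrStore(request, newState)
        if !loaded {
            atomic.AddInt32(&sq.requestCount, 1)
        }
        return actual.(*traceSampler)
    }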
@@ -123,14 +132,6 @@ func (s *sampler) getSummary(traceId string, spanSet *resourceSpanGroup) *traceS
return &summary
}

func (s *sampler) getTraceQueues(key RequestKey) *traceQueues {
var pq, _ = s.topTracesMap.LoadOrStore(key.AsString(), &traceQueues{
slowQueue: NewTraceQueue(int(math.Min(5, float64(s.config.LimitPerRequestPerService)))),
errorQueue: NewTraceQueue(int(math.Min(5, float64(s.config.LimitPerRequestPerService)))),
})
return pq.(*traceQueues)
}

func (s *sampler) isSlow(namespace string, serviceName string, rootSpan ptrace.Span, request string) bool {
spanDuration := computeLatency(rootSpan)
threshold := s.thresholdHelper.getThreshold(namespace, serviceName, request)
@@ -153,31 +154,39 @@ func (s *sampler) startTraceFlusher() {
s.logger.Info("Trace flush background routine stopped")
return
case <-s.traceFlushTicker.C:
var previousTraces = s.topTracesMap
s.topTracesMap = &sync.Map{}
var previousTraces = s.topTracesByService
s.topTracesByService = &sync.Map{}
previousTraces.Range(func(key any, value any) bool {
var requestKey = key.(string)
var q = value.(*traceQueues)

// Flush all the errors
if len(q.errorQueue.priorityQueue) > 0 {
s.logger.Debug("Flushing Error Traces for",
zap.String("Request", requestKey),
zap.Int("Count", len(q.errorQueue.priorityQueue)))
for _, item := range q.errorQueue.priorityQueue {
_ = (*s).nextConsumer.ConsumeTraces(*item.ctx, *item.trace)
var entityKey = key.(string)
var sq = value.(*serviceQueues)

sq.requestStates.Range(func(key1 any, value1 any) bool {
var requestKey = key1.(string)
var _sampler = value1.(*traceSampler)

// Flush all the errors
if len(_sampler.errorQueue.priorityQueue) > 0 {
s.logger.Debug("Flushing Error Traces for",
zap.String("Service", entityKey),
zap.String("Request", requestKey),
zap.Int("Count", len(_sampler.errorQueue.priorityQueue)))
for _, item := range _sampler.errorQueue.priorityQueue {
_ = (*s).nextConsumer.ConsumeTraces(*item.ctx, *item.trace)
}
}
}

// Flush all the slow traces
if len(q.slowQueue.priorityQueue) > 0 {
s.logger.Debug("Flushing Slow Traces for",
zap.String("Request", requestKey),
zap.Int("Count", len(q.slowQueue.priorityQueue)))
for _, item := range q.slowQueue.priorityQueue {
_ = (*s).nextConsumer.ConsumeTraces(*item.ctx, *item.trace)

// Flush all the slow traces
if len(_sampler.slowQueue.priorityQueue) > 0 {
s.logger.Debug("Flushing Slow Traces for",
zap.String("Service", entityKey),
zap.String("Request", requestKey),
zap.Int("Count", len(_sampler.slowQueue.priorityQueue)))
for _, item := range _sampler.slowQueue.priorityQueue {
_ = (*s).nextConsumer.ConsumeTraces(*item.ctx, *item.trace)
}
}
}
return true
})
return true
})
}
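The flusher now swaps topTracesByService for a fresh map and drains the previous one by walking two nested sync.Maps (service, then request). A self-contained sketch of that swap-and-drain pattern, with plain ints standing in for the real *serviceQueues and *traceSampler values:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        outer := &sync.Map{} // service entity key -> inner map
        inner := &sync.Map{} // request name -> trace count (stand-in)
        inner.Store("/checkout", 3)
        outer.Store("namespace#payment-service", inner)

        // Swap in a fresh map, then drain the previous one, mirroring startTraceFlusher.
        previous := outer
        outer = &sync.Map{} // new traces would accumulate here while draining

        previous.Range(func(service, v any) bool {
            v.(*sync.Map).Range(func(request, count any) bool {
                fmt.Printf("flush %v %v: %v traces\n", service, request, count)
                return true
            })
            return true
        })
    }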