
feat: concurrent storage put #1304


Merged
merged 11 commits on Aug 30, 2022
13 changes: 4 additions & 9 deletions pkg/cli/server.go
@@ -66,11 +66,6 @@ type serverService struct {
group *errgroup.Group
}

const (
storageQueueWorkers = 1
storageQueueSize = 100
)

func newServerService(c *config.Server) (*serverService, error) {
logLevel, err := logrus.ParseLevel(c.LogLevel)
if err != nil {
@@ -97,7 +92,9 @@ func newServerService(c *config.Server) (*serverService, error) {
}

svc.healthController = health.NewController(svc.logger, time.Minute, diskPressure)
svc.storage, err = storage.New(storage.NewConfig(svc.config), svc.logger, prometheus.DefaultRegisterer, svc.healthController)

storageConfig := storage.NewConfig(svc.config)
svc.storage, err = storage.New(storageConfig, svc.logger, prometheus.DefaultRegisterer, svc.healthController)
if err != nil {
return nil, fmt.Errorf("new storage: %w", err)
}
@@ -146,9 +143,7 @@ func newServerService(c *config.Server) (*serverService, error) {
return nil, fmt.Errorf("new metric exporter: %w", err)
}

svc.storageQueue = storage.NewIngestionQueue(svc.logger, svc.storage, prometheus.DefaultRegisterer,
storageQueueWorkers,
storageQueueSize)
svc.storageQueue = storage.NewIngestionQueue(svc.logger, svc.storage, prometheus.DefaultRegisterer, storageConfig)

defaultMetricsRegistry := prometheus.DefaultRegisterer

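Putting the two hunks together, the wiring in newServerService after this change looks roughly like the sketch below: the queue sizing now travels inside the storage config instead of the removed package-level constants. This is a minimal sketch assembled from the hunks above, not the full function.

```go
storageConfig := storage.NewConfig(svc.config)
svc.storage, err = storage.New(storageConfig, svc.logger, prometheus.DefaultRegisterer, svc.healthController)
if err != nil {
	return nil, fmt.Errorf("new storage: %w", err)
}
// The queue reads its worker count and queue size from storageConfig.
svc.storageQueue = storage.NewIngestionQueue(svc.logger, svc.storage, prometheus.DefaultRegisterer, storageConfig)
```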
2 changes: 2 additions & 0 deletions pkg/config/config.go
@@ -106,6 +106,8 @@ type Server struct {
BadgerLogLevel string `def:"error" desc:"log level: debug|info|warn|error" mapstructure:"badger-log-level"`

StoragePath string `def:"<installPrefix>/var/lib/pyroscope" desc:"directory where pyroscope stores profiling data" mapstructure:"storage-path"`
StorageQueueSize int `desc:"storage queue size" mapstructure:"storage-queue-size"`
StorageQueueWorkers int `desc:"number of workers handling internal storage queue" mapstructure:"storage-queue-workers"`
MinFreeSpacePercentage float64 `def:"5" desc:"percentage of available disk space at which ingestion requests are discarded. Defaults to 5% but not less than 1GB. Set 0 to disable" mapstructure:"min-free-space-percentage"`

APIBindAddr string `def:":4040" desc:"port for the HTTP(S) server used for data ingestion and web UI" mapstructure:"api-bind-addr"`
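A caller constructing the server config programmatically could set the new knobs as in the sketch below; the field names come from the struct above, while the guess that they also surface as `--storage-queue-size` / `--storage-queue-workers` flags and config keys is an assumption based on the mapstructure tags, not something shown in this diff.

```go
// Hypothetical values; leaving the fields at zero lets pkg/storage fall back
// to its defaults (one worker, queue size 100 — see pkg/storage/queue.go below).
cfg := &config.Server{
	StorageQueueSize:    200,
	StorageQueueWorkers: 4,
}
_ = cfg
```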
27 changes: 9 additions & 18 deletions pkg/storage/cache/cache.go
@@ -214,27 +214,15 @@ func dropPrefixBatch(db *badger.DB, p []byte, n int) (bool, error) {
}

func (cache *Cache) GetOrCreate(key string) (interface{}, error) {
v, err := cache.get(key) // find the key from cache first
if err != nil {
return nil, err
}
if v != nil {
return v, nil
}
v = cache.codec.New(key)
cache.lfu.Set(key, v)
return v, nil
return cache.get(key, true)
}

func (cache *Cache) Lookup(key string) (interface{}, bool) {
v, err := cache.get(key)
if v == nil || err != nil {
return nil, false
}
return v, true
v, err := cache.get(key, false)
return v, v != nil && err == nil
}

func (cache *Cache) get(key string) (interface{}, error) {
func (cache *Cache) get(key string, createNotFound bool) (interface{}, error) {
cache.metrics.ReadsCounter.Inc()
return cache.lfu.GetOrSet(key, func() (interface{}, error) {
cache.metrics.MissesCounter.Inc()
@@ -249,11 +237,14 @@ func (cache *Cache) get(key string) (interface{}, error) {
})

switch {
default:
return nil, err
case err == nil:
case errors.Is(err, badger.ErrKeyNotFound):
if createNotFound {
return cache.codec.New(key), nil
}
return nil, nil
default:
return nil, err
}

cache.metrics.DBReads.Observe(float64(len(buf)))
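The refactor folds the two read paths into one helper: `get(key, createNotFound)` distinguishes "missing" from "error", and only `GetOrCreate` allocates a new object on a miss. Below is a self-contained toy sketch of that pattern for illustration — it is not the cache's actual internals (no LFU, no Badger), just the same control flow.

```go
package main

import "fmt"

// toyCache stands in for the real cache: data plays the role of the
// LFU+Badger layers, newValue the role of codec.New.
type toyCache struct {
	data     map[string]interface{}
	newValue func(key string) interface{}
}

// get mirrors the shape of Cache.get above: a miss either creates a fresh
// value (GetOrCreate) or reports "no value" without treating it as an error
// (Lookup).
func (c *toyCache) get(key string, createNotFound bool) (interface{}, error) {
	if v, ok := c.data[key]; ok {
		return v, nil
	}
	if createNotFound {
		v := c.newValue(key)
		c.data[key] = v
		return v, nil
	}
	return nil, nil
}

func (c *toyCache) GetOrCreate(key string) (interface{}, error) { return c.get(key, true) }

func (c *toyCache) Lookup(key string) (interface{}, bool) {
	v, err := c.get(key, false)
	return v, v != nil && err == nil
}

func main() {
	c := &toyCache{
		data:     map[string]interface{}{},
		newValue: func(key string) interface{} { return "fresh:" + key },
	}
	created, _ := c.GetOrCreate("a") // miss -> allocates via newValue
	_, found := c.Lookup("b")        // miss -> (nil, false), no allocation
	fmt.Println(created, found)      // fresh:a false
}
```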
5 changes: 5 additions & 0 deletions pkg/storage/cache/lfu/lfu.go
@@ -69,6 +69,11 @@ func (c *Cache) GetOrSet(key string, value func() (interface{}, error)) (interfa
e = new(cacheEntry)
e.key = key
e.value = v
// The item returned by value() is either newly allocated or was just
// read from the DB, therefore we mark it as persisted to avoid redundant
// writes or writing an empty object. Once the item is invalidated, the
// caller has to explicitly store it again with a Set call.
e.persisted = true
c.values[key] = e
c.increment(e)
c.len++
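For context, the loader passed to GetOrSet is the only place a value enters the LFU on a miss, so marking that entry as persisted is safe: it was either just decoded from Badger or is a brand-new empty object with nothing worth flushing yet. A hedged usage sketch, assuming `c` is the LFU cache and `loadOrNew` is a hypothetical loader, not part of the package:

```go
v, err := c.GetOrSet("app{foo=bar}", func() (interface{}, error) {
	return loadOrNew("app{foo=bar}") // hypothetical: decode from the DB or allocate fresh
})
if err != nil {
	return err
}
// v is cached with persisted == true; per the comment above, a later mutation
// has to be handed back to the write path with an explicit Set.
_ = v
```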
4 changes: 4 additions & 0 deletions pkg/storage/config.go
@@ -20,6 +20,8 @@ type Config struct {
retention time.Duration
retentionExemplars time.Duration
retentionLevels config.RetentionLevels
queueWorkers int
queueSize int

NewBadger func(name string, p Prefix, codec cache.Codec) (BadgerDBWithCache, error)
}
@@ -41,6 +43,8 @@ func NewConfig(server *config.Server) *Config {
retentionExemplars: server.ExemplarsRetention,
retentionLevels: server.RetentionLevels,
hideApplications: server.HideApplications,
queueSize: server.StorageQueueSize,
queueWorkers: server.StorageQueueWorkers,
inMemory: false,
}
}
19 changes: 8 additions & 11 deletions pkg/storage/exemplars.go
@@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"github.com/pyroscope-io/pyroscope/pkg/storage/metadata"
"strconv"
"sync"
"time"
@@ -375,20 +376,16 @@ func (s *Storage) ensureAppSegmentExists(in *PutInput) error {
return fmt.Errorf("segments cache for %v: %w", k, err)
}
st := r.(*segment.Segment)
if !isMetadataEqual(st, in) {
st.SetMetadata(in.SpyName, in.SampleRate, in.Units, in.AggregationType)
s.segments.Put(k, st)
}
st.SetMetadata(metadata.Metadata{
SpyName: in.SpyName,
SampleRate: in.SampleRate,
Units: in.Units,
AggregationType: in.AggregationType,
})
s.segments.Put(k, st)
return err
}

func isMetadataEqual(s *segment.Segment, in *PutInput) bool {
return in.SpyName == s.SpyName() &&
in.AggregationType == s.AggregationType() &&
in.SampleRate == s.SampleRate() &&
in.Units == s.Units()
}

func (b *exemplarsBatch) insert(_ context.Context, input *PutInput) error {
if len(b.entries) == exemplarsPerBatch {
return errBatchIsFull
14 changes: 14 additions & 0 deletions pkg/storage/labels/labels.go
@@ -17,6 +17,20 @@ func New(db *badger.DB) *Labels {
return ll
}

func (ll *Labels) PutLabels(labels map[string]string) error {
return ll.db.Update(func(txn *badger.Txn) error {
for k, v := range labels {
if err := txn.SetEntry(badger.NewEntry([]byte("l:"+k), nil)); err != nil {
return err
}
if err := txn.SetEntry(badger.NewEntry([]byte("v:"+k+":"+v), nil)); err != nil {
return err
}
}
return nil
})
}

func (ll *Labels) Put(key, val string) {
kk := "l:" + key
kv := "v:" + key + ":" + val
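PutLabels writes all label keys and key/value entries of one profile in a single Badger transaction, where the previous per-label Put issued one update each; the "l:" and "v:" key prefixes match the existing Put shown below. A short usage sketch, assuming `ll` is the *Labels instance held by storage:

```go
// Persist every label of an ingested key at once; the whole map either
// commits or fails as one transaction.
labels := map[string]string{"__name__": "app.cpu", "region": "eu-west-1"}
if err := ll.PutLabels(labels); err != nil {
	return fmt.Errorf("unable to write labels: %w", err)
}
```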
7 changes: 7 additions & 0 deletions pkg/storage/metadata/metadata.go
@@ -25,3 +25,10 @@ const (
func (a AggregationType) String() string {
return string(a)
}

type Metadata struct {
SpyName string
SampleRate uint32
Units Units
AggregationType AggregationType
}
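The new struct lets callers pass per-segment metadata as one value instead of four positional arguments, which is what the SetMetadata/GetMetadata changes below rely on. A sketch of the round trip, assuming `pi` is a *PutInput and `st` a *segment.Segment already in scope, mirroring the call sites in this diff:

```go
st.SetMetadata(metadata.Metadata{
	SpyName:         pi.SpyName,
	SampleRate:      pi.SampleRate,
	Units:           pi.Units,
	AggregationType: pi.AggregationType,
})
md := st.GetMetadata() // one consistent snapshot instead of four separate getters
_ = md
```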
16 changes: 15 additions & 1 deletion pkg/storage/queue.go
@@ -22,7 +22,21 @@ type IngestionQueue struct {
discardedTotal prometheus.Counter
}

func NewIngestionQueue(logger logrus.FieldLogger, putter Putter, r prometheus.Registerer, queueWorkers, queueSize int) *IngestionQueue {
const (
defaultQueueSize = 100
defaultWorkers = 1
)

func NewIngestionQueue(logger logrus.FieldLogger, putter Putter, r prometheus.Registerer, c *Config) *IngestionQueue {
queueSize := c.queueSize
if queueSize == 0 {
queueSize = defaultQueueSize
}
queueWorkers := c.queueWorkers
if queueWorkers == 0 {
queueWorkers = defaultWorkers
}

q := IngestionQueue{
logger: logger,
putter: putter,
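Zero values in the config fall back to the previous hard-coded behaviour, so deployments that never set the new options keep one worker and a queue of 100. Roughly, assuming `logger` and a Putter `store` are in scope:

```go
// StorageQueueSize / StorageQueueWorkers left at 0 means NewIngestionQueue
// applies defaultWorkers (1) and defaultQueueSize (100) internally.
cfg := storage.NewConfig(&config.Server{})
queue := storage.NewIngestionQueue(logger, store, prometheus.DefaultRegisterer, cfg)
_ = queue
```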
37 changes: 17 additions & 20 deletions pkg/storage/segment/segment.go
@@ -410,28 +410,25 @@ func (s *Segment) WalkNodesToDelete(t *RetentionPolicy, cb func(depth int, t tim
return s.root.walkNodesToDelete(t.normalize(), cb)
}

// TODO: this should be refactored
func (s *Segment) SetMetadata(spyName string, sampleRate uint32, units metadata.Units, aggregationType metadata.AggregationType) {
s.spyName = spyName
s.sampleRate = sampleRate
s.units = units
s.aggregationType = aggregationType
}

func (s *Segment) SpyName() string {
return s.spyName
}

func (s *Segment) SampleRate() uint32 {
return s.sampleRate
}

func (s *Segment) Units() metadata.Units {
return s.units
func (s *Segment) SetMetadata(md metadata.Metadata) {
s.m.Lock()
s.spyName = md.SpyName
s.sampleRate = md.SampleRate
s.units = md.Units
s.aggregationType = md.AggregationType
s.m.Unlock()
}

func (s *Segment) AggregationType() metadata.AggregationType {
return s.aggregationType
func (s *Segment) GetMetadata() metadata.Metadata {
s.m.Lock()
md := metadata.Metadata{
SpyName: s.spyName,
SampleRate: s.sampleRate,
Units: s.units,
AggregationType: s.aggregationType,
}
s.m.Unlock()
return md
}

var zeroTime time.Time
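With Put no longer serialized by a storage-wide mutex (see pkg/storage/storage_put.go below), metadata reads and writes on the same segment can race, hence the segment mutex around both accessors. An illustrative sketch only, assuming `seg` is a *segment.Segment shared by an ingesting and a reading goroutine:

```go
var wg sync.WaitGroup
wg.Add(2)
go func() { // e.g. a Put for the same app arriving via the ingestion queue
	defer wg.Done()
	seg.SetMetadata(metadata.Metadata{SpyName: "gospy", SampleRate: 100})
}()
go func() { // e.g. a concurrent Get assembling its GetOutput
	defer wg.Done()
	_ = seg.GetMetadata() // one consistent snapshot, no torn reads
}()
wg.Wait()
```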
2 changes: 2 additions & 0 deletions pkg/storage/segment/timeline.go
@@ -58,9 +58,11 @@ func GenerateTimeline(st, et time.Time) *Timeline {
}

func (tl *Timeline) PopulateTimeline(s *Segment) {
s.m.Lock()
if s.root != nil {
s.root.populateTimeline(tl, s)
}
s.m.Unlock()
}

func (sn streeNode) populateTimeline(tl *Timeline, s *Segment) {
23 changes: 9 additions & 14 deletions pkg/storage/storage_get.go
@@ -87,11 +87,9 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
resultTrie *tree.Tree
lastSegment *segment.Segment
writesTotal uint64

aggregationType = "sum"
timeline = segment.GenerateTimeline(gi.StartTime, gi.EndTime)
timeline = segment.GenerateTimeline(gi.StartTime, gi.EndTime)
timelines = make(map[string]*segment.Timeline)
)
timelines := make(map[string]*segment.Timeline)

for _, k := range dimensionKeys() {
// TODO: refactor, store `Key`s in dimensions
@@ -107,11 +105,6 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
}

st := res.(*segment.Segment)
switch st.AggregationType() {
case averageAggregationType, "avg":
aggregationType = averageAggregationType
}

timelineKey := "*"
if v, ok := parsedKey.Labels()[gi.GroupBy]; ok {
timelineKey = v
@@ -145,19 +138,21 @@ func (s *Storage) Get(ctx context.Context, gi *GetInput) (*GetOutput, error) {
return nil, nil
}

if writesTotal > 0 && aggregationType == averageAggregationType {
md := lastSegment.GetMetadata()
switch md.AggregationType {
case averageAggregationType, "avg":
resultTrie = resultTrie.Clone(big.NewRat(1, int64(writesTotal)))
}

return &GetOutput{
Tree: resultTrie,
Timeline: timeline,
Groups: timelines,
SpyName: lastSegment.SpyName(),
SampleRate: lastSegment.SampleRate(),
SpyName: md.SpyName,
SampleRate: md.SampleRate,
Units: md.Units,
AggregationType: md.AggregationType,
Count: writesTotal,
Units: lastSegment.Units(),
AggregationType: lastSegment.AggregationType(),
}, nil
}

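The aggregation decision now comes from the last segment's metadata snapshot instead of a flag accumulated inside the loop; for the average case the summed result tree is scaled down by 1/writesTotal. A small sketch of that scaling step, using the Clone call shown in the hunk above:

```go
// With an "average" aggregation and, say, writesTotal == 4, every sample in
// the summed tree is multiplied by the rational 1/4. big.NewRat panics on a
// zero denominator, so writesTotal must be positive at this point.
ratio := big.NewRat(1, int64(writesTotal))
resultTrie = resultTrie.Clone(ratio)
```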
9 changes: 5 additions & 4 deletions pkg/storage/storage_get_exemplar.go
@@ -48,10 +48,11 @@ func (s *Storage) GetExemplar(ctx context.Context, gi GetExemplarInput) (out Get
}

if m.segment != nil {
out.SpyName = m.segment.SpyName()
out.Units = m.segment.Units()
out.SampleRate = m.segment.SampleRate()
out.AggregationType = m.segment.AggregationType()
md := m.segment.GetMetadata()
out.SpyName = md.SpyName
out.Units = md.Units
out.SampleRate = md.SampleRate
out.AggregationType = md.AggregationType
}

return out, nil
9 changes: 5 additions & 4 deletions pkg/storage/storage_merge_exemplars.go
@@ -40,10 +40,11 @@ func (s *Storage) MergeExemplars(ctx context.Context, mi MergeExemplarsInput) (o
out.Tree = m.tree
out.Count = m.count
if m.segment != nil {
out.SpyName = m.segment.SpyName()
out.Units = m.segment.Units()
out.SampleRate = m.segment.SampleRate()
out.AggregationType = m.segment.AggregationType()
md := m.segment.GetMetadata()
out.SpyName = md.SpyName
out.Units = md.Units
out.SampleRate = md.SampleRate
out.AggregationType = md.AggregationType
}

if out.Count > 1 && out.AggregationType == metadata.AverageAggregationType {
16 changes: 9 additions & 7 deletions pkg/storage/storage_put.go
@@ -26,9 +26,6 @@ type PutInput struct {
}

func (s *Storage) Put(ctx context.Context, pi *PutInput) error {
// TODO: This is a pretty broad lock. We should find a way to make these locks more selective.
s.putMutex.Lock()
defer s.putMutex.Unlock()
if s.hc.IsOutOfDiskSpace() {
return errOutOfSpace
}
@@ -53,8 +50,8 @@ func (s *Storage) Put(ctx context.Context, pi *PutInput) error {
"aggregationType": pi.AggregationType,
}).Debug("storage.Put")

for k, v := range pi.Key.Labels() {
s.labels.Put(k, v)
if err := s.labels.PutLabels(pi.Key.Labels()); err != nil {
return fmt.Errorf("unable to write labels: %w", err)
}

sk := pi.Key.SegmentKey()
Expand All @@ -75,9 +72,14 @@ func (s *Storage) Put(ctx context.Context, pi *PutInput) error {
}

st := r.(*segment.Segment)
st.SetMetadata(pi.SpyName, pi.SampleRate, pi.Units, pi.AggregationType)
samples := pi.Val.Samples()
st.SetMetadata(metadata.Metadata{
SpyName: pi.SpyName,
SampleRate: pi.SampleRate,
Units: pi.Units,
AggregationType: pi.AggregationType,
})

samples := pi.Val.Samples()
err = st.Put(pi.StartTime, pi.EndTime, samples, func(depth int, t time.Time, r *big.Rat, addons []segment.Addon) {
tk := pi.Key.TreeKey(depth, t)
res, err := s.trees.GetOrCreate(tk)
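This hunk is the change the PR title refers to: with the storage-wide putMutex gone, callers (in practice the ingestion queue workers) can run Put concurrently, relying on the per-segment mutex, the LFU cache, and Badger transactions for synchronization. A hedged sketch of what that enables, where `s` is the *Storage and `inputs` a hypothetical, already-prepared slice of *storage.PutInput:

```go
g, ctx := errgroup.WithContext(context.Background())
for _, pi := range inputs {
	pi := pi // capture the loop variable
	g.Go(func() error {
		return s.Put(ctx, pi) // safe to call from several goroutines after this change
	})
}
if err := g.Wait(); err != nil {
	return fmt.Errorf("concurrent put: %w", err)
}
```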