Skip to content

Commit 4da03ca

Browse files
committed
Break background cache writes into batches of 100
This improves parallelism and observability. Fixes #2134 Signed-off-by: Bryan Boreham <[email protected]>
1 parent 151c577 commit 4da03ca

File tree

1 file changed

+25
-13
lines changed

1 file changed

+25
-13
lines changed

pkg/chunk/cache/background.go

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ type BackgroundConfig struct {
3333
// RegisterFlagsWithPrefix adds the flags required to config this to the given FlagSet
3434
func (cfg *BackgroundConfig) RegisterFlagsWithPrefix(prefix string, description string, f *flag.FlagSet) {
3535
f.IntVar(&cfg.WriteBackGoroutines, prefix+"memcache.write-back-goroutines", 10, description+"How many goroutines to use to write back to memcache.")
36-
f.IntVar(&cfg.WriteBackBuffer, prefix+"memcache.write-back-buffer", 10000, description+"How many chunks to buffer for background write back.")
36+
f.IntVar(&cfg.WriteBackBuffer, prefix+"memcache.write-back-buffer", 10000, description+"How many key batches to buffer for background write-back.")
3737
}
3838

3939
type backgroundCache struct {
@@ -80,21 +80,33 @@ func (c *backgroundCache) Stop() {
8080
c.Cache.Stop()
8181
}
8282

83+
const keysPerBatch = 100
84+
8385
// Store writes keys for the cache in the background.
8486
func (c *backgroundCache) Store(ctx context.Context, keys []string, bufs [][]byte) {
85-
bgWrite := backgroundWrite{
86-
keys: keys,
87-
bufs: bufs,
88-
}
89-
select {
90-
case c.bgWrites <- bgWrite:
91-
c.queueLength.Add(float64(len(keys)))
92-
default:
93-
c.droppedWriteBack.Add(float64(len(keys)))
94-
sp := opentracing.SpanFromContext(ctx)
95-
if sp != nil {
96-
sp.LogFields(otlog.Int("dropped", len(keys)))
87+
for len(keys) > 0 {
88+
num := keysPerBatch
89+
if num > len(keys) {
90+
num = len(keys)
91+
}
92+
93+
bgWrite := backgroundWrite{
94+
keys: keys[:num],
95+
bufs: bufs[:num],
96+
}
97+
select {
98+
case c.bgWrites <- bgWrite:
99+
c.queueLength.Add(float64(len(keys)))
100+
default:
101+
c.droppedWriteBack.Add(float64(len(keys)))
102+
sp := opentracing.SpanFromContext(ctx)
103+
if sp != nil {
104+
sp.LogFields(otlog.Int("dropped", len(keys)))
105+
}
106+
break // queue is full; give up
97107
}
108+
keys = keys[num:]
109+
bufs = bufs[num:]
98110
}
99111
}
100112

0 commit comments

Comments
 (0)