Fix race on chunks multilevel cache + Optimize to avoid refetching already found keys. #6312

Merged
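
Summary (from the diff below): multiLevelChunkCache.Fetch previously handed the asynchronous backfill the same hits map it returns to the caller, which is the race, and it re-requested every key at every cache level. The change copies hits into a dedicated backfill map and threads a missingKeys slice through the levels so each lower level is only asked for keys the levels above it did not already serve.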
17 changes: 15 additions & 2 deletions pkg/storage/tsdb/multilevel_chunk_cache.go
@@ -98,6 +98,7 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
     timer := prometheus.NewTimer(m.fetchLatency.WithLabelValues())
     defer timer.ObserveDuration()

+    missingKeys := keys
     hits := map[string][]byte{}
     backfillItems := make([]map[string][]byte, len(m.caches)-1)

@@ -108,13 +109,25 @@ func (m *multiLevelChunkCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
         if ctx.Err() != nil {
             return nil
         }
-        if data := c.Fetch(ctx, keys); len(data) > 0 {
+        if data := c.Fetch(ctx, missingKeys); len(data) > 0 {
             for k, d := range data {
                 hits[k] = d
             }

             if i > 0 && len(hits) > 0 {
-                backfillItems[i-1] = hits
+                // let's fetch only the missing keys
+                m := missingKeys[:0]
+                for _, key := range missingKeys {
+                    if _, ok := hits[key]; !ok {
+                        m = append(m, key)
+                    }
+                }
+
+                missingKeys = m
+
+                for k, b := range hits {
+                    backfillItems[i-1][k] = b
+                }
             }

             if len(hits) == len(keys) {
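Before this change, backfillItems[i-1] = hits stored the very map that Fetch later returns, so the async backfill goroutine could iterate over it while the caller mutated it, a data race. The rewrite relies on two small patterns: an in-place slice filter (the missingKeys[:0] idiom, which reuses the backing array instead of allocating per level) and a copy of the hits map before it is shared with the backfill goroutine. A minimal, self-contained sketch of both, with hypothetical helper names rather than the Cortex code itself:

package main

import "fmt"

// filterMissing keeps only the keys not yet present in hits, reusing the
// input slice's backing array (the missingKeys[:0] idiom from the diff)
// so no new slice is allocated per cache level.
func filterMissing(missing []string, hits map[string][]byte) []string {
    m := missing[:0]
    for _, key := range missing {
        if _, ok := hits[key]; !ok {
            m = append(m, key)
        }
    }
    return m
}

// copyForBackfill copies hits into a fresh map so the async backfill
// goroutine never shares a map instance with the caller of Fetch.
func copyForBackfill(hits map[string][]byte) map[string][]byte {
    out := make(map[string][]byte, len(hits))
    for k, b := range hits {
        out[k] = b
    }
    return out
}

func main() {
    hits := map[string][]byte{"key2": []byte("value2")}
    fmt.Println(filterMissing([]string{"key1", "key2"}, hits)) // [key1]
    fmt.Println(copyForBackfill(hits))                         // independent copy, safe to hand to a goroutine
}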
60 changes: 54 additions & 6 deletions pkg/storage/tsdb/multilevel_chunk_cache_test.go
@@ -6,8 +6,10 @@ import (
"testing"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/cache"
)

func Test_MultiLevelChunkCacheStore(t *testing.T) {
@@ -72,6 +74,43 @@ func Test_MultiLevelChunkCacheStore(t *testing.T) {
     }
 }

+func Test_MultiLevelChunkCacheFetchRace(t *testing.T) {
+    cfg := MultiLevelChunkCacheConfig{
+        MaxAsyncConcurrency: 10,
+        MaxAsyncBufferSize:  100000,
+        MaxBackfillItems:    10000,
+        BackFillTTL:         time.Hour * 24,
+    }
+    reg := prometheus.NewRegistry()
+
+    m1 := newMockChunkCache("m1", map[string][]byte{
+        "key1": []byte("value1"),
+        "key2": []byte("value2"),
+    })
+
+    inMemory, err := cache.NewInMemoryCacheWithConfig("test", log.NewNopLogger(), reg, cache.InMemoryCacheConfig{MaxSize: 10 * 1024, MaxItemSize: 1024})
+    require.NoError(t, err)
+
+    inMemory.Store(map[string][]byte{
+        "key2": []byte("value2"),
+        "key3": []byte("value3"),
+    }, time.Minute)
+
+    c := newMultiLevelChunkCache("chunk-cache", cfg, reg, inMemory, m1)
+
+    hits := c.Fetch(context.Background(), []string{"key1", "key2", "key3", "key4"})
+
+    require.Equal(t, 3, len(hits))
+
+    // We should be able to change the returned values without any race problem.
+    delete(hits, "key1")
+
+    mlc := c.(*multiLevelChunkCache)
+    // Wait until async operation finishes.
+    mlc.backfillProcessor.Stop()
+
+}
+
 func Test_MultiLevelChunkCacheFetch(t *testing.T) {
     cfg := MultiLevelChunkCacheConfig{
         MaxAsyncConcurrency: 10,
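
Note that Test_MultiLevelChunkCacheFetchRace only fails meaningfully with the race detector enabled: before the fix, mutating the returned hits map while the async backfill was still running produced a data-race report. An invocation along the lines of go test -race -run Test_MultiLevelChunkCacheFetchRace ./pkg/storage/tsdb/ (package path taken from the diff header) exercises it.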
@@ -81,12 +120,14 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) {
     }

     testCases := map[string]struct {
-        m1ExistingData      map[string][]byte
-        m2ExistingData      map[string][]byte
-        expectedM1Data      map[string][]byte
-        expectedM2Data      map[string][]byte
-        expectedFetchedData map[string][]byte
-        fetchKeys           []string
+        m1ExistingData        map[string][]byte
+        m2ExistingData        map[string][]byte
+        expectedM1Data        map[string][]byte
+        expectedM2Data        map[string][]byte
+        expectedFetchedData   map[string][]byte
+        expectedM1FetchedKeys []string
+        expectedM2FetchedKeys []string
+        fetchKeys             []string
     }{
         "fetched data should be union of m1, m2 and 'key2' and 'key3' should be backfilled to m1": {
             m1ExistingData: map[string][]byte{
@@ -96,6 +137,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) {
"key2": []byte("value2"),
"key3": []byte("value3"),
},
expectedM1FetchedKeys: []string{"key1", "key2", "key3"},
expectedM2FetchedKeys: []string{"key2", "key3"},
expectedM1Data: map[string][]byte{
"key1": []byte("value1"),
"key2": []byte("value2"),
@@ -119,6 +162,8 @@ func Test_MultiLevelChunkCacheFetch(t *testing.T) {
             m2ExistingData: map[string][]byte{
                 "key2": []byte("value2"),
             },
+            expectedM1FetchedKeys: []string{"key1", "key2", "key3"},
+            expectedM2FetchedKeys: []string{"key2", "key3"},
             expectedM1Data: map[string][]byte{
                 "key1": []byte("value1"),
                 "key2": []byte("value2"),
@@ -157,6 +202,8 @@ type mockChunkCache struct {
     mu   sync.Mutex
     name string
     data map[string][]byte
+
+    fetchedKeys []string
 }

 func newMockChunkCache(name string, data map[string][]byte) *mockChunkCache {
@@ -180,6 +227,7 @@ func (m *mockChunkCache) Fetch(_ context.Context, keys []string) map[string][]byte {
     h := map[string][]byte{}

     for _, k := range keys {
+        m.fetchedKeys = append(m.fetchedKeys, k)
         if _, ok := m.data[k]; ok {
             h[k] = m.data[k]
         }
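
The assertion block of Test_MultiLevelChunkCacheFetch is elided from this view. Presumably the new expectedM1FetchedKeys and expectedM2FetchedKeys fields are compared against the keys each mock recorded, along these lines (a hypothetical sketch of the elided code, not the exact diff):

// Inside the test-case loop: verify each level was only asked for the
// keys the levels above it had not already served.
require.Equal(t, tc.expectedM1FetchedKeys, m1.fetchedKeys)
require.Equal(t, tc.expectedM2FetchedKeys, m2.fetchedKeys)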