From 92d4d1833d829917226952a0df59fa7be50d8a17 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 24 Oct 2023 22:32:00 -0700 Subject: [PATCH 1/4] replace inmemory index cache to fastcache based implementation Signed-off-by: Ben Ye --- go.mod | 1 + go.sum | 12 + integration/querier_test.go | 19 +- pkg/storage/tsdb/index_cache.go | 2 +- pkg/storage/tsdb/inmemory_index_cache.go | 236 ++++++++++ pkg/storage/tsdb/inmemory_index_cache_test.go | 141 ++++++ pkg/storage/tsdb/multilevel_cache_test.go | 2 +- .../VictoriaMetrics/fastcache/LICENSE | 22 + .../VictoriaMetrics/fastcache/README.md | 116 +++++ .../VictoriaMetrics/fastcache/bigcache.go | 160 +++++++ .../VictoriaMetrics/fastcache/fastcache.go | 419 +++++++++++++++++ .../VictoriaMetrics/fastcache/file.go | 421 ++++++++++++++++++ .../VictoriaMetrics/fastcache/malloc_heap.go | 12 + .../VictoriaMetrics/fastcache/malloc_mmap.go | 54 +++ vendor/modules.txt | 3 + 15 files changed, 1602 insertions(+), 18 deletions(-) create mode 100644 pkg/storage/tsdb/inmemory_index_cache.go create mode 100644 pkg/storage/tsdb/inmemory_index_cache_test.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/LICENSE create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/README.md create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/bigcache.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/fastcache.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/file.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go create mode 100644 vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go diff --git a/go.mod b/go.mod index 9e82cfee453..65866d20b44 100644 --- a/go.mod +++ b/go.mod @@ -77,6 +77,7 @@ require ( ) require ( + github.com/VictoriaMetrics/fastcache v1.12.1 github.com/cespare/xxhash/v2 v2.2.0 github.com/google/go-cmp v0.5.9 google.golang.org/protobuf v1.31.0 diff --git a/go.sum b/go.sum index eb2cd8f5dbb..c00e2291a4e 100644 --- a/go.sum +++ b/go.sum @@ 
-648,6 +648,8 @@ github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= +github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= +github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 h1:aDITxVUQ/3KBhpVWX57Vo9ntGTxoRw1F0T6/x/tRzNU= @@ -667,7 +669,12 @@ github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOS github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= +<<<<<<< HEAD github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= +======= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= +>>>>>>> 7e5fd9b4b (replace inmemory index cache to fastcache based implementation) github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= 
github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= @@ -1514,8 +1521,13 @@ github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed h1:iWQdY3S6DpWj github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q= +<<<<<<< HEAD github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c h1:hMpXd1ybZB/vnR3+zex93va42rQ++2E0qi2wVSf3AwY= github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c/go.mod h1:q+0MQPBugkBKZBFSOec4WV4EcuKJU6tgMI0i4M2znpY= +======= +github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8 h1:mWlY64XMYTFeCk4WziW33xerKsp+BWOck6g77cz9ZgA= +github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8/go.mod h1:eVFfte7jP1aTcTkQcZEj5/P9rCeMFHllEqfNZqirLLA= +>>>>>>> 7e5fd9b4b (replace inmemory index cache to fastcache based implementation) github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= diff --git a/integration/querier_test.go b/integration/querier_test.go index e7bf94356a1..8d650517133 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -257,10 +257,7 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64((5+5+2)*numberOfCacheBackends)), "thanos_store_index_cache_requests_total")) require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit 
cause the cache was empty - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) - } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(21), "thanos_memcached_operations_total")) // 14 gets + 7 sets } @@ -297,10 +294,6 @@ func TestQuerierWithBlocksStorageRunningInMicroservicesMode(t *testing.T) { } require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(2), "thanos_store_index_cache_hits_total")) // this time has used the index cache - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items")) // as before - require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(9), "thanos_store_index_cache_items_added_total")) // as before - } if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(23-l0CacheHits), "thanos_memcached_operations_total")) // as before + 2 gets - cache hits } @@ -516,10 +509,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((5+5+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) // 5 for expanded postings and postings, 2 for series require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(0), "thanos_store_index_cache_hits_total")) // no cache hit cause the cache was empty - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items")) - require.NoError(t, 
cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) - } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(21*seriesReplicationFactor)), "thanos_memcached_operations_total")) // 14 gets + 7 sets } @@ -532,10 +522,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((12+2)*seriesReplicationFactor)), "thanos_store_index_cache_requests_total")) require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*seriesReplicationFactor)), "thanos_store_index_cache_hits_total")) // this time has used the index cache - if testCfg.indexCacheBackend == tsdb.IndexCacheBackendInMemory { - require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items")) // as before - require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(9*seriesReplicationFactor)), "thanos_store_index_cache_items_added_total")) // as before - } else if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { + if testCfg.indexCacheBackend == tsdb.IndexCacheBackendMemcached { require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64((21+2)*seriesReplicationFactor)), "thanos_memcached_operations_total")) // as before + 2 gets } diff --git a/pkg/storage/tsdb/index_cache.go b/pkg/storage/tsdb/index_cache.go index 195aa9a24cb..6668d817871 100644 --- a/pkg/storage/tsdb/index_cache.go +++ b/pkg/storage/tsdb/index_cache.go @@ -222,7 +222,7 @@ func newInMemoryIndexCache(cfg InMemoryIndexCacheConfig, logger log.Logger, regi maxItemSize = maxCacheSize } - return storecache.NewInMemoryIndexCacheWithConfig(logger, nil, registerer, storecache.InMemoryIndexCacheConfig{ + return NewInMemoryIndexCacheWithConfig(logger, nil, registerer, 
storecache.InMemoryIndexCacheConfig{ MaxSize: maxCacheSize, MaxItemSize: maxItemSize, }) diff --git a/pkg/storage/tsdb/inmemory_index_cache.go b/pkg/storage/tsdb/inmemory_index_cache.go new file mode 100644 index 00000000000..8afb06464cd --- /dev/null +++ b/pkg/storage/tsdb/inmemory_index_cache.go @@ -0,0 +1,236 @@ +package tsdb + +import ( + "context" + "reflect" + "unsafe" + + "github.com/VictoriaMetrics/fastcache" + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/oklog/ulid" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/tenancy" +) + +type InMemoryIndexCache struct { + logger log.Logger + cache *fastcache.Cache + maxItemSizeBytes uint64 + + added *prometheus.CounterVec + overflow *prometheus.CounterVec + + commonMetrics *storecache.CommonMetrics +} + +// NewInMemoryIndexCacheWithConfig creates a new thread-safe cache for index entries. It relies on the cache library +// (fastcache) to ensures the total cache size approximately does not exceed maxBytes. +func NewInMemoryIndexCacheWithConfig(logger log.Logger, commonMetrics *storecache.CommonMetrics, reg prometheus.Registerer, config storecache.InMemoryIndexCacheConfig) (*InMemoryIndexCache, error) { + if config.MaxItemSize > config.MaxSize { + return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", config.MaxItemSize, config.MaxSize) + } + + // fastcache will panic if MaxSize <= 0. 
+ if config.MaxSize <= 0 { + config.MaxSize = storecache.DefaultInMemoryIndexCacheConfig.MaxSize + } + + if commonMetrics == nil { + commonMetrics = storecache.NewCommonMetrics(reg) + } + + c := &InMemoryIndexCache{ + logger: logger, + maxItemSizeBytes: uint64(config.MaxItemSize), + commonMetrics: commonMetrics, + } + + c.added = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_added_total", + Help: "Total number of items that were added to the index cache.", + }, []string{"item_type"}) + c.added.WithLabelValues(cacheTypePostings) + c.added.WithLabelValues(cacheTypeSeries) + c.added.WithLabelValues(cacheTypeExpandedPostings) + + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + + c.overflow = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ + Name: "thanos_store_index_cache_items_overflowed_total", + Help: "Total number of items that could not be added to the cache due to being too big.", + }, []string{"item_type"}) + c.overflow.WithLabelValues(cacheTypePostings) + c.overflow.WithLabelValues(cacheTypeSeries) + c.overflow.WithLabelValues(cacheTypeExpandedPostings) + + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenancy.DefaultTenant) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenancy.DefaultTenant) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenancy.DefaultTenant) + + c.cache = fastcache.New(int(config.MaxSize)) + level.Info(logger).Log( + "msg", "created in-memory index cache", + "maxItemSizeBytes", c.maxItemSizeBytes, + "maxSizeBytes", config.MaxSize, + ) + return c, nil +} + +func (c *InMemoryIndexCache) get(key storecache.CacheKey) ([]byte, bool) { + k := yoloBuf(key.String()) + resp := c.cache.GetBig(nil, k) + if len(resp) == 
0 {
+		return nil, false
+	}
+	return resp, true
+}
+
+func (c *InMemoryIndexCache) set(typ string, key storecache.CacheKey, val []byte) {
+	k := yoloBuf(key.String())
+	r := c.cache.GetBig(nil, k)
+	// A non-nil result means the key is already cached; existing entries are never overwritten.
+	if r != nil {
+		return
+	}
+
+	size := uint64(len(k) + len(val))
+	if size > c.maxItemSizeBytes {
+		level.Info(c.logger).Log(
+			"msg", "item bigger than maxItemSizeBytes. Ignoring..",
+			"maxItemSizeBytes", c.maxItemSizeBytes,
+			"cacheType", typ,
+		)
+		c.overflow.WithLabelValues(typ).Inc()
+		return
+	}
+
+	c.cache.SetBig(k, val)
+	c.added.WithLabelValues(typ).Inc()
+}
+
+func yoloBuf(s string) []byte {
+	return unsafe.Slice(unsafe.StringData(s), len(s)) // zero-copy, read-only view of s with cap == len (needs Go 1.20+); a *string -> *[]byte cast leaves cap undefined.
+}
+
+func copyString(s string) string {
+	var b []byte
+	h := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+	h.Data = (*reflect.StringHeader)(unsafe.Pointer(&s)).Data
+	h.Len = len(s)
+	h.Cap = len(s)
+	return string(b)
+}
+
+// copyToKey builds the postings cache key from copies of the label strings, as the underlying strings might be mmapped.
+func copyToKey(l labels.Label) storecache.CacheKeyPostings {
+	return storecache.CacheKeyPostings(labels.Label{Value: copyString(l.Value), Name: copyString(l.Name)})
+}
+
+// StorePostings sets the postings identified by the ulid and label to the value v,
+// if the postings already exist in the cache they are not mutated.
+func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
+	c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v)))
+	c.set(cacheTypePostings, storecache.CacheKey{Block: blockID.String(), Key: copyToKey(l)}, v)
+}
+
+// FetchMultiPostings fetches multiple postings - each identified by a label -
+// and returns a map containing cache hits, along with a list of missing keys.
+func (c *InMemoryIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) { + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypePostings, tenant)) + defer timer.ObserveDuration() + + hits = map[labels.Label][]byte{} + + blockIDKey := blockID.String() + requests := 0 + hit := 0 + for _, key := range keys { + if ctx.Err() != nil { + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit)) + return hits, misses + } + requests++ + if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeyPostings(key)}); ok { + hit++ + hits[key] = b + continue + } + + misses = append(misses, key) + } + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypePostings, tenant).Add(float64(hit)) + + return hits, misses +} + +// StoreExpandedPostings stores expanded postings for a set of label matchers. +func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) { + c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v))) + c.set(cacheTypeExpandedPostings, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}, v) +} + +// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not. 
+func (c *InMemoryIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) { + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeExpandedPostings, tenant)) + defer timer.ObserveDuration() + + if ctx.Err() != nil { + return nil, false + } + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc() + if b, ok := c.get(storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeyExpandedPostings(storecache.LabelMatchersToString(matchers))}); ok { + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeExpandedPostings, tenant).Inc() + return b, true + } + return nil, false +} + +// StoreSeries sets the series identified by the ulid and id to the value v, +// if the series already exists in the cache it is not mutated. +func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) { + c.commonMetrics.DataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v))) + c.set(cacheTypeSeries, storecache.CacheKey{Block: blockID.String(), Key: storecache.CacheKeySeries(id)}, v) +} + +// FetchMultiSeries fetches multiple series - each identified by ID - from the cache +// and returns a map containing cache hits, along with a list of missing IDs. 
+func (c *InMemoryIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) { + timer := prometheus.NewTimer(c.commonMetrics.FetchLatency.WithLabelValues(cacheTypeSeries, tenant)) + defer timer.ObserveDuration() + + hits = map[storage.SeriesRef][]byte{} + + blockIDKey := blockID.String() + requests := 0 + hit := 0 + for _, id := range ids { + if ctx.Err() != nil { + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit)) + return hits, misses + } + requests++ + if b, ok := c.get(storecache.CacheKey{Block: blockIDKey, Key: storecache.CacheKeySeries(id)}); ok { + hit++ + hits[id] = b + continue + } + + misses = append(misses, id) + } + c.commonMetrics.RequestTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(requests)) + c.commonMetrics.HitsTotal.WithLabelValues(cacheTypeSeries, tenant).Add(float64(hit)) + + return hits, misses +} diff --git a/pkg/storage/tsdb/inmemory_index_cache_test.go b/pkg/storage/tsdb/inmemory_index_cache_test.go new file mode 100644 index 00000000000..f01896d1ea2 --- /dev/null +++ b/pkg/storage/tsdb/inmemory_index_cache_test.go @@ -0,0 +1,141 @@ +package tsdb + +import ( + "bytes" + "context" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/tenancy" +) + +func TestInMemoryIndexCache_UpdateItem(t *testing.T) { + var errorLogs []string + errorLogger := log.LoggerFunc(func(kvs ...interface{}) error { + var 
lvl string + for i := 0; i < len(kvs); i += 2 { + if kvs[i] == "level" { + lvl = fmt.Sprint(kvs[i+1]) + break + } + } + if lvl != "error" { + return nil + } + var buf bytes.Buffer + defer func() { errorLogs = append(errorLogs, buf.String()) }() + return log.NewLogfmtLogger(&buf).Log(kvs...) + }) + + metrics := prometheus.NewRegistry() + cache, err := NewInMemoryIndexCacheWithConfig(log.NewSyncLogger(errorLogger), nil, metrics, storecache.InMemoryIndexCacheConfig{ + MaxItemSize: 1024, + MaxSize: 1024, + }) + testutil.Ok(t, err) + + uid := func(id storage.SeriesRef) ulid.ULID { return ulid.MustNew(uint64(id), nil) } + lbl := labels.Label{Name: "foo", Value: "bar"} + matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") + ctx := context.Background() + + for _, tt := range []struct { + typ string + set func(storage.SeriesRef, []byte) + get func(storage.SeriesRef) ([]byte, bool) + }{ + { + typ: cacheTypePostings, + set: func(id storage.SeriesRef, b []byte) { cache.StorePostings(uid(id), lbl, b, tenancy.DefaultTenant) }, + get: func(id storage.SeriesRef) ([]byte, bool) { + hits, _ := cache.FetchMultiPostings(ctx, uid(id), []labels.Label{lbl}, tenancy.DefaultTenant) + b, ok := hits[lbl] + + return b, ok + }, + }, + { + typ: cacheTypeSeries, + set: func(id storage.SeriesRef, b []byte) { cache.StoreSeries(uid(id), id, b, tenancy.DefaultTenant) }, + get: func(id storage.SeriesRef) ([]byte, bool) { + hits, _ := cache.FetchMultiSeries(ctx, uid(id), []storage.SeriesRef{id}, tenancy.DefaultTenant) + b, ok := hits[id] + + return b, ok + }, + }, + { + typ: cacheTypeExpandedPostings, + set: func(id storage.SeriesRef, b []byte) { + cache.StoreExpandedPostings(uid(id), []*labels.Matcher{matcher}, b, tenancy.DefaultTenant) + }, + get: func(id storage.SeriesRef) ([]byte, bool) { + return cache.FetchExpandedPostings(ctx, uid(id), []*labels.Matcher{matcher}, tenancy.DefaultTenant) + }, + }, + } { + t.Run(tt.typ, func(t *testing.T) { + defer func() { errorLogs = nil }() + + // 
Set value. + tt.set(0, []byte{0}) + buf, ok := tt.get(0) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0}, buf) + testutil.Equals(t, []string(nil), errorLogs) + + // Set the same value again. + tt.set(0, []byte{0}) + buf, ok = tt.get(0) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0}, buf) + testutil.Equals(t, []string(nil), errorLogs) + + // Set a larger value. + tt.set(1, []byte{0, 1}) + buf, ok = tt.get(1) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0, 1}, buf) + testutil.Equals(t, []string(nil), errorLogs) + + // Mutations to existing values will be ignored. + tt.set(1, []byte{1, 2}) + buf, ok = tt.get(1) + testutil.Equals(t, true, ok) + testutil.Equals(t, []byte{0, 1}, buf) + testutil.Equals(t, []string(nil), errorLogs) + }) + } +} + +func TestInMemoryIndexCacheSetOverflow(t *testing.T) { + config := storecache.InMemoryIndexCacheConfig{ + MaxSize: storecache.DefaultInMemoryIndexCacheConfig.MaxSize, + MaxItemSize: 100, + } + cache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, nil, config) + testutil.Ok(t, err) + counter := cache.overflow.WithLabelValues(cacheTypeSeries) + id := ulid.MustNew(ulid.Now(), nil) + // Insert a small value won't trigger item overflow. + cache.StoreSeries(id, 1, []byte("0"), tenancy.DefaultTenant) + testutil.Equals(t, float64(0), prom_testutil.ToFloat64(counter)) + + var sb strings.Builder + for i := 0; i < 100; i++ { + sb.WriteString(strconv.Itoa(i)) + } + // Trigger overflow with a large value. 
+ cache.StoreSeries(id, 2, []byte(sb.String()), tenancy.DefaultTenant) + testutil.Equals(t, float64(1), prom_testutil.ToFloat64(counter)) +} diff --git a/pkg/storage/tsdb/multilevel_cache_test.go b/pkg/storage/tsdb/multilevel_cache_test.go index 4434fb493c8..c37e05391cd 100644 --- a/pkg/storage/tsdb/multilevel_cache_test.go +++ b/pkg/storage/tsdb/multilevel_cache_test.go @@ -32,7 +32,7 @@ func Test_MultiIndexCacheInstantiation(t *testing.T) { cfg: IndexCacheConfig{ Backend: "inmemory", }, - expectedType: &storecache.InMemoryIndexCache{}, + expectedType: &InMemoryIndexCache{}, }, "instantiate multiples backends - inmemory/redis": { cfg: IndexCacheConfig{ diff --git a/vendor/github.com/VictoriaMetrics/fastcache/LICENSE b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE new file mode 100644 index 00000000000..9a8145e5834 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2018 VictoriaMetrics + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/vendor/github.com/VictoriaMetrics/fastcache/README.md b/vendor/github.com/VictoriaMetrics/fastcache/README.md new file mode 100644 index 00000000000..b353214af69 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/README.md @@ -0,0 +1,116 @@ +[![Build Status](https://github.com/VictoriaMetrics/fastcache/workflows/main/badge.svg)](https://github.com/VictoriaMetrics/fastcache/actions) +[![GoDoc](https://godoc.org/github.com/VictoriaMetrics/fastcache?status.svg)](http://godoc.org/github.com/VictoriaMetrics/fastcache) +[![Go Report](https://goreportcard.com/badge/github.com/VictoriaMetrics/fastcache)](https://goreportcard.com/report/github.com/VictoriaMetrics/fastcache) +[![codecov](https://codecov.io/gh/VictoriaMetrics/fastcache/branch/master/graph/badge.svg)](https://codecov.io/gh/VictoriaMetrics/fastcache) + +# fastcache - fast thread-safe inmemory cache for big number of entries in Go + +### Features + +* Fast. Performance scales on multi-core CPUs. See benchmark results below. +* Thread-safe. Concurrent goroutines may read and write into a single + cache instance. +* The fastcache is designed for storing big number of entries without + [GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). +* Fastcache automatically evicts old entries when reaching the maximum cache size + set on its creation. +* [Simple API](http://godoc.org/github.com/VictoriaMetrics/fastcache). +* Simple source code. +* Cache may be [saved to file](https://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SaveToFile) + and [loaded from file](https://godoc.org/github.com/VictoriaMetrics/fastcache#LoadFromFile). 
+* Works on [Google AppEngine](https://cloud.google.com/appengine/docs/go/). + + +### Benchmarks + +`Fastcache` performance is compared with [BigCache](https://github.com/allegro/bigcache), standard Go map +and [sync.Map](https://golang.org/pkg/sync/#Map). + +``` +GOMAXPROCS=4 go test github.com/VictoriaMetrics/fastcache -bench='Set|Get' -benchtime=10s +goos: linux +goarch: amd64 +pkg: github.com/VictoriaMetrics/fastcache +BenchmarkBigCacheSet-4 2000 10566656 ns/op 6.20 MB/s 4660369 B/op 6 allocs/op +BenchmarkBigCacheGet-4 2000 6902694 ns/op 9.49 MB/s 684169 B/op 131076 allocs/op +BenchmarkBigCacheSetGet-4 1000 17579118 ns/op 7.46 MB/s 5046744 B/op 131083 allocs/op +BenchmarkCacheSet-4 5000 3808874 ns/op 17.21 MB/s 1142 B/op 2 allocs/op +BenchmarkCacheGet-4 5000 3293849 ns/op 19.90 MB/s 1140 B/op 2 allocs/op +BenchmarkCacheSetGet-4 2000 8456061 ns/op 15.50 MB/s 2857 B/op 5 allocs/op +BenchmarkStdMapSet-4 2000 10559382 ns/op 6.21 MB/s 268413 B/op 65537 allocs/op +BenchmarkStdMapGet-4 5000 2687404 ns/op 24.39 MB/s 2558 B/op 13 allocs/op +BenchmarkStdMapSetGet-4 100 154641257 ns/op 0.85 MB/s 387405 B/op 65558 allocs/op +BenchmarkSyncMapSet-4 500 24703219 ns/op 2.65 MB/s 3426543 B/op 262411 allocs/op +BenchmarkSyncMapGet-4 5000 2265892 ns/op 28.92 MB/s 2545 B/op 79 allocs/op +BenchmarkSyncMapSetGet-4 1000 14595535 ns/op 8.98 MB/s 3417190 B/op 262277 allocs/op +``` + +`MB/s` column here actually means `millions of operations per second`. +As you can see, `fastcache` is faster than the `BigCache` in all the cases. +`fastcache` is faster than the standard Go map and `sync.Map` on workloads +with inserts. + + +### Limitations + +* Keys and values must be byte slices. Other types must be marshaled before + storing them in the cache. +* Big entries with sizes exceeding 64KB must be stored via [distinct API](http://godoc.org/github.com/VictoriaMetrics/fastcache#Cache.SetBig). +* There is no cache expiration. Entries are evicted from the cache only + on cache size overflow. 
Entry deadline may be stored inside the value in order + to implement cache expiration. + + +### Architecture details + +The cache uses ideas from [BigCache](https://github.com/allegro/bigcache): + +* The cache consists of many buckets, each with its own lock. + This helps scaling the performance on multi-core CPUs, since multiple + CPUs may concurrently access distinct buckets. +* Each bucket consists of a `hash(key) -> (key, value) position` map + and 64KB-sized byte slices (chunks) holding encoded `(key, value)` entries. + Each bucket contains only `O(chunksCount)` pointers. For instance, 64GB cache + would contain ~1M pointers, while similarly-sized `map[string][]byte` + would contain ~1B pointers for short keys and values. This would lead to + [huge GC overhead](https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487). + +64KB-sized chunks reduce memory fragmentation and the total memory usage comparing +to a single big chunk per bucket. +Chunks are allocated off-heap if possible. This reduces total memory usage because +GC collects unused memory more frequently without the need in `GOGC` tweaking. + + +### Users + +* `Fastcache` has been extracted from [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics) sources. + See [this article](https://medium.com/devopslinks/victoriametrics-creating-the-best-remote-storage-for-prometheus-5d92d66787ac) + for more info about `VictoriaMetrics`. + + +### FAQ + +#### What is the difference between `fastcache` and other similar caches like [BigCache](https://github.com/allegro/bigcache) or [FreeCache](https://github.com/coocood/freecache)? + +* `Fastcache` is faster. See benchmark results above. +* `Fastcache` uses less memory due to lower heap fragmentation. This allows + saving many GBs of memory on multi-GB caches. +* `Fastcache` API [is simpler](http://godoc.org/github.com/VictoriaMetrics/fastcache). + The API is designed to be used in zero-allocation mode. 
+ + +#### Why `fastcache` doesn't support cache expiration? + +Because we don't need cache expiration in [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics). +Cached entries inside `VictoriaMetrics` never expire. They are automatically evicted on cache size overflow. + +It is easy to implement cache expiration on top of `fastcache` by caching values +with marshaled deadlines and verifying deadlines after reading these values +from the cache. + + +#### Why `fastcache` doesn't support advanced features such as [thundering herd protection](https://en.wikipedia.org/wiki/Thundering_herd_problem) or callbacks on entries' eviction? + +Because these features would complicate the code and would make it slower. +`Fastcache` source code is simple - just copy-paste it and implement the feature you want +on top of it. diff --git a/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go new file mode 100644 index 00000000000..ea234b40d14 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/bigcache.go @@ -0,0 +1,160 @@ +package fastcache + +import ( + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +// maxSubvalueLen is the maximum size of subvalue chunk. +// +// - 16 bytes are for subkey encoding +// - 4 bytes are for len(key)+len(value) encoding inside fastcache +// - 1 byte is implementation detail of fastcache +const maxSubvalueLen = chunkSize - 16 - 4 - 1 + +// maxKeyLen is the maximum size of key. +// +// - 16 bytes are for (hash + valueLen) +// - 4 bytes are for len(key)+len(subkey) +// - 1 byte is implementation detail of fastcache +const maxKeyLen = chunkSize - 16 - 4 - 1 + +// SetBig sets (k, v) to c where len(v) may exceed 64KB. +// +// GetBig must be used for reading stored values. +// +// The stored entry may be evicted at any time either due to cache +// overflow or due to unlikely hash collision. 
+// Pass higher maxBytes value to New if the added items disappear +// frequently. +// +// It is safe to store entries smaller than 64KB with SetBig. +// +// k and v contents may be modified after returning from SetBig. +func (c *Cache) SetBig(k, v []byte) { + atomic.AddUint64(&c.bigStats.SetBigCalls, 1) + if len(k) > maxKeyLen { + atomic.AddUint64(&c.bigStats.TooBigKeyErrors, 1) + return + } + valueLen := len(v) + valueHash := xxhash.Sum64(v) + + // Split v into chunks with up to 64Kb each. + subkey := getSubkeyBuf() + var i uint64 + for len(v) > 0 { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + subvalueLen := maxSubvalueLen + if len(v) < subvalueLen { + subvalueLen = len(v) + } + subvalue := v[:subvalueLen] + v = v[subvalueLen:] + c.Set(subkey.B, subvalue) + } + + // Write metavalue, which consists of valueHash and valueLen. + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(valueLen)) + c.Set(k, subkey.B) + putSubkeyBuf(subkey) +} + +// GetBig searches for the value for the given k, appends it to dst +// and returns the result. +// +// GetBig returns only values stored via SetBig. It doesn't work +// with values stored via other methods. +// +// k contents may be modified after returning from GetBig. +func (c *Cache) GetBig(dst, k []byte) (r []byte) { + atomic.AddUint64(&c.bigStats.GetBigCalls, 1) + subkey := getSubkeyBuf() + dstWasNil := dst == nil + defer func() { + putSubkeyBuf(subkey) + if len(r) == 0 && dstWasNil { + // Guarantee that if the caller provided nil and this is a cache miss that + // the caller can accurately test for a cache miss with `if r == nil`. + r = nil + } + }() + + // Read and parse metavalue + subkey.B = c.Get(subkey.B[:0], k) + if len(subkey.B) == 0 { + // Nothing found. 
+ return dst + } + if len(subkey.B) != 16 { + atomic.AddUint64(&c.bigStats.InvalidMetavalueErrors, 1) + return dst + } + valueHash := unmarshalUint64(subkey.B) + valueLen := unmarshalUint64(subkey.B[8:]) + + // Collect result from chunks. + dstLen := len(dst) + if n := dstLen + int(valueLen) - cap(dst); n > 0 { + dst = append(dst[:cap(dst)], make([]byte, n)...) + } + dst = dst[:dstLen] + var i uint64 + for uint64(len(dst)-dstLen) < valueLen { + subkey.B = marshalUint64(subkey.B[:0], valueHash) + subkey.B = marshalUint64(subkey.B, uint64(i)) + i++ + dstNew := c.Get(dst, subkey.B) + if len(dstNew) == len(dst) { + // Cannot find subvalue + return dst[:dstLen] + } + dst = dstNew + } + + // Verify the obtained value. + v := dst[dstLen:] + if uint64(len(v)) != valueLen { + atomic.AddUint64(&c.bigStats.InvalidValueLenErrors, 1) + return dst[:dstLen] + } + h := xxhash.Sum64(v) + if h != valueHash { + atomic.AddUint64(&c.bigStats.InvalidValueHashErrors, 1) + return dst[:dstLen] + } + return dst +} + +func getSubkeyBuf() *bytesBuf { + v := subkeyPool.Get() + if v == nil { + return &bytesBuf{} + } + return v.(*bytesBuf) +} + +func putSubkeyBuf(bb *bytesBuf) { + bb.B = bb.B[:0] + subkeyPool.Put(bb) +} + +var subkeyPool sync.Pool + +type bytesBuf struct { + B []byte +} + +func marshalUint64(dst []byte, u uint64) []byte { + return append(dst, byte(u>>56), byte(u>>48), byte(u>>40), byte(u>>32), byte(u>>24), byte(u>>16), byte(u>>8), byte(u)) +} + +func unmarshalUint64(src []byte) uint64 { + _ = src[7] + return uint64(src[0])<<56 | uint64(src[1])<<48 | uint64(src[2])<<40 | uint64(src[3])<<32 | uint64(src[4])<<24 | uint64(src[5])<<16 | uint64(src[6])<<8 | uint64(src[7]) +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go new file mode 100644 index 00000000000..092ba37193b --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/fastcache.go @@ -0,0 +1,419 @@ +// Package fastcache implements fast 
in-memory cache. +// +// The package has been extracted from https://victoriametrics.com/ +package fastcache + +import ( + "fmt" + "sync" + "sync/atomic" + + xxhash "github.com/cespare/xxhash/v2" +) + +const bucketsCount = 512 + +const chunkSize = 64 * 1024 + +const bucketSizeBits = 40 + +const genSizeBits = 64 - bucketSizeBits + +const maxGen = 1<= maxBucketSize { + panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize)) + } + maxChunks := (maxBytes + chunkSize - 1) / chunkSize + b.chunks = make([][]byte, maxChunks) + b.m = make(map[uint64]uint64) + b.Reset() +} + +func (b *bucket) Reset() { + b.mu.Lock() + chunks := b.chunks + for i := range chunks { + putChunk(chunks[i]) + chunks[i] = nil + } + b.m = make(map[uint64]uint64) + b.idx = 0 + b.gen = 1 + atomic.StoreUint64(&b.getCalls, 0) + atomic.StoreUint64(&b.setCalls, 0) + atomic.StoreUint64(&b.misses, 0) + atomic.StoreUint64(&b.collisions, 0) + atomic.StoreUint64(&b.corruptions, 0) + b.mu.Unlock() +} + +func (b *bucket) cleanLocked() { + bGen := b.gen & ((1 << genSizeBits) - 1) + bIdx := b.idx + bm := b.m + for k, v := range bm { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if (gen+1 == bGen || gen == maxGen && bGen == 1) && idx >= bIdx || gen == bGen && idx < bIdx { + continue + } + delete(bm, k) + } +} + +func (b *bucket) UpdateStats(s *Stats) { + s.GetCalls += atomic.LoadUint64(&b.getCalls) + s.SetCalls += atomic.LoadUint64(&b.setCalls) + s.Misses += atomic.LoadUint64(&b.misses) + s.Collisions += atomic.LoadUint64(&b.collisions) + s.Corruptions += atomic.LoadUint64(&b.corruptions) + + b.mu.RLock() + s.EntriesCount += uint64(len(b.m)) + bytesSize := uint64(0) + for _, chunk := range b.chunks { + bytesSize += uint64(cap(chunk)) + } + s.BytesSize += bytesSize + s.MaxBytesSize += uint64(len(b.chunks)) * chunkSize + b.mu.RUnlock() +} + +func (b *bucket) Set(k, v []byte, h uint64) { + atomic.AddUint64(&b.setCalls, 1) + if len(k) >= (1<<16) || len(v) >= 
(1<<16) { + // Too big key or value - its length cannot be encoded + // with 2 bytes (see below). Skip the entry. + return + } + var kvLenBuf [4]byte + kvLenBuf[0] = byte(uint16(len(k)) >> 8) + kvLenBuf[1] = byte(len(k)) + kvLenBuf[2] = byte(uint16(len(v)) >> 8) + kvLenBuf[3] = byte(len(v)) + kvLen := uint64(len(kvLenBuf) + len(k) + len(v)) + if kvLen >= chunkSize { + // Do not store too big keys and values, since they do not + // fit a chunk. + return + } + + chunks := b.chunks + needClean := false + b.mu.Lock() + idx := b.idx + idxNew := idx + kvLen + chunkIdx := idx / chunkSize + chunkIdxNew := idxNew / chunkSize + if chunkIdxNew > chunkIdx { + if chunkIdxNew >= uint64(len(chunks)) { + idx = 0 + idxNew = kvLen + chunkIdx = 0 + b.gen++ + if b.gen&((1< 0 { + gen := v >> bucketSizeBits + idx := v & ((1 << bucketSizeBits) - 1) + if gen == bGen && idx < b.idx || gen+1 == bGen && idx >= b.idx || gen == maxGen && bGen == 1 && idx >= b.idx { + chunkIdx := idx / chunkSize + if chunkIdx >= uint64(len(chunks)) { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + chunk := chunks[chunkIdx] + idx %= chunkSize + if idx+4 >= chunkSize { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + kvLenBuf := chunk[idx : idx+4] + keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1]) + valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3]) + idx += 4 + if idx+keyLen+valLen >= chunkSize { + // Corrupted data during the load from file. Just skip it. + atomic.AddUint64(&b.corruptions, 1) + goto end + } + if string(k) == string(chunk[idx:idx+keyLen]) { + idx += keyLen + if returnDst { + dst = append(dst, chunk[idx:idx+valLen]...) 
+ } + found = true + } else { + atomic.AddUint64(&b.collisions, 1) + } + } + } +end: + b.mu.RUnlock() + if !found { + atomic.AddUint64(&b.misses, 1) + } + return dst, found +} + +func (b *bucket) Del(h uint64) { + b.mu.Lock() + delete(b.m, h) + b.mu.Unlock() +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/file.go b/vendor/github.com/VictoriaMetrics/fastcache/file.go new file mode 100644 index 00000000000..dfbc0701d93 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/file.go @@ -0,0 +1,421 @@ +package fastcache + +import ( + "encoding/binary" + "fmt" + "io" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "runtime" + + "github.com/golang/snappy" +) + +// SaveToFile atomically saves cache data to the given filePath using a single +// CPU core. +// +// SaveToFile may be called concurrently with other operations on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFileConcurrent for faster saving to file. +func (c *Cache) SaveToFile(filePath string) error { + return c.SaveToFileConcurrent(filePath, 1) +} + +// SaveToFileConcurrent saves cache data to the given filePath using concurrency +// CPU cores. +// +// SaveToFileConcurrent may be called concurrently with other operations +// on the cache. +// +// The saved data may be loaded with LoadFromFile*. +// +// See also SaveToFile. +func (c *Cache) SaveToFileConcurrent(filePath string, concurrency int) error { + // Create dir if it doesn't exist. + dir := filepath.Dir(filePath) + if _, err := os.Stat(dir); err != nil { + if !os.IsNotExist(err) { + return fmt.Errorf("cannot stat %q: %s", dir, err) + } + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("cannot create dir %q: %s", dir, err) + } + } + + // Save cache data into a temporary directory. 
+ tmpDir, err := ioutil.TempDir(dir, "fastcache.tmp.") + if err != nil { + return fmt.Errorf("cannot create temporary dir inside %q: %s", dir, err) + } + defer func() { + if tmpDir != "" { + _ = os.RemoveAll(tmpDir) + } + }() + gomaxprocs := runtime.GOMAXPROCS(-1) + if concurrency <= 0 || concurrency > gomaxprocs { + concurrency = gomaxprocs + } + if err := c.save(tmpDir, concurrency); err != nil { + return fmt.Errorf("cannot save cache data to temporary dir %q: %s", tmpDir, err) + } + + // Remove old filePath contents, since os.Rename may return + // error if filePath dir exists. + if err := os.RemoveAll(filePath); err != nil { + return fmt.Errorf("cannot remove old contents at %q: %s", filePath, err) + } + if err := os.Rename(tmpDir, filePath); err != nil { + return fmt.Errorf("cannot move temporary dir %q to %q: %s", tmpDir, filePath, err) + } + tmpDir = "" + return nil +} + +// LoadFromFile loads cache data from the given filePath. +// +// See SaveToFile* for saving cache data to file. +func LoadFromFile(filePath string) (*Cache, error) { + return load(filePath, 0) +} + +// LoadFromFileOrNew tries loading cache data from the given filePath. +// +// The function falls back to creating new cache with the given maxBytes +// capacity if error occurs during loading the cache from file. +func LoadFromFileOrNew(filePath string, maxBytes int) *Cache { + c, err := load(filePath, maxBytes) + if err == nil { + return c + } + return New(maxBytes) +} + +func (c *Cache) save(dir string, workersCount int) error { + if err := saveMetadata(c, dir); err != nil { + return err + } + + // Save buckets by workersCount concurrent workers. + workCh := make(chan int, workersCount) + results := make(chan error) + for i := 0; i < workersCount; i++ { + go func(workerNum int) { + results <- saveBuckets(c.buckets[:], workCh, dir, workerNum) + }(i) + } + // Feed workers with work + for i := range c.buckets[:] { + workCh <- i + } + close(workCh) + + // Read results. 
+ var err error + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err == nil { + err = result + } + } + return err +} + +func load(filePath string, maxBytes int) (*Cache, error) { + maxBucketChunks, err := loadMetadata(filePath) + if err != nil { + return nil, err + } + if maxBytes > 0 { + maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount) + expectedBucketChunks := (maxBucketBytes + chunkSize - 1) / chunkSize + if maxBucketChunks != expectedBucketChunks { + return nil, fmt.Errorf("cache file %s contains maxBytes=%d; want %d", filePath, maxBytes, expectedBucketChunks*chunkSize*bucketsCount) + } + } + + // Read bucket files from filePath dir. + d, err := os.Open(filePath) + if err != nil { + return nil, fmt.Errorf("cannot open %q: %s", filePath, err) + } + defer func() { + _ = d.Close() + }() + fis, err := d.Readdir(-1) + if err != nil { + return nil, fmt.Errorf("cannot read files from %q: %s", filePath, err) + } + results := make(chan error) + workersCount := 0 + var c Cache + for _, fi := range fis { + fn := fi.Name() + if fi.IsDir() || !dataFileRegexp.MatchString(fn) { + continue + } + workersCount++ + go func(dataPath string) { + results <- loadBuckets(c.buckets[:], dataPath, maxBucketChunks) + }(filePath + "/" + fn) + } + err = nil + for i := 0; i < workersCount; i++ { + result := <-results + if result != nil && err == nil { + err = result + } + } + if err != nil { + return nil, err + } + // Initialize buckets, which could be missing due to incomplete or corrupted files in the cache. + // It is better initializing such buckets instead of returning error, since the rest of buckets + // contain valid data. 
+ for i := range c.buckets[:] { + b := &c.buckets[i] + if len(b.chunks) == 0 { + b.chunks = make([][]byte, maxBucketChunks) + b.m = make(map[uint64]uint64) + } + } + return &c, nil +} + +func saveMetadata(c *Cache, dir string) error { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Create(metadataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks := uint64(cap(c.buckets[0].chunks)) + if err := writeUint64(metadataFile, maxBucketChunks); err != nil { + return fmt.Errorf("cannot write maxBucketChunks=%d to %q: %s", maxBucketChunks, metadataPath, err) + } + return nil +} + +func loadMetadata(dir string) (uint64, error) { + metadataPath := dir + "/metadata.bin" + metadataFile, err := os.Open(metadataPath) + if err != nil { + return 0, fmt.Errorf("cannot open %q: %s", metadataPath, err) + } + defer func() { + _ = metadataFile.Close() + }() + maxBucketChunks, err := readUint64(metadataFile) + if err != nil { + return 0, fmt.Errorf("cannot read maxBucketChunks from %q: %s", metadataPath, err) + } + if maxBucketChunks == 0 { + return 0, fmt.Errorf("invalid maxBucketChunks=0 read from %q", metadataPath) + } + return maxBucketChunks, nil +} + +var dataFileRegexp = regexp.MustCompile(`^data\.\d+\.bin$`) + +func saveBuckets(buckets []bucket, workCh <-chan int, dir string, workerNum int) error { + dataPath := fmt.Sprintf("%s/data.%d.bin", dir, workerNum) + dataFile, err := os.Create(dataPath) + if err != nil { + return fmt.Errorf("cannot create %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zw := snappy.NewBufferedWriter(dataFile) + for bucketNum := range workCh { + if err := writeUint64(zw, uint64(bucketNum)); err != nil { + return fmt.Errorf("cannot write bucketNum=%d to %q: %s", bucketNum, dataPath, err) + } + if err := buckets[bucketNum].Save(zw); err != nil { + return fmt.Errorf("cannot save bucket[%d] to %q: %s", 
bucketNum, dataPath, err) + } + } + if err := zw.Close(); err != nil { + return fmt.Errorf("cannot close snappy.Writer for %q: %s", dataPath, err) + } + return nil +} + +func loadBuckets(buckets []bucket, dataPath string, maxChunks uint64) error { + dataFile, err := os.Open(dataPath) + if err != nil { + return fmt.Errorf("cannot open %q: %s", dataPath, err) + } + defer func() { + _ = dataFile.Close() + }() + zr := snappy.NewReader(dataFile) + for { + bucketNum, err := readUint64(zr) + if err == io.EOF { + // Reached the end of file. + return nil + } + if bucketNum >= uint64(len(buckets)) { + return fmt.Errorf("unexpected bucketNum read from %q: %d; must be smaller than %d", dataPath, bucketNum, len(buckets)) + } + if err := buckets[bucketNum].Load(zr, maxChunks); err != nil { + return fmt.Errorf("cannot load bucket[%d] from %q: %s", bucketNum, dataPath, err) + } + } +} + +func (b *bucket) Save(w io.Writer) error { + b.mu.Lock() + b.cleanLocked() + b.mu.Unlock() + + b.mu.RLock() + defer b.mu.RUnlock() + + // Store b.idx, b.gen and b.m to w. + + bIdx := b.idx + bGen := b.gen + chunksLen := 0 + for _, chunk := range b.chunks { + if chunk == nil { + break + } + chunksLen++ + } + kvs := make([]byte, 0, 2*8*len(b.m)) + var u64Buf [8]byte + for k, v := range b.m { + binary.LittleEndian.PutUint64(u64Buf[:], k) + kvs = append(kvs, u64Buf[:]...) + binary.LittleEndian.PutUint64(u64Buf[:], v) + kvs = append(kvs, u64Buf[:]...) + } + + if err := writeUint64(w, bIdx); err != nil { + return fmt.Errorf("cannot write b.idx: %s", err) + } + if err := writeUint64(w, bGen); err != nil { + return fmt.Errorf("cannot write b.gen: %s", err) + } + if err := writeUint64(w, uint64(len(kvs))/2/8); err != nil { + return fmt.Errorf("cannot write len(b.m): %s", err) + } + if _, err := w.Write(kvs); err != nil { + return fmt.Errorf("cannot write b.m: %s", err) + } + + // Store b.chunks to w. 
+ if err := writeUint64(w, uint64(chunksLen)); err != nil { + return fmt.Errorf("cannot write len(b.chunks): %s", err) + } + for chunkIdx := 0; chunkIdx < chunksLen; chunkIdx++ { + chunk := b.chunks[chunkIdx][:chunkSize] + if _, err := w.Write(chunk); err != nil { + return fmt.Errorf("cannot write b.chunks[%d]: %s", chunkIdx, err) + } + } + + return nil +} + +func (b *bucket) Load(r io.Reader, maxChunks uint64) error { + if maxChunks == 0 { + return fmt.Errorf("the number of chunks per bucket cannot be zero") + } + bIdx, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.idx: %s", err) + } + bGen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read b.gen: %s", err) + } + kvsLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.m): %s", err) + } + kvsLen *= 2 * 8 + kvs := make([]byte, kvsLen) + if _, err := io.ReadFull(r, kvs); err != nil { + return fmt.Errorf("cannot read b.m: %s", err) + } + m := make(map[uint64]uint64, kvsLen/2/8) + for len(kvs) > 0 { + k := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + v := binary.LittleEndian.Uint64(kvs) + kvs = kvs[8:] + m[k] = v + } + + maxBytes := maxChunks * chunkSize + if maxBytes >= maxBucketSize { + return fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize) + } + chunks := make([][]byte, maxChunks) + chunksLen, err := readUint64(r) + if err != nil { + return fmt.Errorf("cannot read len(b.chunks): %s", err) + } + if chunksLen > uint64(maxChunks) { + return fmt.Errorf("chunksLen=%d cannot exceed maxChunks=%d", chunksLen, maxChunks) + } + currChunkIdx := bIdx / chunkSize + if currChunkIdx > 0 && currChunkIdx >= chunksLen { + return fmt.Errorf("too big bIdx=%d; should be smaller than %d", bIdx, chunksLen*chunkSize) + } + for chunkIdx := uint64(0); chunkIdx < chunksLen; chunkIdx++ { + chunk := getChunk() + chunks[chunkIdx] = chunk + if _, err := io.ReadFull(r, chunk); err != nil { + // Free up allocated chunks before 
returning the error. + for _, chunk := range chunks { + if chunk != nil { + putChunk(chunk) + } + } + return fmt.Errorf("cannot read b.chunks[%d]: %s", chunkIdx, err) + } + } + // Adjust len for the chunk pointed by currChunkIdx. + if chunksLen > 0 { + chunkLen := bIdx % chunkSize + chunks[currChunkIdx] = chunks[currChunkIdx][:chunkLen] + } + + b.mu.Lock() + for _, chunk := range b.chunks { + putChunk(chunk) + } + b.chunks = chunks + b.m = m + b.idx = bIdx + b.gen = bGen + b.mu.Unlock() + + return nil +} + +func writeUint64(w io.Writer, u uint64) error { + var u64Buf [8]byte + binary.LittleEndian.PutUint64(u64Buf[:], u) + _, err := w.Write(u64Buf[:]) + return err +} + +func readUint64(r io.Reader) (uint64, error) { + var u64Buf [8]byte + if _, err := io.ReadFull(r, u64Buf[:]); err != nil { + return 0, err + } + u := binary.LittleEndian.Uint64(u64Buf[:]) + return u, nil +} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go new file mode 100644 index 00000000000..810d460b79e --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_heap.go @@ -0,0 +1,12 @@ +//go:build appengine || windows +// +build appengine windows + +package fastcache + +func getChunk() []byte { + return make([]byte, chunkSize) +} + +func putChunk(chunk []byte) { + // No-op. 
+} diff --git a/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go new file mode 100644 index 00000000000..e24d578bf75 --- /dev/null +++ b/vendor/github.com/VictoriaMetrics/fastcache/malloc_mmap.go @@ -0,0 +1,54 @@ +//go:build !appengine && !windows +// +build !appengine,!windows + +package fastcache + +import ( + "fmt" + "sync" + "unsafe" + + "golang.org/x/sys/unix" +) + +const chunksPerAlloc = 1024 + +var ( + freeChunks []*[chunkSize]byte + freeChunksLock sync.Mutex +) + +func getChunk() []byte { + freeChunksLock.Lock() + if len(freeChunks) == 0 { + // Allocate offheap memory, so GOGC won't take into account cache size. + // This should reduce free memory waste. + data, err := unix.Mmap(-1, 0, chunkSize*chunksPerAlloc, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_ANON|unix.MAP_PRIVATE) + if err != nil { + panic(fmt.Errorf("cannot allocate %d bytes via mmap: %s", chunkSize*chunksPerAlloc, err)) + } + for len(data) > 0 { + p := (*[chunkSize]byte)(unsafe.Pointer(&data[0])) + freeChunks = append(freeChunks, p) + data = data[chunkSize:] + } + } + n := len(freeChunks) - 1 + p := freeChunks[n] + freeChunks[n] = nil + freeChunks = freeChunks[:n] + freeChunksLock.Unlock() + return p[:] +} + +func putChunk(chunk []byte) { + if chunk == nil { + return + } + chunk = chunk[:chunkSize] + p := (*[chunkSize]byte)(unsafe.Pointer(&chunk[0])) + + freeChunksLock.Lock() + freeChunks = append(freeChunks, p) + freeChunksLock.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 5da1efba9a0..a810d39f4e4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -93,6 +93,9 @@ github.com/AzureAD/microsoft-authentication-library-for-go/apps/public # github.com/Masterminds/squirrel v1.5.4 ## explicit; go 1.14 github.com/Masterminds/squirrel +# github.com/VictoriaMetrics/fastcache v1.12.1 +## explicit; go 1.13 +github.com/VictoriaMetrics/fastcache # github.com/alecthomas/template 
v0.0.0-20190718012654-fb15b899a751 ## explicit github.com/alecthomas/template From f60f99a8d68fcc61a2c45ea1482bff7a7020429d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 24 Oct 2023 22:37:57 -0700 Subject: [PATCH 2/4] changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a4a4105f3f3..2ea7b2db011 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [CHANGE] Query Frontend/Querier: Make build info API disabled by default and add feature flag `api.build-info-enabled` to enable it. #5533 * [CHANGE] Purger: Do no use S3 tenant kms key when uploading deletion marker. #5575 * [CHANGE] Ingester: Shipper always upload compacted blocks. #5625 +* [CHANGE] Store Gateway: Replace the in-memory index cache with a new fastcache-based implementation. #5619 * [FEATURE] Store Gateway: Add `max_downloaded_bytes_per_request` to limit max bytes to download per store gateway request. * [FEATURE] Added 2 flags `-alertmanager.alertmanager-client.grpc-max-send-msg-size` and ` -alertmanager.alertmanager-client.grpc-max-recv-msg-size` to configure alert manager grpc client message size limits. #5338 * [FEATURE] Query Frontend: Add `cortex_rejected_queries_total` metric for throttled queries.
#5356 From 24450dc97d3f0dc33b7cf334e76c71b027014039 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Wed, 1 Nov 2023 22:04:12 -0700 Subject: [PATCH 3/4] add benchmarks Signed-off-by: Ben Ye --- pkg/storage/tsdb/inmemory_index_cache_test.go | 282 +++++++++++++++++- 1 file changed, 278 insertions(+), 4 deletions(-) diff --git a/pkg/storage/tsdb/inmemory_index_cache_test.go b/pkg/storage/tsdb/inmemory_index_cache_test.go index f01896d1ea2..aba9099a54f 100644 --- a/pkg/storage/tsdb/inmemory_index_cache_test.go +++ b/pkg/storage/tsdb/inmemory_index_cache_test.go @@ -4,10 +4,6 @@ import ( "bytes" "context" "fmt" - "strconv" - "strings" - "testing" - "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/oklog/ulid" @@ -15,8 +11,14 @@ import ( prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/tenancy" + "math/rand" + "strconv" + "strings" + "testing" + "time" ) func TestInMemoryIndexCache_UpdateItem(t *testing.T) { @@ -139,3 +141,275 @@ func TestInMemoryIndexCacheSetOverflow(t *testing.T) { cache.StoreSeries(id, 2, []byte(sb.String()), tenancy.DefaultTenant) testutil.Equals(t, float64(1), prom_testutil.ToFloat64(counter)) } + +func BenchmarkInMemoryIndexCacheStore(b *testing.B) { + logger := log.NewNopLogger() + cfg := InMemoryIndexCacheConfig{ + MaxSizeBytes: uint64(storecache.DefaultInMemoryIndexCacheConfig.MaxSize), + } + + blockID := ulid.MustNew(ulid.Now(), nil) + r := rand.New(rand.NewSource(time.Now().Unix())) + // 1KB is a common size for series + seriesData := make([]byte, 1024) + r.Read(seriesData) + // 10MB might happen for large postings. 
+ postingData := make([]byte, 10*1024*1024) + r.Read(postingData) + + b.Run("FastCache", func(b *testing.B) { + cache, err := newInMemoryIndexCache(cfg, logger, prometheus.NewRegistry()) + require.NoError(b, err) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), seriesData, tenancy.DefaultTenant) + } + }) + + b.Run("ThanosCache", func(b *testing.B) { + cache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, prometheus.NewRegistry(), storecache.DefaultInMemoryIndexCacheConfig) + require.NoError(b, err) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), seriesData, tenancy.DefaultTenant) + } + }) + + b.Run("FastCacheLargeItem", func(b *testing.B) { + cache, err := newInMemoryIndexCache(cfg, logger, prometheus.NewRegistry()) + require.NoError(b, err) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), postingData, tenancy.DefaultTenant) + } + }) + + b.Run("ThanosCacheLargeItem", func(b *testing.B) { + cache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, prometheus.NewRegistry(), storecache.DefaultInMemoryIndexCacheConfig) + require.NoError(b, err) + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), postingData, tenancy.DefaultTenant) + } + }) +} + +func BenchmarkInMemoryIndexCacheStoreConcurrent(b *testing.B) { + logger := log.NewNopLogger() + cfg := InMemoryIndexCacheConfig{ + MaxSizeBytes: uint64(storecache.DefaultInMemoryIndexCacheConfig.MaxSize), + } + + blockID := ulid.MustNew(ulid.Now(), nil) + r := rand.New(rand.NewSource(time.Now().Unix())) + // 1KB is a common size for series + seriesData := make([]byte, 1024) + r.Read(seriesData) + // 10MB might happen for large postings. 
+ postingData := make([]byte, 10*1024*1024) + r.Read(postingData) + + b.Run("FastCache", func(b *testing.B) { + cache, err := newInMemoryIndexCache(cfg, logger, prometheus.NewRegistry()) + require.NoError(b, err) + ch := make(chan int) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < 500; i++ { + go func() { + for j := range ch { + cache.StoreSeries(blockID, storage.SeriesRef(j), seriesData, tenancy.DefaultTenant) + testutil.Ok(b, err) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- i + } + close(ch) + }) + + b.Run("ThanosCache", func(b *testing.B) { + cache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, prometheus.NewRegistry(), storecache.DefaultInMemoryIndexCacheConfig) + require.NoError(b, err) + ch := make(chan int) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < 500; i++ { + go func() { + for j := range ch { + cache.StoreSeries(blockID, storage.SeriesRef(j), seriesData, tenancy.DefaultTenant) + testutil.Ok(b, err) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- i + } + close(ch) + }) + + b.Run("FastCacheLargeItem", func(b *testing.B) { + cache, err := newInMemoryIndexCache(cfg, logger, prometheus.NewRegistry()) + require.NoError(b, err) + ch := make(chan int) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < 500; i++ { + go func() { + for j := range ch { + cache.StoreSeries(blockID, storage.SeriesRef(j), postingData, tenancy.DefaultTenant) + testutil.Ok(b, err) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- i + } + close(ch) + }) + + b.Run("ThanosCacheLargeItem", func(b *testing.B) { + cache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, prometheus.NewRegistry(), storecache.DefaultInMemoryIndexCacheConfig) + require.NoError(b, err) + ch := make(chan int) + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < 500; i++ { + go func() { + for j := range ch { + cache.StoreSeries(blockID, storage.SeriesRef(j), postingData, tenancy.DefaultTenant) + testutil.Ok(b, err) + } + }() + } + + for i := 
0; i < b.N; i++ { + ch <- i + } + close(ch) + }) +} + +func BenchmarkInMemoryIndexCacheFetch(b *testing.B) { + logger := log.NewNopLogger() + cfg := InMemoryIndexCacheConfig{ + MaxSizeBytes: uint64(storecache.DefaultInMemoryIndexCacheConfig.MaxSize), + } + + blockID := ulid.MustNew(ulid.Now(), nil) + r := rand.New(rand.NewSource(time.Now().Unix())) + // 1KB is a common size for series + seriesData := make([]byte, 1024) + r.Read(seriesData) + ctx := context.Background() + items := 10000 + ids := make([]storage.SeriesRef, items) + for i := 0; i < items; i++ { + ids[i] = storage.SeriesRef(i) + } + + b.Run("FastCache", func(b *testing.B) { + cache, err := newInMemoryIndexCache(cfg, logger, prometheus.NewRegistry()) + require.NoError(b, err) + for i := 0; i < items; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), seriesData, tenancy.DefaultTenant) + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.FetchMultiSeries(ctx, blockID, ids, tenancy.DefaultTenant) + } + }) + + b.Run("ThanosCache", func(b *testing.B) { + cache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, prometheus.NewRegistry(), storecache.DefaultInMemoryIndexCacheConfig) + require.NoError(b, err) + for i := 0; i < items; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), seriesData, tenancy.DefaultTenant) + } + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cache.FetchMultiSeries(ctx, blockID, ids, tenancy.DefaultTenant) + } + }) +} + +func BenchmarkInMemoryIndexCacheFetchConcurrent(b *testing.B) { + logger := log.NewNopLogger() + cfg := InMemoryIndexCacheConfig{ + MaxSizeBytes: uint64(storecache.DefaultInMemoryIndexCacheConfig.MaxSize), + } + + blockID := ulid.MustNew(ulid.Now(), nil) + r := rand.New(rand.NewSource(time.Now().Unix())) + // 1KB is a common size for series + seriesData := make([]byte, 1024) + r.Read(seriesData) + ctx := context.Background() + items := 10000 + ids := make([]storage.SeriesRef, items) + for i := 0; i < 
items; i++ { + ids[i] = storage.SeriesRef(i) + } + + b.Run("FastCache", func(b *testing.B) { + cache, err := newInMemoryIndexCache(cfg, logger, prometheus.NewRegistry()) + require.NoError(b, err) + for i := 0; i < items; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), seriesData, tenancy.DefaultTenant) + } + b.ReportAllocs() + b.ResetTimer() + + ch := make(chan int) + for i := 0; i < 500; i++ { + go func() { + for range ch { + cache.FetchMultiSeries(ctx, blockID, ids, tenancy.DefaultTenant) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- i + } + close(ch) + }) + + b.Run("ThanosCache", func(b *testing.B) { + cache, err := storecache.NewInMemoryIndexCacheWithConfig(logger, nil, prometheus.NewRegistry(), storecache.DefaultInMemoryIndexCacheConfig) + require.NoError(b, err) + for i := 0; i < items; i++ { + cache.StoreSeries(blockID, storage.SeriesRef(i), seriesData, tenancy.DefaultTenant) + } + b.ReportAllocs() + b.ResetTimer() + + ch := make(chan int) + for i := 0; i < 500; i++ { + go func() { + for range ch { + cache.FetchMultiSeries(ctx, blockID, ids, tenancy.DefaultTenant) + } + }() + } + + for i := 0; i < b.N; i++ { + ch <- i + } + close(ch) + }) +} From eac871c655a0d162374261a05671f76dc773f4f7 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Fri, 10 Nov 2023 13:01:05 -0800 Subject: [PATCH 4/4] fix conflicts Signed-off-by: Ben Ye --- go.sum | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/go.sum b/go.sum index c00e2291a4e..54dabf50467 100644 --- a/go.sum +++ b/go.sum @@ -646,10 +646,10 @@ github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OneOfOne/xxhash v1.2.6 h1:U68crOE3y3MPttCMQGywZOLrTeF5HHJ3/vDBCJn9/bA= github.com/OneOfOne/xxhash v1.2.6/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q= -github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod 
h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= -github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= github.com/VictoriaMetrics/fastcache v1.12.1 h1:i0mICQuojGDL3KblA7wUNlY5lOK6a4bwt3uRKnkZU40= github.com/VictoriaMetrics/fastcache v1.12.1/go.mod h1:tX04vaqcNoQeGLD+ra5pU5sWkuxnzWhEzLwhP9w653o= +github.com/ajstarks/deck v0.0.0-20200831202436-30c9fc6549a9/go.mod h1:JynElWSGnm/4RlzPXRlREEwqTHAN3T56Bv2ITsFT3gY= +github.com/ajstarks/deck/generate v0.0.0-20210309230005-c3f852c02e19/go.mod h1:T13YZdzov6OU0A1+RfKZiZN9ca6VeKdBdyDV+BY97Tk= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/ajstarks/svgo v0.0.0-20211024235047-1546f124cd8b/go.mod h1:1KcenG0jGWcpt8ov532z81sp/kMMUG485J2InIOyADM= github.com/alecthomas/kingpin v1.3.8-0.20210301060133-17f40c25f497 h1:aDITxVUQ/3KBhpVWX57Vo9ntGTxoRw1F0T6/x/tRzNU= @@ -669,12 +669,9 @@ github.com/alicebob/miniredis/v2 v2.30.4 h1:8S4/o1/KoUArAGbGwPxcwf0krlzceva2XVOS github.com/alicebob/miniredis/v2 v2.30.4/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible h1:9gWa46nstkJ9miBReJcN8Gq34cBFbzSpQZVVT9N09TM= github.com/aliyun/aliyun-oss-go-sdk v2.2.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= -<<<<<<< HEAD -github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= -======= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156 h1:eMwmnE/GDgah4HI848JfFxHt+iPb26b4zyfspmqY0/8= github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= ->>>>>>> 7e5fd9b4b (replace inmemory index cache to fastcache based implementation) +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= 
github.com/apache/arrow/go/v10 v10.0.1/go.mod h1:YvhnlEePVnBS4+0z3fhPfUy7W1Ikj0Ih0vcRo/gZ1M0= github.com/apache/arrow/go/v11 v11.0.0/go.mod h1:Eg5OsL5H+e299f7u5ssuXsuHQVEGC4xei5aX110hRiI= @@ -1521,13 +1518,8 @@ github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed h1:iWQdY3S6DpWj github.com/thanos-io/objstore v0.0.0-20230921130928-63a603e651ed/go.mod h1:oJ82xgcBDzGJrEgUsjlTj6n01+ZWUMMUR8BlZzX5xDE= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591 h1:6bZbFM+Mvy2kL8BeL8TJ5+5pV3sUR2PSLaZyw911rtQ= github.com/thanos-io/promql-engine v0.0.0-20231013104847-4517c0d5f591/go.mod h1:vfXJv1JXNdLfHnjsHsLLJl5tyI7KblF76Wo5lZ9YC4Q= -<<<<<<< HEAD github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c h1:hMpXd1ybZB/vnR3+zex93va42rQ++2E0qi2wVSf3AwY= github.com/thanos-io/thanos v0.32.5-0.20231103115946-463a6ce8b53c/go.mod h1:q+0MQPBugkBKZBFSOec4WV4EcuKJU6tgMI0i4M2znpY= -======= -github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8 h1:mWlY64XMYTFeCk4WziW33xerKsp+BWOck6g77cz9ZgA= -github.com/thanos-io/thanos v0.32.5-0.20231025032614-da62b6bf84b8/go.mod h1:eVFfte7jP1aTcTkQcZEj5/P9rCeMFHllEqfNZqirLLA= ->>>>>>> 7e5fd9b4b (replace inmemory index cache to fastcache based implementation) github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab h1:7ZR3hmisBWw77ZpO1/o86g+JV3VKlk3d48jopJxzTjU= github.com/themihai/gomemcache v0.0.0-20180902122335-24332e2d58ab/go.mod h1:eheTFp954zcWZXCU8d0AT76ftsQOTo4DTqkN/h3k1MY= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=