Skip to content

Commit 795dd59

Browse files
authored
Merge pull request #1482 from cortexproject/chunk-cache-stubs
Optionally write stub entries to the chunk cache from ingesters
2 parents 6d684f6 + 1afc6b4 commit 795dd59

File tree

5 files changed

+25
-10
lines changed

5 files changed

+25
-10
lines changed

docs/arguments.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ The ingester query API was improved over time, but defaults to the old behaviour
116116

117117
When `push` requests arrive, pre-allocate this many slots to decode them. Tune this setting to reduce memory allocations and garbage. The optimum value will depend on how many labels are sent with your timeseries samples.
118118

119+
- `-store.chunk-cache-stubs`
120+
121+
If you don't want to cache every chunk written by ingesters, but you do want to take advantage of chunk write deduplication, this option will make ingesters write a placeholder to the cache for each chunk.
122+
Make sure you configure ingesters with a different cache from queriers, which need the whole value.
123+
119124
## Ingester, Distributor & Querier limits.
120125

121126
Cortex implements various limits on the requests it can process, in order to prevent a single tenant overwhelming the cluster. There are various default global limits which apply to all tenants which can be set on the command line. These limits can also be overridden on a per-tenant basis, using a configuration file. Specify the filename for the override configuration file using the `-limits.per-user-override-config=<filename>` flag. The override file will be re-read every 10 seconds by default - this can also be controlled using the `-limits.per-user-override-period=10s` flag.

pkg/chunk/cache/cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func testCacheMultiple(t *testing.T, cache cache.Cache, keys []string, chunks []
9696
func testChunkFetcher(t *testing.T, c cache.Cache, keys []string, chunks []chunk.Chunk) {
9797
fetcher, err := chunk.NewChunkFetcher(cache.Config{
9898
Cache: c,
99-
}, nil)
99+
}, false, nil)
100100
require.NoError(t, err)
101101
defer fetcher.Stop()
102102

pkg/chunk/chunk_store.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,15 @@ type StoreConfig struct {
6464

6565
// Limits query start time to be greater than now() - MaxLookBackPeriod, if set.
6666
MaxLookBackPeriod time.Duration `yaml:"max_look_back_period"`
67+
68+
// Not visible in yaml because the setting shouldn't be common between ingesters and queriers
69+
chunkCacheStubs bool // don't write the full chunk to cache, just a stub entry
6770
}
6871

6972
// RegisterFlags adds the flags required to config this to the given FlagSet
7073
func (cfg *StoreConfig) RegisterFlags(f *flag.FlagSet) {
7174
cfg.ChunkCacheConfig.RegisterFlagsWithPrefix("", "Cache config for chunks. ", f)
75+
f.BoolVar(&cfg.chunkCacheStubs, "store.chunk-cache-stubs", false, "If true, don't write the full chunk to cache, just a stub entry.")
7276
cfg.WriteDedupeCacheConfig.RegisterFlagsWithPrefix("store.index-cache-write.", "Cache config for index entry writing. ", f)
7377

7478
f.DurationVar(&cfg.MinChunkAge, "store.min-chunk-age", 0, "Minimum time between chunk update and being saved to the store.")
@@ -92,7 +96,7 @@ type store struct {
9296
}
9397

9498
func newStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) {
95-
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, chunks)
99+
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
96100
if err != nil {
97101
return nil, err
98102
}

pkg/chunk/chunk_store_utils.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,9 @@ outer:
5454
// and writing back any misses to the cache. Also responsible for decoding
5555
// chunks from the cache, in parallel.
5656
type Fetcher struct {
57-
storage ObjectClient
58-
cache cache.Cache
57+
storage ObjectClient
58+
cache cache.Cache
59+
cacheStubs bool
5960

6061
wait sync.WaitGroup
6162
decodeRequests chan decodeRequest
@@ -72,7 +73,7 @@ type decodeResponse struct {
7273
}
7374

7475
// NewChunkFetcher makes a new ChunkFetcher.
75-
func NewChunkFetcher(cfg cache.Config, storage ObjectClient) (*Fetcher, error) {
76+
func NewChunkFetcher(cfg cache.Config, cacheStubs bool, storage ObjectClient) (*Fetcher, error) {
7677
cache, err := cache.New(cfg)
7778
if err != nil {
7879
return nil, err
@@ -81,6 +82,7 @@ func NewChunkFetcher(cfg cache.Config, storage ObjectClient) (*Fetcher, error) {
8182
c := &Fetcher{
8283
storage: storage,
8384
cache: cache,
85+
cacheStubs: cacheStubs,
8486
decodeRequests: make(chan decodeRequest),
8587
}
8688

@@ -149,10 +151,14 @@ func (c *Fetcher) writeBackCache(ctx context.Context, chunks []Chunk) error {
149151
keys := make([]string, 0, len(chunks))
150152
bufs := make([][]byte, 0, len(chunks))
151153
for i := range chunks {
152-
encoded, err := chunks[i].Encoded()
153-
// TODO don't fail, just log and continue?
154-
if err != nil {
155-
return err
154+
var encoded []byte
155+
var err error
156+
if !c.cacheStubs {
157+
encoded, err = chunks[i].Encoded()
158+
// TODO don't fail, just log and continue?
159+
if err != nil {
160+
return err
161+
}
156162
}
157163

158164
keys = append(keys, chunks[i].ExternalKey())

pkg/chunk/series_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type seriesStore struct {
6868
}
6969

7070
func newSeriesStore(cfg StoreConfig, schema Schema, index IndexClient, chunks ObjectClient, limits *validation.Overrides) (Store, error) {
71-
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, chunks)
71+
fetcher, err := NewChunkFetcher(cfg.ChunkCacheConfig, cfg.chunkCacheStubs, chunks)
7272
if err != nil {
7373
return nil, err
7474
}

0 commit comments

Comments
 (0)