Cache Expanded Postings on ingesters #6296

Merged
merged 10 commits into from
Nov 5, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257
* [ENHANCEMENT] Upgrade build image and Go version to 1.23.2. #6261 #6262
* [ENHANCEMENT] Ingester: Introduce a new experimental feature for caching expanded postings on the ingester. #6296
* [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` for max retries when querying store gateway in consistency check. #6276
* [ENHANCEMENT] StoreGateway: Add new `cortex_bucket_store_chunk_pool_inuse_bytes` metric to track the usage in chunk pool. #6310
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
34 changes: 34 additions & 0 deletions docs/blocks-storage/querier.md
@@ -1544,4 +1544,38 @@ blocks_storage:
# [EXPERIMENTAL] True to enable native histogram.
# CLI flag: -blocks-storage.tsdb.enable-native-histograms
[enable_native_histograms: <boolean> | default = false]

# [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when
# querying blocks. Caching can be configured separately for the head and
# compacted blocks.
expanded_postings_cache:
# If enabled, ingesters will cache expanded postings for the head block.
# Only queries with an equality matcher on the metric name (__name__) are cached.
head:
# Whether the postings cache is enabled or not
# CLI flag: -blocks-storage.expanded_postings_cache.head.enabled
[enabled: <boolean> | default = false]

# Max bytes for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes
[max_bytes: <int> | default = 10485760]

# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
[ttl: <duration> | default = 10m]

# If enabled, ingesters will cache expanded postings for the compacted
# blocks. The cache is shared between all blocks.
blocks:
# Whether the postings cache is enabled or not
# CLI flag: -blocks-storage.expanded_postings_cache.block.enabled
[enabled: <boolean> | default = false]

# Max bytes for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes
[max_bytes: <int> | default = 10485760]

# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]
```
34 changes: 34 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -1635,4 +1635,38 @@ blocks_storage:
# [EXPERIMENTAL] True to enable native histogram.
# CLI flag: -blocks-storage.tsdb.enable-native-histograms
[enable_native_histograms: <boolean> | default = false]

# [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when
# querying blocks. Caching can be configured separately for the head and
# compacted blocks.
expanded_postings_cache:
# If enabled, ingesters will cache expanded postings for the head block.
# Only queries with an equality matcher on the metric name (__name__) are cached.
head:
# Whether the postings cache is enabled or not
# CLI flag: -blocks-storage.expanded_postings_cache.head.enabled
[enabled: <boolean> | default = false]

# Max bytes for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes
[max_bytes: <int> | default = 10485760]

# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
[ttl: <duration> | default = 10m]

# If enabled, ingesters will cache expanded postings for the compacted
# blocks. The cache is shared between all blocks.
blocks:
# Whether the postings cache is enabled or not
# CLI flag: -blocks-storage.expanded_postings_cache.block.enabled
[enabled: <boolean> | default = false]

# Max bytes for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes
[max_bytes: <int> | default = 10485760]

# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]
```
34 changes: 34 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2081,6 +2081,40 @@ tsdb:
# [EXPERIMENTAL] True to enable native histogram.
# CLI flag: -blocks-storage.tsdb.enable-native-histograms
[enable_native_histograms: <boolean> | default = false]

# [EXPERIMENTAL] If enabled, ingesters will cache expanded postings when
# querying blocks. Caching can be configured separately for the head and
# compacted blocks.
expanded_postings_cache:
# If enabled, ingesters will cache expanded postings for the head block.
# Only queries with an equality matcher on the metric name (__name__) are cached.
head:
# Whether the postings cache is enabled or not
# CLI flag: -blocks-storage.expanded_postings_cache.head.enabled
[enabled: <boolean> | default = false]

# Max bytes for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.head.max-bytes
[max_bytes: <int> | default = 10485760]

# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.head.ttl
[ttl: <duration> | default = 10m]

# If enabled, ingesters will cache expanded postings for the compacted
# blocks. The cache is shared between all blocks.
blocks:
# Whether the postings cache is enabled or not
# CLI flag: -blocks-storage.expanded_postings_cache.block.enabled
[enabled: <boolean> | default = false]

# Max bytes for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.max-bytes
[max_bytes: <int> | default = 10485760]

# TTL for postings cache
# CLI flag: -blocks-storage.expanded_postings_cache.block.ttl
[ttl: <duration> | default = 10m]
```
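
Putting the reference above together, here is a minimal sketch of what enabling both caches could look like in a configuration file. The exact nesting inside the `blocks_storage` section should follow the reference above; the `max_bytes` and `ttl` values are just the documented defaults made explicit.

```yaml
# Sketch only: enable the experimental expanded postings cache on ingesters.
# This fragment goes inside the blocks_storage section, at the level shown in
# the reference above.
expanded_postings_cache:
  # Cache expanded postings for the head block. Only queries with an
  # equality matcher on __name__ are cached.
  head:
    enabled: true
    max_bytes: 10485760  # 10 MiB (documented default)
    ttl: 10m
  # Cache expanded postings for compacted blocks; the cache is shared
  # between all blocks.
  blocks:
    enabled: true
    max_bytes: 10485760
    ttl: 10m
```

The same can be done with the CLI flags `-blocks-storage.expanded_postings_cache.head.enabled=true` and `-blocks-storage.expanded_postings_cache.block.enabled=true`, which is how the fuzz test below enables the feature on the second Cortex replica.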

### `compactor_config`
197 changes: 197 additions & 0 deletions integration/query_fuzz_test.go
@@ -205,6 +205,203 @@ func TestDisableChunkTrimmingFuzz(t *testing.T) {
}
}

func TestExpandedPostingsCacheFuzz(t *testing.T) {
stableCortexImage := "quay.io/cortexproject/cortex:v1.18.0"
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul1 := e2edb.NewConsulWithName("consul1")
consul2 := e2edb.NewConsulWithName("consul2")
require.NoError(t, s.StartAndWaitReady(consul1, consul2))

flags1 := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.tsdb.retention-period": "2h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul1.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)
flags2 := mergeFlags(
AlertmanagerLocalFlags(),
map[string]string{
"-store.engine": blocksStorageEngine,
"-blocks-storage.backend": "filesystem",
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "15m",
"-blocks-storage.tsdb.retention-period": "2h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-blocks-storage.bucket-store.bucket-index.enabled": "true",
"-querier.query-store-for-labels-enabled": "true",
"-blocks-storage.expanded_postings_cache.head.enabled": "true",
"-blocks-storage.expanded_postings_cache.block.enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
},
)
// Make the alertmanager config dir.
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

path1 := path.Join(s.SharedDir(), "cortex-1")
path2 := path.Join(s.SharedDir(), "cortex-2")

flags1 = mergeFlags(flags1, map[string]string{"-blocks-storage.filesystem.dir": path1})
flags2 = mergeFlags(flags2, map[string]string{"-blocks-storage.filesystem.dir": path2})
// Start Cortex replicas.
cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags1, stableCortexImage)
cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags2, "")
require.NoError(t, s.StartAndWaitReady(cortex1, cortex2))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

var clients []*e2ecortex.Client
c1, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex1.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)
c2, err := e2ecortex.NewClient(cortex2.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

clients = append(clients, c1, c2)

now := time.Now()
// Push some series to Cortex.
start := now.Add(-24 * time.Hour)
scrapeInterval := 30 * time.Second

numSeries := 10
numberOfLabelsPerSeries := 5
numSamples := 10
ss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries)
lbls := make([]labels.Labels, numSeries*numberOfLabelsPerSeries)

for i := 0; i < numSeries; i++ {
for j := 0; j < numberOfLabelsPerSeries; j++ {
series := e2e.GenerateSeriesWithSamples(
fmt.Sprintf("test_series_%d", i),
start,
scrapeInterval,
i*numSamples,
numSamples,
prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
)
ss[i*numberOfLabelsPerSeries+j] = series

builder := labels.NewBuilder(labels.EmptyLabels())
for _, lbl := range series.Labels {
builder.Set(lbl.Name, lbl.Value)
}
lbls[i*numberOfLabelsPerSeries+j] = builder.Labels()
}
}

rnd := rand.New(rand.NewSource(now.Unix()))
opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
}
ps := promqlsmith.New(rnd, lbls, opts...)

// Create the queries with the original labels
testRun := 100
queries := make([]string, testRun)
for i := 0; i < testRun; i++ {
expr := ps.WalkRangeQuery()
queries[i] = expr.Pretty(0)
}

// Let's run multiple iterations, creating new series in every iteration.
for k := 0; k < 5; k++ {

nss := make([]prompb.TimeSeries, numSeries*numberOfLabelsPerSeries)
for i := 0; i < numSeries; i++ {
for j := 0; j < numberOfLabelsPerSeries; j++ {
nss[i*numberOfLabelsPerSeries+j] = e2e.GenerateSeriesWithSamples(
fmt.Sprintf("test_series_%d", i),
start.Add(scrapeInterval*time.Duration(numSamples*j)),
scrapeInterval,
i*numSamples,
numSamples,
prompb.Label{Name: "j", Value: fmt.Sprintf("%d", j)},
prompb.Label{Name: "k", Value: fmt.Sprintf("%d", k)},
)
}
}

for _, client := range clients {
res, err := client.Push(nss)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)
}

type testCase struct {
query string
res1, res2 model.Value
err1, err2 error
}

queryStart := time.Now().Add(-time.Hour * 24)
queryEnd := time.Now()
cases := make([]*testCase, 0, 200)

for _, query := range queries {
res1, err1 := c1.QueryRange(query, queryStart, queryEnd, scrapeInterval)
res2, err2 := c2.QueryRange(query, queryStart, queryEnd, scrapeInterval)
cases = append(cases, &testCase{
query: query,
res1: res1,
res2: res2,
err1: err1,
err2: err2,
})
}

failures := 0
for i, tc := range cases {
qt := "range query"
if tc.err1 != nil || tc.err2 != nil {
if !cmp.Equal(tc.err1, tc.err2) {
t.Logf("case %d error mismatch.\n%s: %s\nerr1: %v\nerr2: %v\n", i, qt, tc.query, tc.err1, tc.err2)
failures++
}
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
failures++
}
}
if failures > 0 {
require.Failf(t, "finished query fuzzing tests", "%d test cases failed", failures)
}
}
}

func TestVerticalShardingFuzz(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)