Skip to content

Commit 45ccce0

Browse files
committed
Implement matcher cache
Signed-off-by: alanprot <[email protected]>
1 parent 42028f7 commit 45ccce0

File tree

10 files changed

+192
-41
lines changed

10 files changed

+192
-41
lines changed

docs/configuration/config-file-reference.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3181,6 +3181,10 @@ instance_limits:
31813181
# change by changing this option.
31823182
# CLI flag: -ingester.disable-chunk-trimming
31833183
[disable_chunk_trimming: <boolean> | default = false]
3184+
3185+
# Maximum number of entries in the matchers cache. 0 to disable.
3186+
# CLI flag: -ingester.matchers-cache-max-items
3187+
[matchers_cache_max_items: <int> | default = 0]
31843188
```
31853189

31863190
### `ingester_client_config`

integration/query_fuzz_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,8 +378,9 @@ func TestExpandedPostingsCacheFuzz(t *testing.T) {
378378
"-blocks-storage.expanded_postings_cache.head.enabled": "true",
379379
"-blocks-storage.expanded_postings_cache.block.enabled": "true",
380380
// Ingester.
381-
"-ring.store": "consul",
382-
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
381+
"-ring.store": "consul",
382+
"-consul.hostname": consul2.NetworkHTTPEndpoint(),
383+
"-ingester.matchers-cache-max-items": "10000",
383384
// Distributor.
384385
"-distributor.replication-factor": "1",
385386
// Store-gateway.

pkg/distributor/distributor_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/prometheus/prometheus/tsdb/tsdbutil"
2727
"github.com/stretchr/testify/assert"
2828
"github.com/stretchr/testify/require"
29+
storecache "github.com/thanos-io/thanos/pkg/store/cache"
2930
"github.com/weaveworks/common/httpgrpc"
3031
"github.com/weaveworks/common/user"
3132
"go.uber.org/atomic"
@@ -3374,7 +3375,7 @@ func (i *mockIngester) Query(ctx context.Context, req *client.QueryRequest, opts
33743375
return nil, errFail
33753376
}
33763377

3377-
_, _, matchers, err := client.FromQueryRequest(req)
3378+
_, _, matchers, err := client.FromQueryRequest(storecache.NewNoopMatcherCache(), req)
33783379
if err != nil {
33793380
return nil, err
33803381
}
@@ -3400,7 +3401,7 @@ func (i *mockIngester) QueryStream(ctx context.Context, req *client.QueryRequest
34003401
return nil, errFail
34013402
}
34023403

3403-
_, _, matchers, err := client.FromQueryRequest(req)
3404+
_, _, matchers, err := client.FromQueryRequest(storecache.NewNoopMatcherCache(), req)
34043405
if err != nil {
34053406
return nil, err
34063407
}
@@ -3459,7 +3460,7 @@ func (i *mockIngester) MetricsForLabelMatchersStream(ctx context.Context, req *c
34593460
return nil, errFail
34603461
}
34613462

3462-
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
3463+
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(storecache.NewNoopMatcherCache(), req)
34633464
if err != nil {
34643465
return nil, err
34653466
}
@@ -3491,7 +3492,7 @@ func (i *mockIngester) MetricsForLabelMatchers(ctx context.Context, req *client.
34913492
return nil, errFail
34923493
}
34933494

3494-
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(req)
3495+
_, _, _, multiMatchers, err := client.FromMetricsForLabelMatchersRequest(storecache.NewNoopMatcherCache(), req)
34953496
if err != nil {
34963497
return nil, err
34973498
}

pkg/ingester/client/compat.go

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@ package client
33
import (
44
"fmt"
55

6+
"github.com/cortexproject/cortex/pkg/cortexpb"
67
"github.com/prometheus/common/model"
78
"github.com/prometheus/prometheus/model/labels"
89
"github.com/prometheus/prometheus/storage"
910
"github.com/prometheus/prometheus/tsdb/chunkenc"
10-
11-
"github.com/cortexproject/cortex/pkg/cortexpb"
11+
storecache "github.com/thanos-io/thanos/pkg/store/cache"
1212
)
1313

1414
// ToQueryRequest builds a QueryRequest proto.
@@ -26,8 +26,8 @@ func ToQueryRequest(from, to model.Time, matchers []*labels.Matcher) (*QueryRequ
2626
}
2727

2828
// FromQueryRequest unpacks a QueryRequest proto.
29-
func FromQueryRequest(req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) {
30-
matchers, err := FromLabelMatchers(req.Matchers)
29+
func FromQueryRequest(cache storecache.MatchersCache, req *QueryRequest) (model.Time, model.Time, []*labels.Matcher, error) {
30+
matchers, err := FromLabelMatchers(cache, req.Matchers)
3131
if err != nil {
3232
return 0, 0, nil, err
3333
}
@@ -55,10 +55,10 @@ func ToExemplarQueryRequest(from, to model.Time, matchers ...[]*labels.Matcher)
5555
}
5656

5757
// FromExemplarQueryRequest unpacks an ExemplarQueryRequest proto.
58-
func FromExemplarQueryRequest(req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) {
58+
func FromExemplarQueryRequest(cache storecache.MatchersCache, req *ExemplarQueryRequest) (int64, int64, [][]*labels.Matcher, error) {
5959
var result [][]*labels.Matcher
6060
for _, m := range req.Matchers {
61-
matchers, err := FromLabelMatchers(m.Matchers)
61+
matchers, err := FromLabelMatchers(cache, m.Matchers)
6262
if err != nil {
6363
return 0, 0, nil, err
6464
}
@@ -175,10 +175,10 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
175175
}
176176

177177
// FromMetricsForLabelMatchersRequest unpacks a MetricsForLabelMatchersRequest proto
178-
func FromMetricsForLabelMatchersRequest(req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
178+
func FromMetricsForLabelMatchersRequest(cache storecache.MatchersCache, req *MetricsForLabelMatchersRequest) (model.Time, model.Time, int, [][]*labels.Matcher, error) {
179179
matchersSet := make([][]*labels.Matcher, 0, len(req.MatchersSet))
180180
for _, matchers := range req.MatchersSet {
181-
matchers, err := FromLabelMatchers(matchers.Matchers)
181+
matchers, err := FromLabelMatchers(cache, matchers.Matchers)
182182
if err != nil {
183183
return 0, 0, 0, nil, err
184184
}
@@ -206,12 +206,12 @@ func ToLabelValuesRequest(labelName model.LabelName, from, to model.Time, limit
206206
}
207207

208208
// FromLabelValuesRequest unpacks a LabelValuesRequest proto
209-
func FromLabelValuesRequest(req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
209+
func FromLabelValuesRequest(cache storecache.MatchersCache, req *LabelValuesRequest) (string, int64, int64, int, []*labels.Matcher, error) {
210210
var err error
211211
var matchers []*labels.Matcher
212212

213213
if req.Matchers != nil {
214-
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
214+
matchers, err = FromLabelMatchers(cache, req.Matchers.Matchers)
215215
if err != nil {
216216
return "", 0, 0, 0, nil, err
217217
}
@@ -236,12 +236,12 @@ func ToLabelNamesRequest(from, to model.Time, limit int, matchers []*labels.Matc
236236
}
237237

238238
// FromLabelNamesRequest unpacks a LabelNamesRequest proto
239-
func FromLabelNamesRequest(req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
239+
func FromLabelNamesRequest(cache storecache.MatchersCache, req *LabelNamesRequest) (int64, int64, int, []*labels.Matcher, error) {
240240
var err error
241241
var matchers []*labels.Matcher
242242

243243
if req.Matchers != nil {
244-
matchers, err = FromLabelMatchers(req.Matchers.Matchers)
244+
matchers, err = FromLabelMatchers(cache, req.Matchers.Matchers)
245245
if err != nil {
246246
return 0, 0, 0, nil, err
247247
}
@@ -275,27 +275,31 @@ func toLabelMatchers(matchers []*labels.Matcher) ([]*LabelMatcher, error) {
275275
return result, nil
276276
}
277277

278-
func FromLabelMatchers(matchers []*LabelMatcher) ([]*labels.Matcher, error) {
278+
func FromLabelMatchers(cache storecache.MatchersCache, matchers []*LabelMatcher) ([]*labels.Matcher, error) {
279279
result := make([]*labels.Matcher, 0, len(matchers))
280280
for _, matcher := range matchers {
281-
var mtype labels.MatchType
282-
switch matcher.Type {
283-
case EQUAL:
284-
mtype = labels.MatchEqual
285-
case NOT_EQUAL:
286-
mtype = labels.MatchNotEqual
287-
case REGEX_MATCH:
288-
mtype = labels.MatchRegexp
289-
case REGEX_NO_MATCH:
290-
mtype = labels.MatchNotRegexp
291-
default:
292-
return nil, fmt.Errorf("invalid matcher type")
293-
}
294-
matcher, err := labels.NewMatcher(mtype, matcher.Name, matcher.Value)
281+
m, err := cache.GetOrSet(matcher.String(), func() (*labels.Matcher, error) {
282+
var mtype labels.MatchType
283+
switch matcher.Type {
284+
case EQUAL:
285+
mtype = labels.MatchEqual
286+
case NOT_EQUAL:
287+
mtype = labels.MatchNotEqual
288+
case REGEX_MATCH:
289+
mtype = labels.MatchRegexp
290+
case REGEX_NO_MATCH:
291+
mtype = labels.MatchNotRegexp
292+
default:
293+
return nil, fmt.Errorf("invalid matcher type")
294+
}
295+
return labels.NewMatcher(mtype, matcher.GetName(), matcher.GetValue())
296+
})
297+
295298
if err != nil {
296299
return nil, err
297300
}
298-
result = append(result, matcher)
301+
302+
result = append(result, m)
299303
}
300304
return result, nil
301305
}

pkg/ingester/client/compat_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/prometheus/common/model"
99
"github.com/prometheus/prometheus/model/labels"
10+
storecache "github.com/thanos-io/thanos/pkg/store/cache"
1011
)
1112

1213
func TestQueryRequest(t *testing.T) {
@@ -41,7 +42,7 @@ func TestQueryRequest(t *testing.T) {
4142
t.Fatal(err)
4243
}
4344

44-
haveFrom, haveTo, haveMatchers, err := FromQueryRequest(req)
45+
haveFrom, haveTo, haveMatchers, err := FromQueryRequest(storecache.NewNoopMatcherCache(), req)
4546
if err != nil {
4647
t.Fatal(err)
4748
}

pkg/ingester/ingester.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
storecache "github.com/thanos-io/thanos/pkg/store/cache"
78
"html"
89
"io"
910
"math"
@@ -145,6 +146,9 @@ type Config struct {
145146
// When disabled, the result may contain samples outside the queried time range, but Select() performance
146147
// may be improved.
147148
DisableChunkTrimming bool `yaml:"disable_chunk_trimming"`
149+
150+
// Maximum number of entries in the matchers cache. 0 to disable.
151+
MatchersCacheMaxItems int `yaml:"matchers_cache_max_items"`
148152
}
149153

150154
// RegisterFlags adds the flags required to config this to the given FlagSet
@@ -173,6 +177,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
173177
f.BoolVar(&cfg.LabelsStringInterningEnabled, "ingester.labels-string-interning-enabled", false, "Experimental: Enable string interning for metrics labels.")
174178

175179
f.BoolVar(&cfg.DisableChunkTrimming, "ingester.disable-chunk-trimming", false, "Disable trimming of matching series chunks based on query Start and End time. When disabled, the result may contain samples outside the queried time range but select performances may be improved. Note that certain query results might change by changing this option.")
180+
f.IntVar(&cfg.MatchersCacheMaxItems, "ingester.matchers-cache-max-items", 0, "Maximum number of entries in the matchers cache. 0 to disable.")
176181
}
177182

178183
func (cfg *Config) Validate() error {
@@ -243,6 +248,7 @@ type Ingester struct {
243248
inflightQueryRequests atomic.Int64
244249
maxInflightQueryRequests util_math.MaxTracker
245250

251+
matchersCache storecache.MatchersCache
246252
expandedPostingsCacheFactory *cortex_tsdb.ExpandedPostingsCacheFactory
247253
}
248254

@@ -708,7 +714,15 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
708714
logger: logger,
709715
ingestionRate: util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
710716
expandedPostingsCacheFactory: cortex_tsdb.NewExpandedPostingsCacheFactory(cfg.BlocksStorageConfig.TSDB.PostingsCache),
717+
matchersCache: storecache.NewNoopMatcherCache(),
718+
}
719+
720+
if cfg.MatchersCacheMaxItems > 0 {
721+
r := prometheus.NewRegistry()
722+
registerer.MustRegister(newMatchCacheMetrics(r))
723+
i.matchersCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.MatchersCacheMaxItems), storecache.WithPromRegistry(r))
711724
}
725+
712726
i.metrics = newIngesterMetrics(registerer,
713727
false,
714728
cfg.ActiveSeriesMetricsEnabled,
@@ -1474,7 +1488,7 @@ func (i *Ingester) QueryExemplars(ctx context.Context, req *client.ExemplarQuery
14741488
return nil, err
14751489
}
14761490

1477-
from, through, matchers, err := client.FromExemplarQueryRequest(req)
1491+
from, through, matchers, err := client.FromExemplarQueryRequest(i.matchersCache, req)
14781492
if err != nil {
14791493
return nil, err
14801494
}
@@ -1564,7 +1578,7 @@ func (i *Ingester) labelsValuesCommon(ctx context.Context, req *client.LabelValu
15641578
return nil, cleanup, err
15651579
}
15661580

1567-
labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(req)
1581+
labelName, startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelValuesRequest(i.matchersCache, req)
15681582
if err != nil {
15691583
return nil, cleanup, err
15701584
}
@@ -1654,7 +1668,7 @@ func (i *Ingester) labelNamesCommon(ctx context.Context, req *client.LabelNamesR
16541668
return nil, cleanup, err
16551669
}
16561670

1657-
startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(req)
1671+
startTimestampMs, endTimestampMs, limit, matchers, err := client.FromLabelNamesRequest(i.matchersCache, req)
16581672
if err != nil {
16591673
return nil, cleanup, err
16601674
}
@@ -1768,7 +1782,7 @@ func (i *Ingester) metricsForLabelMatchersCommon(ctx context.Context, req *clien
17681782
}
17691783

17701784
// Parse the request
1771-
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(req)
1785+
_, _, limit, matchersSet, err := client.FromMetricsForLabelMatchersRequest(i.matchersCache, req)
17721786
if err != nil {
17731787
return cleanup, err
17741788
}
@@ -1982,7 +1996,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
19821996
return err
19831997
}
19841998

1985-
from, through, matchers, err := client.FromQueryRequest(req)
1999+
from, through, matchers, err := client.FromQueryRequest(i.matchersCache, req)
19862000
if err != nil {
19872001
return err
19882002
}

pkg/ingester/ingester_test.go

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,62 @@ func seriesSetFromResponseStream(s *mockQueryStreamServer) (storage.SeriesSet, e
119119
return set, nil
120120
}
121121

122+
func TestMatcherCache(t *testing.T) {
123+
limits := defaultLimitsTestConfig()
124+
userID := "1"
125+
tenantLimits := newMockTenantLimits(map[string]*validation.Limits{userID: &limits})
126+
registry := prometheus.NewRegistry()
127+
128+
dir := t.TempDir()
129+
chunksDir := filepath.Join(dir, "chunks")
130+
blocksDir := filepath.Join(dir, "blocks")
131+
require.NoError(t, os.Mkdir(chunksDir, os.ModePerm))
132+
require.NoError(t, os.Mkdir(blocksDir, os.ModePerm))
133+
134+
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, defaultIngesterTestConfig(t), limits, tenantLimits, blocksDir, registry, true)
135+
require.NoError(t, err)
136+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
137+
138+
defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
139+
140+
// Wait until it's ACTIVE
141+
test.Poll(t, time.Second, ring.ACTIVE, func() interface{} {
142+
return ing.lifecycler.GetState()
143+
})
144+
ctx := user.InjectOrgID(context.Background(), userID)
145+
numberOfDifferentMatchers := 50
146+
callPerMatcher := 10
147+
for j := 0; j < numberOfDifferentMatchers; j++ {
148+
for i := 0; i < callPerMatcher; i++ {
149+
s := &mockQueryStreamServer{ctx: ctx}
150+
err = ing.QueryStream(&client.QueryRequest{
151+
StartTimestampMs: math.MinInt64,
152+
EndTimestampMs: math.MaxInt64,
153+
Matchers: []*client.LabelMatcher{{Type: client.EQUAL, Name: labels.MetricName, Value: fmt.Sprintf("%d", j)}},
154+
}, s)
155+
require.NoError(t, err)
156+
}
157+
}
158+
159+
require.NoError(t, testutil.GatherAndCompare(registry, bytes.NewBufferString(fmt.Sprintf(`
160+
# HELP ingester_matchers_cache_evicted_total Total number of items evicted from the cache
161+
# TYPE ingester_matchers_cache_evicted_total counter
162+
ingester_matchers_cache_evicted_total 0
163+
# HELP ingester_matchers_cache_hits_total Total number of cache hits for series matchers
164+
# TYPE ingester_matchers_cache_hits_total counter
165+
ingester_matchers_cache_hits_total %v
166+
# HELP ingester_matchers_cache_items Total number of cached items
167+
# TYPE ingester_matchers_cache_items gauge
168+
ingester_matchers_cache_items %v
169+
# HELP ingester_matchers_cache_max_items Maximum number of items that can be cached
170+
# TYPE ingester_matchers_cache_max_items gauge
171+
ingester_matchers_cache_max_items 0
172+
# HELP ingester_matchers_cache_requests_total Total number of cache requests for series matchers
173+
# TYPE ingester_matchers_cache_requests_total counter
174+
ingester_matchers_cache_requests_total %v
175+
`, callPerMatcher*numberOfDifferentMatchers-numberOfDifferentMatchers, numberOfDifferentMatchers, callPerMatcher*numberOfDifferentMatchers)), "ingester_matchers_cache_requests_total", "ingester_matchers_cache_hits_total", "ingester_matchers_cache_items", "ingester_matchers_cache_max_items", "ingester_matchers_cache_evicted_total"))
176+
}
177+
122178
func TestIngesterPerLabelsetLimitExceeded(t *testing.T) {
123179
limits := defaultLimitsTestConfig()
124180
userID := "1"

pkg/ingester/lifecycle_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ func defaultIngesterTestConfig(t testing.TB) Config {
4343
cfg.LifecyclerConfig.FinalSleep = 0
4444
cfg.ActiveSeriesMetricsEnabled = true
4545
cfg.LabelsStringInterningEnabled = true
46+
cfg.MatchersCacheMaxItems = 1024
4647
return cfg
4748
}
4849

0 commit comments

Comments
 (0)