Skip to content

Commit ba5191b

Browse files
committed
WIP
Signed-off-by: Alan Protasio <[email protected]>
1 parent e35add8 commit ba5191b

25 files changed

+3668
-100
lines changed

Makefile

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ pkg/ring/ring.pb.go: pkg/ring/ring.proto
9595
pkg/frontend/v1/frontendv1pb/frontend.pb.go: pkg/frontend/v1/frontendv1pb/frontend.proto
9696
pkg/frontend/v2/frontendv2pb/frontend.pb.go: pkg/frontend/v2/frontendv2pb/frontend.proto
9797
pkg/querier/tripperware/queryrange/queryrange.pb.go: pkg/querier/tripperware/queryrange/queryrange.proto
98+
pkg/querier/tripperware/instantquery/instantquery.pb.go: pkg/querier/tripperware/instantquery/instantquery.proto
9899
pkg/querier/tripperware/query.pb.go: pkg/querier/tripperware/query.proto
99100
pkg/querier/stats/stats.pb.go: pkg/querier/stats/stats.proto
100101
pkg/distributor/ha_tracker.pb.go: pkg/distributor/ha_tracker.proto
@@ -202,6 +203,7 @@ lint:
202203
./pkg/frontend/... \
203204
./pkg/querier/tenantfederation/... \
204205
./pkg/querier/tripperware/... \
206+
./pkg/querier/tripperware/instantquery/... \
205207
./pkg/querier/tripperware/queryrange/...
206208

207209
# Ensure packages that no longer use a global logger don't reintroduce it

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,11 @@ api:
126126
# Cortex query-frontend.
127127
[query_range: <query_range_config>]
128128

129+
query:
130+
# Number of shards to use when distributing shardable PromQL queries.
131+
# CLI flag: -querier.vertical-shard-size
132+
[vertical_shard_size: <int> | default = 0]
133+
129134
# The blocks_storage_config configures the blocks storage.
130135
[blocks_storage: <blocks_storage_config>]
131136

pkg/cortex/cortex.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ import (
4040
"github.com/cortexproject/cortex/pkg/querier"
4141
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
4242
"github.com/cortexproject/cortex/pkg/querier/tripperware"
43+
"github.com/cortexproject/cortex/pkg/querier/tripperware/instantquery"
4344
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
4445
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
4546
"github.com/cortexproject/cortex/pkg/ring"
@@ -107,6 +108,7 @@ type Config struct {
107108
Worker querier_worker.Config `yaml:"frontend_worker"`
108109
Frontend frontend.CombinedFrontendConfig `yaml:"frontend"`
109110
QueryRange queryrange.Config `yaml:"query_range"`
111+
Query instantquery.Config `yaml:"query"`
110112
BlocksStorage tsdb.BlocksStorageConfig `yaml:"blocks_storage"`
111113
Compactor compactor.Config `yaml:"compactor"`
112114
StoreGateway storegateway.Config `yaml:"store_gateway"`
@@ -153,6 +155,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
153155
c.Worker.RegisterFlags(f)
154156
c.Frontend.RegisterFlags(f)
155157
c.QueryRange.RegisterFlags(f)
158+
c.Query.RegisterFlags(f)
156159
c.BlocksStorage.RegisterFlags(f)
157160
c.Compactor.RegisterFlags(f)
158161
c.StoreGateway.RegisterFlags(f)

pkg/cortex/modules.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"github.com/cortexproject/cortex/pkg/querier"
3535
"github.com/cortexproject/cortex/pkg/querier/tenantfederation"
3636
"github.com/cortexproject/cortex/pkg/querier/tripperware"
37+
"github.com/cortexproject/cortex/pkg/querier/tripperware/instantquery"
3738
"github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
3839
querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
3940
"github.com/cortexproject/cortex/pkg/ring"
@@ -434,15 +435,20 @@ func (t *Cortex) initDeleteRequestsStore() (serv services.Service, err error) {
434435
// initQueryFrontendTripperware instantiates the tripperware used by the query frontend
435436
// to optimize Prometheus query requests.
436437
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
438+
t.Cfg.QueryRange.VerticalShardSize = t.Cfg.Query.VerticalShardSize
437439
queryRangeMiddlewares, cache, err := queryrange.Middlewares(
438440
t.Cfg.QueryRange,
439441
util_log.Logger,
440442
t.Overrides,
441-
queryrange.PrometheusCodec,
442443
queryrange.PrometheusResponseExtractor{},
443444
prometheus.DefaultRegisterer,
444445
t.TombstonesLoader,
445446
)
447+
if err != nil {
448+
return nil, err
449+
}
450+
451+
instantQueryMiddlewares, err := instantquery.Middlewares(t.Cfg.Query, util_log.Logger, t.Overrides)
446452

447453
if err != nil {
448454
return nil, err
@@ -452,7 +458,9 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
452458
prometheus.DefaultRegisterer,
453459
t.Cfg.QueryRange.ForwardHeaders,
454460
queryRangeMiddlewares,
461+
instantQueryMiddlewares,
455462
queryrange.PrometheusCodec,
463+
instantquery.InstantQueryCodec,
456464
)
457465

458466
return services.NewIdleService(nil, func(_ error) error {

pkg/ingester/ingester.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/thanos-io/objstore"
3030
"github.com/thanos-io/thanos/pkg/block/metadata"
3131
"github.com/thanos-io/thanos/pkg/shipper"
32+
"github.com/thanos-io/thanos/pkg/store/storepb"
3233
"github.com/weaveworks/common/httpgrpc"
3334
"go.uber.org/atomic"
3435
"golang.org/x/sync/errgroup"
@@ -37,6 +38,7 @@ import (
3738
"github.com/cortexproject/cortex/pkg/chunk/encoding"
3839
"github.com/cortexproject/cortex/pkg/cortexpb"
3940
"github.com/cortexproject/cortex/pkg/ingester/client"
41+
"github.com/cortexproject/cortex/pkg/querysharding"
4042
"github.com/cortexproject/cortex/pkg/ring"
4143
"github.com/cortexproject/cortex/pkg/storage/bucket"
4244
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
@@ -1173,6 +1175,13 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
11731175
return nil, err
11741176
}
11751177

1178+
matchers, sm, err := querysharding.ExtractShardingMatchers(matchers)
1179+
if err != nil {
1180+
return nil, err
1181+
}
1182+
1183+
defer sm.Close()
1184+
11761185
i.metrics.queries.Inc()
11771186

11781187
db := i.getTSDB(userID)
@@ -1197,6 +1206,9 @@ func (i *Ingester) Query(ctx context.Context, req *client.QueryRequest) (*client
11971206
result := &client.QueryResponse{}
11981207
for ss.Next() {
11991208
series := ss.At()
1209+
if sm.IsSharded() && !sm.MatchesLabels(series.Labels()) {
1210+
continue
1211+
}
12001212

12011213
ts := cortexpb.TimeSeries{
12021214
Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()),
@@ -1600,6 +1612,13 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
16001612
return err
16011613
}
16021614

1615+
matchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers)
1616+
if err != nil {
1617+
return err
1618+
}
1619+
1620+
defer shardMatcher.Close()
1621+
16031622
i.metrics.queries.Inc()
16041623

16051624
db := i.getTSDB(userID)
@@ -1609,7 +1628,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
16091628

16101629
numSamples := 0
16111630
numSeries := 0
1612-
numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, stream)
1631+
numSeries, numSamples, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)
16131632

16141633
if err != nil {
16151634
return err
@@ -1622,7 +1641,7 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
16221641
}
16231642

16241643
// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
1625-
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
1644+
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples int, _ error) {
16261645
q, err := db.ChunkQuerier(ctx, from, through)
16271646
if err != nil {
16281647
return 0, 0, err
@@ -1640,6 +1659,10 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
16401659
for ss.Next() {
16411660
series := ss.At()
16421661

1662+
if sm.IsSharded() && !sm.MatchesLabels(series.Labels()) {
1663+
continue
1664+
}
1665+
16431666
// convert labels to LabelAdapter
16441667
ts := client.TimeSeriesChunk{
16451668
Labels: cortexpb.FromLabelsToLabelAdapters(series.Labels()),

pkg/querier/blocks_store_queryable.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/cortexproject/cortex/pkg/cortexpb"
3434
"github.com/cortexproject/cortex/pkg/querier/series"
3535
"github.com/cortexproject/cortex/pkg/querier/stats"
36+
"github.com/cortexproject/cortex/pkg/querysharding"
3637
"github.com/cortexproject/cortex/pkg/ring"
3738
"github.com/cortexproject/cortex/pkg/ring/kv"
3839
"github.com/cortexproject/cortex/pkg/storage/bucket"
@@ -409,9 +410,8 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
409410
}
410411

411412
var (
412-
convertedMatchers = convertMatchersToLabelMatcher(matchers)
413-
resSeriesSets = []storage.SeriesSet(nil)
414-
resWarnings = storage.Warnings(nil)
413+
resSeriesSets = []storage.SeriesSet(nil)
414+
resWarnings = storage.Warnings(nil)
415415

416416
maxChunksLimit = q.limits.MaxChunksPerQueryFromStore(q.userID)
417417
leftChunksLimit = maxChunksLimit
@@ -420,7 +420,7 @@ func (q *blocksStoreQuerier) selectSorted(sp *storage.SelectHints, matchers ...*
420420
)
421421

422422
queryFunc := func(clients map[BlocksStoreClient][]ulid.ULID, minT, maxT int64) ([]ulid.ULID, error) {
423-
seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, convertedMatchers, maxChunksLimit, leftChunksLimit)
423+
seriesSets, queriedBlocks, warnings, numChunks, err := q.fetchSeriesFromStores(spanCtx, sp, clients, minT, maxT, matchers, maxChunksLimit, leftChunksLimit)
424424
if err != nil {
425425

426426
return nil, err
@@ -562,7 +562,6 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
562562
minT int64,
563563
maxT int64,
564564
matchers []*labels.Matcher,
565-
convertedMatchers []storepb.LabelMatcher,
566565
maxChunksLimit int,
567566
leftChunksLimit int,
568567
) ([]storage.SeriesSet, []ulid.ULID, storage.Warnings, int, error) {
@@ -578,6 +577,12 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
578577
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
579578
reqStats = stats.FromContext(ctx)
580579
)
580+
matchers, shardingInfo, err := querysharding.ExtractShardingInfo(matchers)
581+
convertedMatchers := convertMatchersToLabelMatcher(matchers)
582+
583+
if err != nil {
584+
return nil, nil, nil, 0, err
585+
}
581586

582587
// Concurrently fetch series from all clients.
583588
for c, blockIDs := range clients {
@@ -595,7 +600,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(
595600
// retrieved.
596601
skipChunks := sp != nil && sp.Func == "series"
597602

598-
req, err := createSeriesRequest(minT, maxT, convertedMatchers, skipChunks, blockIDs)
603+
req, err := createSeriesRequest(minT, maxT, convertedMatchers, shardingInfo, skipChunks, blockIDs)
599604
if err != nil {
600605
return errors.Wrapf(err, "failed to create series request")
601606
}
@@ -875,7 +880,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore(
875880
return valueSets, warnings, queriedBlocks, nil
876881
}
877882

878-
func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) {
883+
func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, shardingInfo *storepb.ShardInfo, skipChunks bool, blockIDs []ulid.ULID) (*storepb.SeriesRequest, error) {
879884
// Selectively query only specific blocks.
880885
hints := &hintspb.SeriesRequestHints{
881886
BlockMatchers: []storepb.LabelMatcher{
@@ -899,6 +904,7 @@ func createSeriesRequest(minT, maxT int64, matchers []storepb.LabelMatcher, skip
899904
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
900905
Hints: anyHints,
901906
SkipChunks: skipChunks,
907+
ShardInfo: shardingInfo,
902908
}, nil
903909
}
904910

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package instantquery
2+
3+
import (
4+
"flag"
5+
6+
"github.com/go-kit/log"
7+
8+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
9+
)
10+
11+
// Config for query_range middleware chain.
12+
type Config struct {
13+
VerticalShardSize int `yaml:"vertical_shard_size"`
14+
}
15+
16+
// RegisterFlags adds the flags required to config this to the given FlagSet.
17+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
18+
f.IntVar(&cfg.VerticalShardSize, "querier.vertical-shard-size", 0, "Number of shards to use when distributing shardable PromQL queries.")
19+
}
20+
21+
func Middlewares(
22+
cfg Config,
23+
log log.Logger,
24+
limits tripperware.Limits,
25+
) ([]tripperware.Middleware, error) {
26+
var m []tripperware.Middleware
27+
28+
if cfg.VerticalShardSize > 1 {
29+
m = append(m, tripperware.ShardByMiddleware(log, limits, InstantQueryCodec, cfg.VerticalShardSize))
30+
}
31+
return m, nil
32+
}

0 commit comments

Comments
 (0)