Skip to content

Commit b1ee0aa

Browse files
pstibranypracucci
andauthored
Shuffle-sharding of queriers in the query-frontend (#3113)
* Added ID to the querier. Signed-off-by: Peter Štibraný <[email protected]> * Extended frontend protocol to add request type. Modified worker to respond to new GET_ID request type. Signed-off-by: Peter Štibraný <[email protected]> * Frontend now asks querier for its ID before running process loop. Signed-off-by: Peter Štibraný <[email protected]> * Close gRPC connection when stopping manager. Signed-off-by: Peter Štibraný <[email protected]> * Shuffle shard queriers between users. Signed-off-by: Peter Štibraný <[email protected]> * Added MaxQueriersPerUser to overrides. Signed-off-by: Peter Štibraný <[email protected]> * Fixes. Signed-off-by: Peter Štibraný <[email protected]> * Fix querier.id default value. Signed-off-by: Peter Štibraný <[email protected]> * CHANGELOG.md Signed-off-by: Peter Štibraný <[email protected]> * Make lint happy. Signed-off-by: Peter Štibraný <[email protected]> * Fix protos. Signed-off-by: Peter Štibraný <[email protected]> * Fixed bug in getNextRequestForQuerier, modified benchmarks to add queriers. Signed-off-by: Peter Štibraný <[email protected]> * Fixed spelling. Signed-off-by: Peter Štibraný <[email protected]> * Move metrics increment in register/unregister querier connection. Signed-off-by: Peter Štibraný <[email protected]> * When select queriers for tenant, use shuffling. Signed-off-by: Peter Štibraný <[email protected]> * Use rand numbers to find queriers. Signed-off-by: Peter Štibraný <[email protected]> * Fixed docs. Signed-off-by: Peter Štibraný <[email protected]> * Add integration test for sharding queriers. Signed-off-by: Peter Štibraný <[email protected]> * Use shuffling for selecting queriers per user. This is similar to rnd.Shuffle(), but stops early after selecting enough queriers. Signed-off-by: Peter Štibraný <[email protected]> * Updated documentation. Signed-off-by: Peter Štibraný <[email protected]> * Fixed docs after rebase. Signed-off-by: Peter Štibraný <[email protected]> * Unify seed computation with subring PR. Signed-off-by: Peter Štibraný <[email protected]> * Added unit test to verify that selected queriers are unique and come from supplied input. Signed-off-by: Peter Štibraný <[email protected]> * Review feedback. Signed-off-by: Peter Štibraný <[email protected]> * Mention shuffle sharding in v1 guarantees. Signed-off-by: Peter Štibraný <[email protected]> * Review feedback. Signed-off-by: Peter Štibraný <[email protected]> * Fix flag after master merge. Signed-off-by: Peter Štibraný <[email protected]> * Fixed TestQueuesConsistency test. Signed-off-by: Peter Štibraný <[email protected]> Co-authored-by: Marco Pracucci <[email protected]>
1 parent ad5e958 commit b1ee0aa

19 files changed

+1165
-414
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## master / unreleased
44

5+
* [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-user` globally, or using per-user limit `max_queriers_per_user`), each user's requests will be handled by different set of queriers. #3113
56
* [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), and how often this metric is updated (`-ingester.active-series-update-period`) and max idle time for series to be considered inactive (`-ingester.active-series-idle-timeout`). #3153
67
* [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
78
* [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195

docs/configuration/config-file-reference.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2414,6 +2414,11 @@ The `frontend_worker_config` configures the worker - running within the Cortex q
24142414
# CLI flag: -querier.dns-lookup-period
24152415
[dns_lookup_duration: <duration> | default = 10s]
24162416
2417+
# Querier ID, sent to frontend service to identify requests from the same
2418+
# querier. Defaults to hostname.
2419+
# CLI flag: -querier.id
2420+
[id: <string> | default = ""]
2421+
24172422
grpc_client_config:
24182423
# gRPC client max receive message size (bytes).
24192424
# CLI flag: -querier.frontend-client.grpc-max-recv-msg-size
@@ -2831,6 +2836,15 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
28312836
# CLI flag: -frontend.max-cache-freshness
28322837
[max_cache_freshness: <duration> | default = 1m]
28332838
2839+
# Maximum number of queriers that can handle requests for a single user. If set
2840+
# to 0 or value higher than number of available queriers, *all* queriers will
2841+
# handle requests for the user. Each frontend will select the same set of
2842+
# queriers for the same user (given that all queriers are connected to all
2843+
# frontends). This option only works with queriers connecting to the
2844+
# query-frontend, not when using downstream URL.
2845+
# CLI flag: -frontend.max-queriers-per-user
2846+
[max_queriers_per_user: <int> | default = 0]
2847+
28342848
# Duration to delay the evaluation of rules to ensure the underlying metrics
28352849
# have been pushed to Cortex.
28362850
# CLI flag: -ruler.evaluation-delay-duration

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,4 @@ Currently experimental features are:
4747
- gRPC Store.
4848
- Querier support for querying chunks and blocks store at the same time.
4949
- Tracking of active series and exporting them as metrics (`-ingester.active-series-metrics-enabled` and related flags)
50+
- Shuffle-sharding of queriers in the query-frontend (i.e. use of `-frontend.max-queriers-per-user` flag with non-zero value).
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
// +build requires_docker
2+
3+
package integration
4+
5+
import (
6+
"strconv"
7+
"sync"
8+
"testing"
9+
"time"
10+
11+
"github.com/prometheus/common/model"
12+
"github.com/prometheus/prometheus/prompb"
13+
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
15+
16+
"github.com/cortexproject/cortex/integration/e2e"
17+
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
18+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
19+
"github.com/cortexproject/cortex/integration/e2ecortex"
20+
)
21+
22+
func TestQuerierSharding(t *testing.T) {
23+
runQuerierShardingTest(t, true)
24+
}
25+
26+
func TestQuerierNoSharding(t *testing.T) {
27+
runQuerierShardingTest(t, false)
28+
}
29+
30+
func runQuerierShardingTest(t *testing.T, sharding bool) {
31+
// Going to high starts hitting filedescriptor limit, since we run all queriers concurrently.
32+
const numQueries = 100
33+
34+
s, err := e2e.NewScenario(networkName)
35+
require.NoError(t, err)
36+
defer s.Close()
37+
38+
memcached := e2ecache.NewMemcached()
39+
consul := e2edb.NewConsul()
40+
require.NoError(t, s.StartAndWaitReady(consul, memcached))
41+
42+
minio := e2edb.NewMinio(9000, BlocksStorageFlags["-blocks-storage.s3.bucket-name"])
43+
require.NoError(t, s.StartAndWaitReady(minio))
44+
45+
flags := BlocksStorageFlags
46+
47+
flags = mergeFlags(flags, map[string]string{
48+
"-querier.cache-results": "true",
49+
"-querier.split-queries-by-interval": "24h",
50+
"-querier.query-ingesters-within": "12h", // Required by the test on query /series out of ingesters time range
51+
"-frontend.memcached.addresses": "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort),
52+
"-querier.max-outstanding-requests-per-tenant": strconv.Itoa(numQueries), // To avoid getting errors.
53+
})
54+
55+
if sharding {
56+
// Use only single querier for each user.
57+
flags["-frontend.max-queriers-per-user"] = "1"
58+
}
59+
60+
// Start Cortex components.
61+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
62+
ingester := e2ecortex.NewIngesterWithConfigFile("ingester", consul.NetworkHTTPEndpoint(), "", flags, "")
63+
distributor := e2ecortex.NewDistributorWithConfigFile("distributor", consul.NetworkHTTPEndpoint(), "", flags, "")
64+
65+
require.NoError(t, s.Start(queryFrontend))
66+
67+
querier1 := e2ecortex.NewQuerierWithConfigFile("querier-1", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
68+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
69+
}), "")
70+
querier2 := e2ecortex.NewQuerierWithConfigFile("querier-2", consul.NetworkHTTPEndpoint(), "", mergeFlags(flags, map[string]string{
71+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
72+
}), "")
73+
74+
require.NoError(t, s.StartAndWaitReady(querier1, querier2, ingester, distributor))
75+
require.NoError(t, s.WaitReady(queryFrontend))
76+
77+
// Wait until distributor and queriers have updated the ring.
78+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
79+
require.NoError(t, querier1.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
80+
require.NoError(t, querier2.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
81+
82+
// Push a series for each user to Cortex.
83+
now := time.Now()
84+
85+
distClient, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
86+
require.NoError(t, err)
87+
88+
var series []prompb.TimeSeries
89+
series, expectedVector := generateSeries("series_1", now)
90+
91+
res, err := distClient.Push(series)
92+
require.NoError(t, err)
93+
require.Equal(t, 200, res.StatusCode)
94+
95+
// Send both queriers a single query, so that they both initialize their cortex_querier_request_duration_seconds metrics.
96+
for _, q := range []*e2ecortex.CortexService{querier1, querier2} {
97+
c, err := e2ecortex.NewClient("", q.HTTPEndpoint(), "", "", userID)
98+
require.NoError(t, err)
99+
100+
_, err = c.Query("series_1", now)
101+
require.NoError(t, err)
102+
}
103+
104+
wg := sync.WaitGroup{}
105+
106+
// Run all queries concurrently to get better distribution of requests between queriers.
107+
for i := 0; i < numQueries; i++ {
108+
wg.Add(1)
109+
110+
go func() {
111+
defer wg.Done()
112+
c, err := e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", userID)
113+
require.NoError(t, err)
114+
115+
result, err := c.Query("series_1", now)
116+
require.NoError(t, err)
117+
require.Equal(t, model.ValVector, result.Type())
118+
assert.Equal(t, expectedVector, result.(model.Vector))
119+
}()
120+
}
121+
122+
wg.Wait()
123+
124+
require.NoError(t, queryFrontend.WaitSumMetrics(e2e.Equals(numQueries), "cortex_query_frontend_queries_total"))
125+
126+
// Verify that only single querier handled all the queries when sharding is enabled, otherwise queries have been fairly distributed across queriers.
127+
q1Values, err := querier1.SumMetrics([]string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount)
128+
require.NoError(t, err)
129+
require.Len(t, q1Values, 1)
130+
131+
q2Values, err := querier2.SumMetrics([]string{"cortex_querier_request_duration_seconds"}, e2e.WithMetricCount)
132+
require.NoError(t, err)
133+
require.Len(t, q2Values, 1)
134+
135+
total := q1Values[0] + q2Values[0]
136+
diff := q1Values[0] - q2Values[0]
137+
if diff < 0 {
138+
diff = -diff
139+
}
140+
141+
require.Equal(t, float64(numQueries), total-2) // Remove 2 requests used for metrics initialization.
142+
143+
if sharding {
144+
require.Equal(t, float64(numQueries), diff)
145+
} else {
146+
require.InDelta(t, 0, diff, numQueries*0.20) // Both queriers should have roughly equal number of requests, with possible delta.
147+
}
148+
149+
// Ensure no service-specific metrics prefix is used by the wrong service.
150+
assertServiceMetricsPrefixes(t, Distributor, distributor)
151+
assertServiceMetricsPrefixes(t, Ingester, ingester)
152+
assertServiceMetricsPrefixes(t, Querier, querier1)
153+
assertServiceMetricsPrefixes(t, Querier, querier2)
154+
assertServiceMetricsPrefixes(t, QueryFrontend, queryFrontend)
155+
}

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,7 @@ func (t *Cortex) initQueryFrontend() (serv services.Service, err error) {
405405
}
406406
}
407407

408-
t.Frontend, err = frontend.New(t.Cfg.Frontend, util.Logger, prometheus.DefaultRegisterer)
408+
t.Frontend, err = frontend.New(t.Cfg.Frontend, t.Overrides, util.Logger, prometheus.DefaultRegisterer)
409409
if err != nil {
410410
return
411411
}

0 commit comments

Comments
 (0)