
Commit c0d3995

Added shuffle sharding support to generate a subring (#3090)
* Added shuffle sharding support to generate a subring
* Solved all TODOs
* Simplified unlock
* Fixed linter
* Added benchmark
* Replaced Subring() with ShuffleShard()
* Small improvements
* Shortened CHANGELOG entry
* Make golang doc happy
* Fixed flag name and added integration test

Signed-off-by: Marco Pracucci <[email protected]>
1 parent b1ee0aa commit c0d3995

20 files changed: +892 / -91 lines

CHANGELOG.md

Lines changed: 5 additions & 0 deletions

@@ -2,7 +2,12 @@
 
 ## master / unreleased
 
+* [CHANGE] Improved shuffle sharding support in the write path. This work introduced some config changes: #3090
+  * Introduced `-distributor.sharding-strategy` CLI flag (and its respective `sharding_strategy` YAML config option) to explicitly specify which sharding strategy should be used in the write path
+  * `-experimental.distributor.user-subring-size` flag renamed to `-distributor.ingestion-tenant-shard-size`
+  * `user_subring_size` limit YAML config option renamed to `ingestion_tenant_shard_size`
 * [FEATURE] Added support for shuffle-sharding queriers in the query-frontend. When configured (`-frontend.max-queriers-per-user` globally, or using per-user limit `max_queriers_per_user`), each user's requests will be handled by different set of queriers. #3113
+* [ENHANCEMENT] Shuffle sharding: improved shuffle sharding in the write path. Shuffle sharding now should be explicitly enabled via `-distributor.sharding-strategy` CLI flag (or its respective YAML config option) and guarantees stability, consistency, shuffling and balanced zone-awareness properties. #3090
 * [ENHANCEMENT] Ingester: added new metric `cortex_ingester_active_series` to track active series more accurately. Also added options to control whether active series tracking is enabled (`-ingester.active-series-enabled`, defaults to false), and how often this metric is updated (`-ingester.active-series-update-period`) and max idle time for series to be considered inactive (`-ingester.active-series-idle-timeout`). #3153
 * [BUGFIX] No-longer-needed ingester operations for queries triggered by queriers and rulers are now canceled. #3178
 * [BUGFIX] Ruler: directories in the configured `rules-path` will be removed on startup and shutdown in order to ensure they don't persist between runs. #3195
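Putting the renamed options together, here is a minimal sketch of enabling shuffle sharding on the write path after this change. The option names come from the docs updated in this commit; the shard size value is purely illustrative, and the sketch assumes the usual top-level `distributor` and `limits` config blocks:

    distributor:
      # "default" keeps the old behaviour of spreading every tenant across all ingesters.
      sharding_strategy: shuffle-sharding

    limits:
      # Renamed from user_subring_size. Illustrative value; must be > 0 when the
      # shuffle-sharding strategy is enabled.
      ingestion_tenant_shard_size: 3

Or, equivalently via CLI flags: `-distributor.sharding-strategy=shuffle-sharding -distributor.ingestion-tenant-shard-size=3`.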

docs/configuration/config-file-reference.md

Lines changed: 10 additions & 3 deletions

@@ -392,6 +392,10 @@ ha_tracker:
 # CLI flag: -distributor.extra-query-delay
 [extra_queue_delay: <duration> | default = 0s]
 
+# The sharding strategy to use. Supported values are: default, shuffle-sharding.
+# CLI flag: -distributor.sharding-strategy
+[sharding_strategy: <string> | default = "default"]
+
 # Distribute samples based on all labels, as opposed to solely by user and
 # metric name.
 # CLI flag: -distributor.shard-by-all-labels
@@ -2748,9 +2752,12 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -validation.enforce-metric-name
 [enforce_metric_name: <boolean> | default = true]
 
-# Per-user subring to shard metrics to ingesters. 0 is disabled.
-# CLI flag: -experimental.distributor.user-subring-size
-[user_subring_size: <int> | default = 0]
+# The default tenant's shard size when the shuffle-sharding strategy is used.
+# Must be set both on ingesters and distributors. When this setting is specified
+# in the per-tenant overrides, a value of 0 disables shuffle sharding for the
+# tenant.
+# CLI flag: -distributor.ingestion-tenant-shard-size
+[ingestion_tenant_shard_size: <int> | default = 0]
 
 # The maximum number of series for which a query can fetch samples from each
 # ingester. This limit is enforced only in the ingesters (when querying samples
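As the new help text above notes, the same option can also be set per tenant, with 0 disabling shuffle sharding for that tenant. A sketch of such an override, assuming the runtime overrides file layout Cortex uses for per-tenant limits (tenant names and values are hypothetical):

    overrides:
      tenant-with-sharding-disabled:
        ingestion_tenant_shard_size: 0   # spread this tenant across all ingesters
      large-tenant:
        ingestion_tenant_shard_size: 6   # illustrative larger shard for a bigger tenant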

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 1 deletion

@@ -37,7 +37,7 @@ Currently experimental features are:
 
 - Azure blob storage.
 - Zone awareness based replication.
-- User subrings.
+- Shuffle sharding (both read and write path).
 - Ruler API (to PUT rules).
 - Alertmanager API
 - Memcached client DNS-based service discovery.

integration/e2e/metrics.go

Lines changed: 1 addition & 1 deletion

@@ -73,7 +73,7 @@ func filterMetrics(metrics []*io_prometheus_client.Metric, opts MetricsOptions)
     return filtered
 }
 
-func sumValues(values []float64) float64 {
+func SumValues(values []float64) float64 {
     sum := 0.0
     for _, v := range values {
         sum += v

integration/e2e/service.go

Lines changed: 1 addition & 1 deletion

@@ -586,7 +586,7 @@ func (s *HTTPService) SumMetrics(metricNames []string, opts ...MetricsOption) ([
             return nil, errors.Wrapf(errMissingMetric, "metric=%s service=%s", m, s.name)
         }
 
-        sums[i] = sumValues(getValues(metrics, options))
+        sums[i] = SumValues(getValues(metrics, options))
     }
 
     return sums, nil
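The rename from the unexported sumValues to the exported SumValues in these two files is what lets the new integration test below aggregate the `cortex_ingester_memory_series` metric across ingesters via `e2e.SumValues`.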
Lines changed: 110 additions & 0 deletions

@@ -0,0 +1,110 @@
+// +build requires_docker
+
+package integration
+
+import (
+    "fmt"
+    "strconv"
+    "testing"
+    "time"
+
+    "github.com/prometheus/prometheus/pkg/labels"
+    "github.com/stretchr/testify/require"
+
+    "github.com/cortexproject/cortex/integration/e2e"
+    e2edb "github.com/cortexproject/cortex/integration/e2e/db"
+    "github.com/cortexproject/cortex/integration/e2ecortex"
+)
+
+func TestIngesterSharding(t *testing.T) {
+    const numSeriesToPush = 1000
+
+    tests := map[string]struct {
+        shardingStrategy            string
+        tenantShardSize             int
+        expectedIngestersWithSeries int
+    }{
+        "default sharding strategy should spread series across all ingesters": {
+            shardingStrategy:            "default",
+            tenantShardSize:             2, // Ignored by default strategy.
+            expectedIngestersWithSeries: 3,
+        },
+        "shuffle-sharding strategy should spread series across the configured shard size number of ingesters": {
+            shardingStrategy:            "shuffle-sharding",
+            tenantShardSize:             2,
+            expectedIngestersWithSeries: 2,
+        },
+    }
+
+    for testName, testData := range tests {
+        t.Run(testName, func(t *testing.T) {
+            s, err := e2e.NewScenario(networkName)
+            require.NoError(t, err)
+            defer s.Close()
+
+            flags := BlocksStorageFlags
+            flags["-distributor.sharding-strategy"] = testData.shardingStrategy
+            flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)
+
+            // Start dependencies.
+            consul := e2edb.NewConsul()
+            minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+            require.NoError(t, s.StartAndWaitReady(consul, minio))
+
+            // Start Cortex components.
+            distributor := e2ecortex.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags, "")
+            ingester1 := e2ecortex.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags, "")
+            ingester2 := e2ecortex.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags, "")
+            ingester3 := e2ecortex.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags, "")
+            querier := e2ecortex.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags, "")
+            require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3, querier))
+
+            // Wait until distributor and queriers have updated the ring.
+            require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
+                labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
+                labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
+
+            require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
+                labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
+                labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))
+
+            // Push series.
+            now := time.Now()
+
+            client, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
+            require.NoError(t, err)
+
+            for i := 1; i <= numSeriesToPush; i++ {
+                series, _ := generateSeries(fmt.Sprintf("series_%d", i), now)
+                res, err := client.Push(series)
+                require.NoError(t, err)
+                require.Equal(t, 200, res.StatusCode)
+            }
+
+            // Extract metrics from ingesters.
+            numIngestersWithSeries := 0
+            totalIngestedSeries := 0
+
+            for _, ing := range []*e2ecortex.CortexService{ingester1, ingester2, ingester3} {
+                values, err := ing.SumMetrics([]string{"cortex_ingester_memory_series"})
+                require.NoError(t, err)
+
+                numMemorySeries := e2e.SumValues(values)
+                totalIngestedSeries += int(numMemorySeries)
+                if numMemorySeries > 0 {
+                    numIngestersWithSeries++
+                }
+            }
+
+            require.Equal(t, testData.expectedIngestersWithSeries, numIngestersWithSeries)
+            require.Equal(t, numSeriesToPush, totalIngestedSeries)
+
+            // Ensure no service-specific metrics prefix is used by the wrong service.
+            assertServiceMetricsPrefixes(t, Distributor, distributor)
+            assertServiceMetricsPrefixes(t, Ingester, ingester1)
+            assertServiceMetricsPrefixes(t, Ingester, ingester2)
+            assertServiceMetricsPrefixes(t, Ingester, ingester3)
+            assertServiceMetricsPrefixes(t, Querier, querier)
+        })
+    }
+}

pkg/cortex/cortex.go

Lines changed: 1 addition & 1 deletion

@@ -176,7 +176,7 @@ func (c *Config) Validate(log log.Logger) error {
     if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
         return errors.Wrap(err, "invalid limits config")
     }
-    if err := c.Distributor.Validate(); err != nil {
+    if err := c.Distributor.Validate(c.LimitsConfig); err != nil {
         return errors.Wrap(err, "invalid distributor config")
     }
     if err := c.Querier.Validate(); err != nil {

pkg/distributor/distributor.go

Lines changed: 27 additions & 8 deletions

@@ -3,6 +3,7 @@ package distributor
 import (
     "context"
     "flag"
+    "fmt"
     "net/http"
     "sort"
     "strings"
@@ -104,11 +105,21 @@
         Help: "Unix timestamp of latest received sample per user.",
     }, []string{"user"})
     emptyPreallocSeries = ingester_client.PreallocTimeseries{}
+
+    supportedShardingStrategies = []string{ShardingStrategyDefault, ShardingStrategyShuffle}
+
+    // Validation errors.
+    errInvalidShardingStrategy = errors.New("invalid sharding strategy")
+    errInvalidTenantShardSize  = errors.New("invalid tenant shard size, the value must be greater than 0")
 )
 
 const (
     typeSamples  = "samples"
     typeMetadata = "metadata"
+
+    // Supported sharding strategies.
+    ShardingStrategyDefault = "default"
+    ShardingStrategyShuffle = "shuffle-sharding"
 )
 
 // Distributor is a storage.SampleAppender and a client.Querier which
@@ -147,7 +158,8 @@ type Config struct {
     RemoteTimeout   time.Duration `yaml:"remote_timeout"`
     ExtraQueryDelay time.Duration `yaml:"extra_queue_delay"`
 
-    ShardByAllLabels bool `yaml:"shard_by_all_labels"`
+    ShardingStrategy string `yaml:"sharding_strategy"`
+    ShardByAllLabels bool   `yaml:"shard_by_all_labels"`
 
     // Distributors ring
     DistributorRing RingConfig `yaml:"ring"`
@@ -170,10 +182,19 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
     f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
     f.DurationVar(&cfg.ExtraQueryDelay, "distributor.extra-query-delay", 0, "Time to wait before sending more than the minimum successful query requests.")
     f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
+    f.StringVar(&cfg.ShardingStrategy, "distributor.sharding-strategy", ShardingStrategyDefault, fmt.Sprintf("The sharding strategy to use. Supported values are: %s.", strings.Join(supportedShardingStrategies, ", ")))
 }
 
 // Validate config and returns error on failure
-func (cfg *Config) Validate() error {
+func (cfg *Config) Validate(limits validation.Limits) error {
+    if !util.StringsContain(supportedShardingStrategies, cfg.ShardingStrategy) {
+        return errInvalidShardingStrategy
+    }
+
+    if cfg.ShardingStrategy == ShardingStrategyShuffle && limits.IngestionTenantShardSize <= 0 {
+        return errInvalidTenantShardSize
+    }
+
     return cfg.HATrackerConfig.Validate()
 }
 
@@ -508,13 +529,11 @@ func (d *Distributor) Push(ctx context.Context, req *client.WriteRequest) (*clie
         return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), validatedSamples, len(validatedMetadata))
     }
 
-    var subRing ring.ReadRing
-    subRing = d.ingestersRing
+    subRing := d.ingestersRing.(ring.ReadRing)
 
-    // Obtain a subring if required
-    if size := d.limits.SubringSize(userID); size > 0 {
-        h := client.HashAdd32a(client.HashNew32a(), userID)
-        subRing = d.ingestersRing.Subring(h, size)
+    // Obtain a subring if required.
+    if d.cfg.ShardingStrategy == ShardingStrategyShuffle {
+        subRing = d.ingestersRing.ShuffleShard(userID, d.limits.IngestionTenantShardSize(userID))
     }
 
     keys := append(seriesKeys, metadataKeys...)
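The Push() hunk above is where `Subring()` is replaced by `ShuffleShard()`, as called out in the commit message. A rough sketch of the ring-side shape implied by these call sites, inferred from usage here rather than copied from the real `pkg/ring` definitions:

    package ring

    // Sketch only: method shape inferred from the distributor call site above.
    type ReadRing interface {
        // ShuffleShard returns a stable per-tenant subset of the ring containing
        // (up to) size instances; it replaces the previous hash-based
        // Subring(key uint32, size int) used in the old write path.
        ShuffleShard(identifier string, size int) ReadRing
    }

In the write path the distributor passes the tenant ID and the tenant's `ingestion_tenant_shard_size` limit, and pushes to the returned subring instead of the full ingester ring.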

pkg/distributor/distributor_test.go

Lines changed: 52 additions & 0 deletions

@@ -46,6 +46,58 @@
     ctx = user.InjectOrgID(context.Background(), "user")
 )
 
+func TestConfig_Validate(t *testing.T) {
+    tests := map[string]struct {
+        initConfig func(*Config)
+        initLimits func(*validation.Limits)
+        expected   error
+    }{
+        "default config should pass": {
+            initConfig: func(_ *Config) {},
+            initLimits: func(_ *validation.Limits) {},
+            expected:   nil,
+        },
+        "should fail on invalid sharding strategy": {
+            initConfig: func(cfg *Config) {
+                cfg.ShardingStrategy = "xxx"
+            },
+            initLimits: func(_ *validation.Limits) {},
+            expected:   errInvalidShardingStrategy,
+        },
+        "should fail if the default shard size is 0 on when sharding strategy = shuffle-sharding": {
+            initConfig: func(cfg *Config) {
+                cfg.ShardingStrategy = "shuffle-sharding"
+            },
+            initLimits: func(limits *validation.Limits) {
+                limits.IngestionTenantShardSize = 0
+            },
+            expected: errInvalidTenantShardSize,
+        },
+        "should pass if the default shard size > 0 on when sharding strategy = shuffle-sharding": {
+            initConfig: func(cfg *Config) {
+                cfg.ShardingStrategy = "shuffle-sharding"
+            },
+            initLimits: func(limits *validation.Limits) {
+                limits.IngestionTenantShardSize = 3
+            },
+            expected: nil,
+        },
+    }
+
+    for testName, testData := range tests {
+        t.Run(testName, func(t *testing.T) {
+            cfg := Config{}
+            limits := validation.Limits{}
+            flagext.DefaultValues(&cfg, &limits)
+
+            testData.initConfig(&cfg)
+            testData.initLimits(&limits)
+
+            assert.Equal(t, testData.expected, cfg.Validate(limits))
+        })
+    }
+}
+
 func TestDistributor_Push(t *testing.T) {
     // Metrics to assert on.
     lastSeenTimestamp := "cortex_distributor_latest_seen_sample_timestamp_seconds"

pkg/querier/blocks_store_replicated_set_test.go

Lines changed: 3 additions & 6 deletions

@@ -37,10 +37,7 @@ func TestBlocksStoreReplicationSet_GetClientsFor(t *testing.T) {
     block3Hash := cortex_tsdb.HashBlockID(block3)
     block4Hash := cortex_tsdb.HashBlockID(block4)
 
-    // Ensure the user ID we use belongs to the instances holding the token for the block 1
-    // (it's expected by the assertions below).
     userID := "user-A"
-    require.LessOrEqual(t, cortex_tsdb.HashTenantID(userID), block1Hash)
 
     tests := map[string]struct {
         shardingStrategy string
@@ -250,7 +247,7 @@
             queryBlocks: []ulid.ULID{block1, block2, block4},
             expectedClients: map[string][]ulid.ULID{
                 "127.0.0.1": {block1, block4},
-                "127.0.0.2": {block2},
+                "127.0.0.3": {block2},
             },
         },
         "shuffle sharding, multiple instances in the ring with RF = 1, SS = 4": {
@@ -286,7 +283,7 @@
                 block2: {"127.0.0.1"},
             },
             expectedClients: map[string][]ulid.ULID{
-                "127.0.0.2": {block1, block2},
+                "127.0.0.3": {block1, block2},
             },
         },
         "shuffle sharding, multiple instances in the ring with RF = 2, SS = 2 with excluded blocks and no replacement available": {
@@ -301,7 +298,7 @@
             },
             queryBlocks: []ulid.ULID{block1, block2},
             exclude: map[ulid.ULID][]string{
-                block1: {"127.0.0.1", "127.0.0.2"},
+                block1: {"127.0.0.1", "127.0.0.3"},
                 block2: {"127.0.0.1"},
             },
             expectedErr: fmt.Errorf("no store-gateway instance left after checking exclude for block %s", block1.String()),
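The changed expectations in this test (127.0.0.2 becoming 127.0.0.3, and the removed HashTenantID pre-condition) appear to be a consequence of replacing the hash-based Subring() with ShuffleShard(): the per-tenant instance selection for user-A is derived differently, so the instances picked for the same shard size change.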
