Commit e43cb34

Added per-tenant in-process sharding support to compactor (#2599)
* Added per-tenant in-process sharding support to compactor
* Added concurrency config option
* Fixed filter
* Updated CHANGELOG
* Improved distribution test
* Fixed external labels removal at query time
* Fixed removal of external labels when querying back blocks from storage
* Added unit test
* Fixed linter

Signed-off-by: Marco Pracucci <[email protected]>
1 parent 7ab5ea6 commit e43cb34

12 files changed (+398, -19 lines)

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -82,6 +82,7 @@
   * `cortex_storegateway_blocks_last_successful_sync_timestamp_seconds`
 * [ENHANCEMENT] Experimental TSDB: added the flag `-experimental.tsdb.wal-compression-enabled` to allow to enable TSDB WAL compression. #2585
 * [ENHANCEMENT] Experimental TSDB: Querier and store-gateway components can now use so-called "caching bucket", which can currently cache fetched chunks into shared memcached server. #2572
+* [ENHANCEMENT] Experimental TSDB: added per-tenant blocks sharding support in the compactor, in order to parallelize blocks compaction for a single tenant in a single node. Sharding can be enabled via `-compactor.per-tenant-num-shards` while the parallelization can be controlled with `-compactor.per-tenant-shards-concurrency`. #2599
 * [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
 * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
 * [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400

docs/configuration/config-file-reference.md

Lines changed: 9 additions & 0 deletions
@@ -2993,6 +2993,15 @@ sharding_ring:
 # the ring.
 # CLI flag: -compactor.ring.heartbeat-timeout
 [heartbeat_timeout: <duration> | default = 1m]
+
+# Number of shards a single tenant blocks should be grouped into (0 or 1 means
+# per-tenant blocks sharding is disabled).
+# CLI flag: -compactor.per-tenant-num-shards
+[per_tenant_num_shards: <int> | default = 1]
+
+# Number of concurrent shards compacted for a single tenant.
+# CLI flag: -compactor.per-tenant-shards-concurrency
+[per_tenant_shards_concurrency: <int> | default = 1]
 ```

 ### `store_gateway_config`

docs/operations/blocks-storage.md

Lines changed: 9 additions & 0 deletions
@@ -613,6 +613,15 @@ compactor:
 # within the ring.
 # CLI flag: -compactor.ring.heartbeat-timeout
 [heartbeat_timeout: <duration> | default = 1m]
+
+# Number of shards a single tenant blocks should be grouped into (0 or 1 means
+# per-tenant blocks sharding is disabled).
+# CLI flag: -compactor.per-tenant-num-shards
+[per_tenant_num_shards: <int> | default = 1]
+
+# Number of concurrent shards compacted for a single tenant.
+# CLI flag: -compactor.per-tenant-shards-concurrency
+[per_tenant_shards_concurrency: <int> | default = 1]
 ```

 ## Known issues
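
To make the two new options concrete, here is a minimal Go sketch that registers the compactor flags on a standalone flag set and parses the CLI options documented above. The flag names and `compactor.Config` fields come from this commit; the chosen values (4 shards, concurrency 2) are purely illustrative.

```go
package main

import (
	"flag"
	"fmt"

	"github.com/cortexproject/cortex/pkg/compactor"
)

func main() {
	// Register all compactor flags (including the two added by this commit)
	// on a dedicated flag set, then parse hypothetical command line arguments.
	var cfg compactor.Config
	fs := flag.NewFlagSet("example", flag.ExitOnError)
	cfg.RegisterFlags(fs)

	// Illustrative values: split each tenant's blocks into 4 shards and
	// compact up to 2 of those shards concurrently.
	if err := fs.Parse([]string{
		"-compactor.per-tenant-num-shards=4",
		"-compactor.per-tenant-shards-concurrency=2",
	}); err != nil {
		panic(err)
	}

	fmt.Println(cfg.PerTenantNumShards, cfg.PerTenantShardsConcurrency)
}
```
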
Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
+package compactor
+
+import (
+	"context"
+	"strconv"
+
+	"github.com/oklog/ulid"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+	"github.com/thanos-io/thanos/pkg/extprom"
+
+	"github.com/cortexproject/cortex/pkg/ingester/client"
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+)
+
+type BlocksShardingFilter struct {
+	shards uint32
+}
+
+func NewBlocksShardingFilter(shards uint32) *BlocksShardingFilter {
+	return &BlocksShardingFilter{
+		shards: shards,
+	}
+}
+
+func (f *BlocksShardingFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error {
+	for _, m := range metas {
+		// Just remove the ingester ID label if sharding is disabled.
+		if f.shards <= 1 {
+			delete(m.Thanos.Labels, cortex_tsdb.IngesterIDExternalLabel)
+			continue
+		}
+
+		// Skip any block already containing the shard ID to avoid any manipulation
+		// (ie. in case the sharding algorithm changes over the time).
+		if _, ok := m.Thanos.Labels[cortex_tsdb.ShardIDExternalLabel]; ok {
+			continue
+		}
+
+		// Skip any block without the ingester ID, which means the block has been generated
+		// before the introduction of the ingester ID.
+		ingesterID, ok := m.Thanos.Labels[cortex_tsdb.IngesterIDExternalLabel]
+		if !ok {
+			continue
+		}
+
+		// Compute the shard ID and replace the ingester label with the shard label
+		// so that the compactor will group blocks based on the shard.
+		shardID := shardByIngesterID(ingesterID, f.shards)
+		delete(m.Thanos.Labels, cortex_tsdb.IngesterIDExternalLabel)
+		m.Thanos.Labels[cortex_tsdb.ShardIDExternalLabel] = shardID
+	}
+
+	return nil
+}
+
+// hashIngesterID returns a 32-bit hash of the ingester ID.
+func hashIngesterID(id string) uint32 {
+	h := client.HashNew32()
+	h = client.HashAdd32(h, id)
+	return h
+}
+
+// shardByIngesterID returns the shard given an ingester ID.
+func shardByIngesterID(id string, numShards uint32) string {
+	return strconv.Itoa(int(hashIngesterID(id) % numShards))
+}
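
As a quick illustration of the hash-and-modulo shard assignment implemented above, the following standalone sketch shards a few ingester IDs. It substitutes Go's standard `hash/fnv` for the `client.HashNew32`/`HashAdd32` helpers used in the real code, so the exact shard numbers it prints may differ from the ones the filter would compute.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"strconv"
)

// shardByIngesterID mirrors the compactor logic: a 32-bit hash of the ingester ID,
// modulo the configured number of shards. FNV-1a is a stand-in hash, used only
// for illustration.
func shardByIngesterID(id string, numShards uint32) string {
	h := fnv.New32a()
	_, _ = h.Write([]byte(id))
	return strconv.Itoa(int(h.Sum32() % numShards))
}

func main() {
	const numShards = 3

	// Blocks shipped by the same ingester always land in the same shard,
	// because the shard only depends on the ingester ID.
	for _, id := range []string{"ingester-0", "ingester-1", "ingester-2", "ingester-0"} {
		fmt.Printf("%s -> shard %s\n", id, shardByIngesterID(id, numShards))
	}
}
```
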
Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
+package compactor
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/oklog/ulid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+
+	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
+)
+
+func TestHashIngesterID(t *testing.T) {
+	tests := []struct {
+		first         string
+		second        string
+		expectedEqual bool
+	}{
+		{
+			first:         "ingester-0",
+			second:        "ingester-0",
+			expectedEqual: true,
+		},
+		{
+			first:         "ingester-0",
+			second:        "ingester-1",
+			expectedEqual: false,
+		},
+	}
+
+	for _, testCase := range tests {
+		firstHash := hashIngesterID(testCase.first)
+		secondHash := hashIngesterID(testCase.second)
+		assert.Equal(t, testCase.expectedEqual, firstHash == secondHash)
+	}
+}
+
+func TestShardByIngesterID_DistributionForKubernetesStatefulSets(t *testing.T) {
+	const (
+		numShards             = 3
+		distributionThreshold = 0.8
+	)
+
+	for _, numIngesters := range []int{10, 30, 50, 100} {
+		// Generate the ingester IDs.
+		ids := make([]string, numIngesters)
+		for i := 0; i < numIngesters; i++ {
+			ids[i] = fmt.Sprintf("ingester-%d", i)
+		}
+
+		// Compute the shard for each ingester.
+		distribution := map[string][]string{}
+		for _, id := range ids {
+			shard := shardByIngesterID(id, numShards)
+			distribution[shard] = append(distribution[shard], id)
+		}
+
+		// Ensure the distribution is fair.
+		minSizePerShard := distributionThreshold * (float64(numIngesters) / float64(numShards))
+
+		for _, ingesters := range distribution {
+			assert.GreaterOrEqual(t, len(ingesters), int(minSizePerShard))
+		}
+	}
+}
+
+func TestBlocksShardingFilter(t *testing.T) {
+	block1 := ulid.MustNew(1, nil)
+	block2 := ulid.MustNew(2, nil)
+	block3 := ulid.MustNew(3, nil)
+
+	tests := map[string]struct {
+		numShards uint32
+		input     map[ulid.ULID]map[string]string
+		expected  map[ulid.ULID]map[string]string
+	}{
+		"blocks from the same ingester should go into the same shard": {
+			numShards: 3,
+			input: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+			expected: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+		},
+		"blocks from the different ingesters should be sharded": {
+			numShards: 3,
+			input: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+			expected: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+		},
+		"blocks without ingester ID should not be mangled": {
+			numShards: 3,
+			input: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+			expected: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+		},
+		"should remove the ingester ID external label if sharding is disabled": {
+			numShards: 1,
+			input: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+			expected: map[ulid.ULID]map[string]string{
+				block1: {cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block2: {cortex_tsdb.TenantIDExternalLabel: "user-1"},
+				block3: {cortex_tsdb.TenantIDExternalLabel: "user-1"},
+			},
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			metas := map[ulid.ULID]*metadata.Meta{}
+			for id, lbls := range testData.input {
+				metas[id] = &metadata.Meta{Thanos: metadata.Thanos{Labels: lbls}}
+			}
+
+			f := NewBlocksShardingFilter(testData.numShards)
+			err := f.Filter(context.Background(), metas, nil)
+			require.NoError(t, err)
+			assert.Len(t, metas, len(testData.expected))
+
+			for expectedID, expectedLbls := range testData.expected {
+				assert.NotNil(t, metas[expectedID])
+				assert.Equal(t, metas[expectedID].Thanos.Labels, expectedLbls)
+			}
+		})
+	}
+}

pkg/compactor/compactor.go

Lines changed: 8 additions & 6 deletions
@@ -38,8 +38,10 @@ type Config struct {
 	DeletionDelay time.Duration `yaml:"deletion_delay"`

 	// Compactors sharding.
-	ShardingEnabled bool       `yaml:"sharding_enabled"`
-	ShardingRing    RingConfig `yaml:"sharding_ring"`
+	ShardingEnabled            bool       `yaml:"sharding_enabled"`
+	ShardingRing               RingConfig `yaml:"sharding_ring"`
+	PerTenantNumShards         uint       `yaml:"per_tenant_num_shards"`
+	PerTenantShardsConcurrency int        `yaml:"per_tenant_shards_concurrency"`

 	// No need to add options to customize the retry backoff,
 	// given the defaults should be fine, but allow to override
@@ -64,6 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
 	f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
 	f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
+	f.UintVar(&cfg.PerTenantNumShards, "compactor.per-tenant-num-shards", 1, "Number of shards a single tenant blocks should be grouped into (0 or 1 means per-tenant blocks sharding is disabled).")
+	f.IntVar(&cfg.PerTenantShardsConcurrency, "compactor.per-tenant-shards-concurrency", 1, "Number of concurrent shards compacted for a single tenant.")
 	f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
 		"If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+
 		"If delete-delay is 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures, "+
@@ -346,6 +350,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
 		reg,
 		[]block.MetadataFilter{
 			// List of filters to apply (order matters).
+			NewBlocksShardingFilter(uint32(c.compactorCfg.PerTenantNumShards)),
 			block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
 			ignoreDeletionMarkFilter,
 			deduplicateBlocksFilter,
@@ -378,10 +383,7 @@
 		c.tsdbCompactor,
 		path.Join(c.compactorCfg.DataDir, "compact"),
 		bucket,
-		// No compaction concurrency. Due to how Cortex works we don't
-		// expect to have multiple block groups per tenant, so setting
-		// a value higher than 1 would be useless.
-		1,
+		c.compactorCfg.PerTenantShardsConcurrency,
 	)
 	if err != nil {
 		return errors.Wrap(err, "failed to create bucket compactor")

pkg/ingester/ingester_v2.go

Lines changed: 3 additions & 0 deletions
@@ -805,6 +805,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
 		{
 			Name:  cortex_tsdb.TenantIDExternalLabel,
 			Value: userID,
+		}, {
+			Name:  cortex_tsdb.IngesterIDExternalLabel,
+			Value: i.lifecycler.ID,
 		},
 	}

pkg/querier/block.go

Lines changed: 1 addition & 4 deletions
@@ -208,10 +208,7 @@ func newBlockQuerierSeries(lbls []storepb.Label, chunks []storepb.AggrChunk) *blockQuerierSeries {

 	b := labels.NewBuilder(nil)
 	for _, l := range lbls {
-		// Ignore external label set by the shipper
-		if l.Name != tsdb.TenantIDExternalLabel {
-			b.Set(l.Name, l.Value)
-		}
+		b.Set(l.Name, l.Value)
 	}

 	return &blockQuerierSeries{labels: b.Labels(), chunks: chunks}

pkg/querier/block_test.go

Lines changed: 2 additions & 5 deletions
@@ -12,8 +12,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
-
-	"github.com/cortexproject/cortex/pkg/storage/tsdb"
 )

 func TestBlockQuerierSeries(t *testing.T) {
@@ -34,10 +32,9 @@ func TestBlockQuerierSeries(t *testing.T) {
 			expectedMetric: labels.Labels(nil),
 			expectedErr:    "no chunks",
 		},
-		"should remove the external label added by the shipper": {
+		"should return series on success": {
 			series: &storepb.Series{
 				Labels: []storepb.Label{
-					{Name: tsdb.TenantIDExternalLabel, Value: "test"},
 					{Name: "foo", Value: "bar"},
 				},
 				Chunks: []storepb.AggrChunk{
@@ -133,7 +130,7 @@ func TestBlockQuerierSeriesSet(t *testing.T) {

 		// second, with multiple chunks
 		{
-			Labels: mkLabels("__name__", "second", tsdb.TenantIDExternalLabel, "to be removed"),
+			Labels: mkLabels("__name__", "second"),
 			Chunks: []storepb.AggrChunk{
 				// unordered chunks
 				createChunkWithSineSamples(now.Add(400*time.Second), now.Add(600*time.Second), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples, = 120000 in total)

pkg/storage/tsdb/config.go

Lines changed: 10 additions & 1 deletion
@@ -31,8 +31,17 @@
 	// BackendFilesystem is the value for the filesystem storge backend
 	BackendFilesystem = "filesystem"

-	// TenantIDExternalLabel is the external label set when shipping blocks to the storage
+	// TenantIDExternalLabel is the external label containing the tenant ID,
+	// set when shipping blocks to the storage.
 	TenantIDExternalLabel = "__org_id__"
+
+	// IngesterIDExternalLabel is the external label containing the ingester ID,
+	// set when shipping blocks to the storage.
+	IngesterIDExternalLabel = "__ingester_id__"
+
+	// ShardIDExternalLabel is the external label containing the shard ID,
+	// set by the compactor.
+	ShardIDExternalLabel = "__shard_id__"
 )

 // Validation errors
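
To tie the three constants together, here is a hedged sketch of the external-label lifecycle introduced by this commit: the ingester ships blocks carrying the tenant and ingester labels, and the compactor's BlocksShardingFilter later swaps the ingester label for a shard label. The label values below are hypothetical.

```go
package main

import (
	"fmt"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// External labels attached by the ingester when shipping a block
	// (values are made up for illustration).
	shipped := map[string]string{
		cortex_tsdb.TenantIDExternalLabel:   "user-1",     // "__org_id__"
		cortex_tsdb.IngesterIDExternalLabel: "ingester-5", // "__ingester_id__"
	}

	// What the compactor's BlocksShardingFilter leaves on the block when
	// per-tenant sharding is enabled: the ingester label is replaced by the
	// shard label (hash of the ingester ID modulo the number of shards).
	afterFilter := map[string]string{
		cortex_tsdb.TenantIDExternalLabel: "user-1",
		cortex_tsdb.ShardIDExternalLabel:  "2", // hypothetical shard number
	}

	fmt.Println(shipped, afterFilter)
}
```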
