
Added per-tenant in-process sharding support to compactor #2599

Merged · 9 commits · May 18, 2020

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -82,6 +82,7 @@
* `cortex_storegateway_blocks_last_successful_sync_timestamp_seconds`
* [ENHANCEMENT] Experimental TSDB: added the flag `-experimental.tsdb.wal-compression-enabled` to allow enabling TSDB WAL compression. #2585
* [ENHANCEMENT] Experimental TSDB: Querier and store-gateway components can now use so-called "caching bucket", which can currently cache fetched chunks into shared memcached server. #2572
* [ENHANCEMENT] Experimental TSDB: added per-tenant blocks sharding support in the compactor, in order to parallelize blocks compaction for a single tenant in a single node. Sharding can be enabled via `-compactor.per-tenant-num-shards` while the parallelization can be controlled with `-compactor.per-tenant-shards-concurrency`. #2599
* [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506
* [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372
* [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400
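For quick reference, enabling the feature from the command line means setting both new flags together; the values below are purely illustrative, not tuned recommendations:

```
-compactor.per-tenant-num-shards=4
-compactor.per-tenant-shards-concurrency=2
```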
9 changes: 9 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -2993,6 +2993,15 @@ sharding_ring:
# the ring.
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Number of shards that a single tenant's blocks should be grouped into (0 or 1
# means per-tenant blocks sharding is disabled).
# CLI flag: -compactor.per-tenant-num-shards
[per_tenant_num_shards: <int> | default = 1]

# Number of concurrent shards compacted for a single tenant.
# CLI flag: -compactor.per-tenant-shards-concurrency
[per_tenant_shards_concurrency: <int> | default = 1]
```

### `store_gateway_config`
9 changes: 9 additions & 0 deletions docs/operations/blocks-storage.md
@@ -613,6 +613,15 @@ compactor:
# within the ring.
# CLI flag: -compactor.ring.heartbeat-timeout
[heartbeat_timeout: <duration> | default = 1m]

# Number of shards that a single tenant's blocks should be grouped into (0 or 1
# means per-tenant blocks sharding is disabled).
# CLI flag: -compactor.per-tenant-num-shards
[per_tenant_num_shards: <int> | default = 1]

# Number of concurrent shards compacted for a single tenant.
# CLI flag: -compactor.per-tenant-shards-concurrency
[per_tenant_shards_concurrency: <int> | default = 1]
```

## Known issues
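The same settings expressed in the configuration file would be a `compactor` block like the following sketch (again, 4 shards and a concurrency of 2 are illustrative values):

```yaml
compactor:
  # Group each tenant's blocks into 4 shards, compacted independently of each other.
  per_tenant_num_shards: 4
  # Compact up to 2 shards of the same tenant in parallel.
  per_tenant_shards_concurrency: 2
```

Both options default to 1, which preserves the previous behaviour of compacting each tenant's blocks as a single group.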
66 changes: 66 additions & 0 deletions pkg/compactor/blocks_sharding_filter.go
@@ -0,0 +1,66 @@
package compactor

import (
"context"
"strconv"

"github.com/oklog/ulid"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/extprom"

"github.com/cortexproject/cortex/pkg/ingester/client"
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

type BlocksShardingFilter struct {
shards uint32
}

func NewBlocksShardingFilter(shards uint32) *BlocksShardingFilter {
return &BlocksShardingFilter{
shards: shards,
}
}

func (f *BlocksShardingFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error {
for _, m := range metas {
// Just remove the ingester ID label if sharding is disabled.
if f.shards <= 1 {
delete(m.Thanos.Labels, cortex_tsdb.IngesterIDExternalLabel)
continue
}

// Skip any block already containing the shard ID to avoid any manipulation
// (i.e. in case the sharding algorithm changes over time).
if _, ok := m.Thanos.Labels[cortex_tsdb.ShardIDExternalLabel]; ok {
continue
}

// Skip any block without the ingester ID, which means the block has been generated
// before the introduction of the ingester ID.
ingesterID, ok := m.Thanos.Labels[cortex_tsdb.IngesterIDExternalLabel]
if !ok {
continue
}

// Compute the shard ID and replace the ingester label with the shard label
// so that the compactor will group blocks based on the shard.
shardID := shardByIngesterID(ingesterID, f.shards)
delete(m.Thanos.Labels, cortex_tsdb.IngesterIDExternalLabel)
m.Thanos.Labels[cortex_tsdb.ShardIDExternalLabel] = shardID
}

return nil
}

// hashIngesterID returns a 32-bit hash of the ingester ID.
func hashIngesterID(id string) uint32 {
h := client.HashNew32()
h = client.HashAdd32(h, id)
return h
}

// shardByIngesterID returns the shard given an ingester ID.
func shardByIngesterID(id string, numShards uint32) string {
return strconv.Itoa(int(hashIngesterID(id) % numShards))
}
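As a usage sketch (mirroring how the tests below drive the filter), this is roughly how the filter rewrites a block's external labels; the resulting shard number depends on the hash of the ingester ID, so the printed value is not guaranteed:

```go
package main

import (
	"context"
	"fmt"

	"github.com/oklog/ulid"
	"github.com/thanos-io/thanos/pkg/block/metadata"

	"github.com/cortexproject/cortex/pkg/compactor"
	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// A block shipped by ingester-0 for tenant user-1.
	blockID := ulid.MustNew(1, nil)
	metas := map[ulid.ULID]*metadata.Meta{
		blockID: {Thanos: metadata.Thanos{Labels: map[string]string{
			cortex_tsdb.IngesterIDExternalLabel: "ingester-0",
			cortex_tsdb.TenantIDExternalLabel:   "user-1",
		}}},
	}

	// With 3 shards, the filter replaces __ingester_id__ with __shard_id__,
	// so the compactor groups blocks per shard rather than per ingester.
	f := compactor.NewBlocksShardingFilter(3)
	if err := f.Filter(context.Background(), metas, nil); err != nil {
		panic(err)
	}

	// Prints something like map[__org_id__:user-1 __shard_id__:2]
	// (the shard value depends on the hash of the ingester ID).
	fmt.Println(metas[blockID].Thanos.Labels)
}
```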
152 changes: 152 additions & 0 deletions pkg/compactor/blocks_sharding_filter_test.go
@@ -0,0 +1,152 @@
package compactor

import (
"context"
"fmt"
"testing"

"github.com/oklog/ulid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/block/metadata"

cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestHashIngesterID(t *testing.T) {
tests := []struct {
first string
second string
expectedEqual bool
}{
{
first: "ingester-0",
second: "ingester-0",
expectedEqual: true,
},
{
first: "ingester-0",
second: "ingester-1",
expectedEqual: false,
},
}

for _, testCase := range tests {
firstHash := hashIngesterID(testCase.first)
secondHash := hashIngesterID(testCase.second)
assert.Equal(t, testCase.expectedEqual, firstHash == secondHash)
}
}

func TestShardByIngesterID_DistributionForKubernetesStatefulSets(t *testing.T) {
const (
numShards = 3
distributionThreshold = 0.8
)

for _, numIngesters := range []int{10, 30, 50, 100} {
// Generate the ingester IDs.
ids := make([]string, numIngesters)
for i := 0; i < numIngesters; i++ {
ids[i] = fmt.Sprintf("ingester-%d", i)
}

// Compute the shard for each ingester.
distribution := map[string][]string{}
for _, id := range ids {
shard := shardByIngesterID(id, numShards)
distribution[shard] = append(distribution[shard], id)
}

// Ensure the distribution is fair.
minSizePerShard := distributionThreshold * (float64(numIngesters) / float64(numShards))

for _, ingesters := range distribution {
assert.GreaterOrEqual(t, len(ingesters), int(minSizePerShard))
}
}
}

func TestBlocksShardingFilter(t *testing.T) {
block1 := ulid.MustNew(1, nil)
block2 := ulid.MustNew(2, nil)
block3 := ulid.MustNew(3, nil)

tests := map[string]struct {
numShards uint32
input map[ulid.ULID]map[string]string
expected map[ulid.ULID]map[string]string
}{
"blocks from the same ingester should go into the same shard": {
numShards: 3,
input: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
expected: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
},
"blocks from the different ingesters should be sharded": {
numShards: 3,
input: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
expected: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
},
"blocks without ingester ID should not be mangled": {
numShards: 3,
input: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
expected: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
},
"should remove the ingester ID external label if sharding is disabled": {
numShards: 1,
input: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1", cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2", cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
expected: map[ulid.ULID]map[string]string{
block1: {cortex_tsdb.TenantIDExternalLabel: "user-1"},
block2: {cortex_tsdb.TenantIDExternalLabel: "user-1"},
block3: {cortex_tsdb.TenantIDExternalLabel: "user-1"},
},
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
metas := map[ulid.ULID]*metadata.Meta{}
for id, lbls := range testData.input {
metas[id] = &metadata.Meta{Thanos: metadata.Thanos{Labels: lbls}}
}

f := NewBlocksShardingFilter(testData.numShards)
err := f.Filter(context.Background(), metas, nil)
require.NoError(t, err)
assert.Len(t, metas, len(testData.expected))

for expectedID, expectedLbls := range testData.expected {
assert.NotNil(t, metas[expectedID])
assert.Equal(t, metas[expectedID].Thanos.Labels, expectedLbls)
}
})
}
}
14 changes: 8 additions & 6 deletions pkg/compactor/compactor.go
@@ -38,8 +38,10 @@ type Config struct {
DeletionDelay time.Duration `yaml:"deletion_delay"`

// Compactors sharding.
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingRing RingConfig `yaml:"sharding_ring"`
ShardingEnabled bool `yaml:"sharding_enabled"`
ShardingRing RingConfig `yaml:"sharding_ring"`
PerTenantNumShards uint `yaml:"per_tenant_num_shards"`
PerTenantShardsConcurrency int `yaml:"per_tenant_shards_concurrency"`

// No need to add options to customize the retry backoff,
// given the defaults should be fine, but allow to override
@@ -64,6 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs")
f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval")
f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.")
f.UintVar(&cfg.PerTenantNumShards, "compactor.per-tenant-num-shards", 1, "Number of shards that a single tenant's blocks should be grouped into (0 or 1 means per-tenant blocks sharding is disabled).")
f.IntVar(&cfg.PerTenantShardsConcurrency, "compactor.per-tenant-shards-concurrency", 1, "Number of concurrent shards compacted for a single tenant.")
f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+
"If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+
"If delete-delay is 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures, "+
@@ -346,6 +350,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
reg,
[]block.MetadataFilter{
// List of filters to apply (order matters).
NewBlocksShardingFilter(uint32(c.compactorCfg.PerTenantNumShards)),
block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg),
ignoreDeletionMarkFilter,
deduplicateBlocksFilter,
@@ -378,10 +383,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error {
c.tsdbCompactor,
path.Join(c.compactorCfg.DataDir, "compact"),
bucket,
// No compaction concurrency. Due to how Cortex works we don't
// expect to have multiple block groups per tenant, so setting
// a value higher than 1 would be useless.
1,
c.compactorCfg.PerTenantShardsConcurrency,
)
if err != nil {
return errors.Wrap(err, "failed to create bucket compactor")
3 changes: 3 additions & 0 deletions pkg/ingester/ingester_v2.go
@@ -805,6 +805,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) {
{
Name: cortex_tsdb.TenantIDExternalLabel,
Value: userID,
}, {
Name: cortex_tsdb.IngesterIDExternalLabel,
Value: i.lifecycler.ID,
},
}

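Assuming the surrounding literal is a Prometheus `labels.Labels` slice (the `Name`/`Value` fields suggest so), the external label set attached to every block this ingester ships would conceptually look like the sketch below, with illustrative identifiers:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/labels"

	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func main() {
	// Hypothetical identifiers; in the ingester they come from the tenant ID
	// of the TSDB and from the lifecycler (ingester ID).
	userID := "user-1"
	ingesterID := "ingester-3"

	// External labels now attached to shipped blocks: the tenant that owns the
	// block and the ingester that produced it. The compactor later rewrites
	// __ingester_id__ into __shard_id__ (see BlocksShardingFilter above).
	extLabels := labels.Labels{
		{Name: cortex_tsdb.TenantIDExternalLabel, Value: userID},
		{Name: cortex_tsdb.IngesterIDExternalLabel, Value: ingesterID},
	}

	fmt.Println(extLabels) // {__org_id__="user-1", __ingester_id__="ingester-3"}
}
```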
5 changes: 1 addition & 4 deletions pkg/querier/block.go
@@ -208,10 +208,7 @@ func newBlockQuerierSeries(lbls []storepb.Label, chunks []storepb.AggrChunk) *bl

b := labels.NewBuilder(nil)
for _, l := range lbls {
// Ignore external label set by the shipper
if l.Name != tsdb.TenantIDExternalLabel {
b.Set(l.Name, l.Value)
}
b.Set(l.Name, l.Value)
}

return &blockQuerierSeries{labels: b.Labels(), chunks: chunks}
7 changes: 2 additions & 5 deletions pkg/querier/block_test.go
@@ -12,8 +12,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store/storepb"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
)

func TestBlockQuerierSeries(t *testing.T) {
@@ -34,10 +32,9 @@ func TestBlockQuerierSeries(t *testing.T) {
expectedMetric: labels.Labels(nil),
expectedErr: "no chunks",
},
"should remove the external label added by the shipper": {
"should return series on success": {
series: &storepb.Series{
Labels: []storepb.Label{
{Name: tsdb.TenantIDExternalLabel, Value: "test"},
{Name: "foo", Value: "bar"},
},
Chunks: []storepb.AggrChunk{
@@ -133,7 +130,7 @@ func TestBlockQuerierSeriesSet(t *testing.T) {

// second, with multiple chunks
{
Labels: mkLabels("__name__", "second", tsdb.TenantIDExternalLabel, "to be removed"),
Labels: mkLabels("__name__", "second"),
Chunks: []storepb.AggrChunk{
// unordered chunks
createChunkWithSineSamples(now.Add(400*time.Second), now.Add(600*time.Second), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples, = 120000 in total)
11 changes: 10 additions & 1 deletion pkg/storage/tsdb/config.go
@@ -31,8 +31,17 @@ const (
// BackendFilesystem is the value for the filesystem storage backend
BackendFilesystem = "filesystem"

// TenantIDExternalLabel is the external label set when shipping blocks to the storage
// TenantIDExternalLabel is the external label containing the tenant ID,
// set when shipping blocks to the storage.
TenantIDExternalLabel = "__org_id__"

// IngesterIDExternalLabel is the external label containing the ingester ID,
// set when shipping blocks to the storage.
IngesterIDExternalLabel = "__ingester_id__"

// ShardIDExternalLabel is the external label containing the shard ID,
// set by the compactor.
ShardIDExternalLabel = "__shard_id__"
)

// Validation errors
9 changes: 8 additions & 1 deletion pkg/storegateway/bucket_stores.go
@@ -245,7 +245,14 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
// TODO(pracucci) can this cause troubles with the upcoming blocks sharding in the store-gateway?
block.NewDeduplicateFilter(),
}...),
nil,
[]block.MetadataModifier{
// Remove Cortex external labels so that they're not injected when querying blocks.
block.NewReplicaLabelRemover(userLogger, []string{
tsdb.TenantIDExternalLabel,
tsdb.IngesterIDExternalLabel,
tsdb.ShardIDExternalLabel,
}),
},
)
if err != nil {
return nil, err