From b2c40058b51cf7adc668703c3f0846503c8f3213 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 15:31:27 +0200 Subject: [PATCH 1/9] Added per-tenant in-process sharding support to compactor Signed-off-by: Marco Pracucci --- pkg/compactor/blocks_sharding_filter.go | 65 +++++++++ pkg/compactor/blocks_sharding_filter_test.go | 134 +++++++++++++++++++ pkg/compactor/compactor.go | 3 + pkg/ingester/ingester_v2.go | 3 + pkg/storage/tsdb/config.go | 11 +- 5 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 pkg/compactor/blocks_sharding_filter.go create mode 100644 pkg/compactor/blocks_sharding_filter_test.go diff --git a/pkg/compactor/blocks_sharding_filter.go b/pkg/compactor/blocks_sharding_filter.go new file mode 100644 index 00000000000..52e03cba006 --- /dev/null +++ b/pkg/compactor/blocks_sharding_filter.go @@ -0,0 +1,65 @@ +package compactor + +import ( + "context" + "strconv" + + "github.com/oklog/ulid" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/extprom" + + "github.com/cortexproject/cortex/pkg/ingester/client" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +type BlocksShardingFilter struct { + shards uint32 +} + +func NewBlocksShardingFilter(shards uint32) *BlocksShardingFilter { + return &BlocksShardingFilter{ + shards: shards, + } +} + +func (f *BlocksShardingFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error { + // Do nothing if sharding is disabled. + if f.shards <= 1 { + return nil + } + + for _, m := range metas { + // Skip any block already containing the shard ID to avoid any manipulation + // (ie. in case the sharding algorithm changes over the time). + if _, ok := m.Thanos.Labels[cortex_tsdb.ShardIDExternalLabel]; ok { + continue + } + + // Skip any block without the ingester ID, which means the block has been generated + // before the introduction of the ingester ID. + ingesterID, ok := m.Thanos.Labels[cortex_tsdb.IngesterIDExternalLabel] + if !ok { + continue + } + + // Compute the shard ID and replace the ingester label with the shard label + // so that the compactor will group blocks based on the shard. + shardID := shardByIngesterID(ingesterID, f.shards) + delete(m.Thanos.Labels, cortex_tsdb.IngesterIDExternalLabel) + m.Thanos.Labels[cortex_tsdb.ShardIDExternalLabel] = shardID + } + + return nil +} + +// hashIngesterID returns a 32-bit hash of the ingester ID. +func hashIngesterID(id string) uint32 { + h := client.HashNew32() + h = client.HashAdd32(h, id) + return h +} + +// shardByIngesterID returns the shard given an ingester ID. +func shardByIngesterID(id string, numShards uint32) string { + return strconv.Itoa(int(hashIngesterID(id) % numShards)) +} diff --git a/pkg/compactor/blocks_sharding_filter_test.go b/pkg/compactor/blocks_sharding_filter_test.go new file mode 100644 index 00000000000..3a8166fa8a1 --- /dev/null +++ b/pkg/compactor/blocks_sharding_filter_test.go @@ -0,0 +1,134 @@ +package compactor + +import ( + "context" + "fmt" + "testing" + + "github.com/oklog/ulid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" + + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" +) + +func TestHashIngesterID(t *testing.T) { + tests := []struct { + first string + second string + expectedEqual bool + }{ + { + first: "ingester-0", + second: "ingester-0", + expectedEqual: true, + }, + { + first: "ingester-0", + second: "ingester-1", + expectedEqual: false, + }, + } + + for _, testCase := range tests { + firstHash := hashIngesterID(testCase.first) + secondHash := hashIngesterID(testCase.second) + assert.Equal(t, testCase.expectedEqual, firstHash == secondHash) + } +} + +func TestShardByIngesterID_DistributionForKubernetesStatefulSets(t *testing.T) { + const ( + numIngesters = 50 + numShards = 3 + distributionThreshold = 0.8 + ) + + // Generate the ingester IDs. + ids := make([]string, numIngesters) + for i := 0; i < numIngesters; i++ { + ids[i] = fmt.Sprintf("ingester-%d", i) + } + + // Compute the shard for each ingester. + distribution := map[string][]string{} + for _, id := range ids { + shard := shardByIngesterID(id, numShards) + distribution[shard] = append(distribution[shard], id) + } + + // Ensure the distribution is fair. + minSizePerShard := distributionThreshold * (float64(numIngesters) / float64(numShards)) + + for _, ingesters := range distribution { + assert.GreaterOrEqual(t, len(ingesters), int(minSizePerShard)) + } +} + +func TestBlocksShardingFilter(t *testing.T) { + block1 := ulid.MustNew(1, nil) + block2 := ulid.MustNew(2, nil) + block3 := ulid.MustNew(3, nil) + + tests := map[string]struct { + input map[ulid.ULID]map[string]string + expected map[ulid.ULID]map[string]string + }{ + "blocks from the same ingester should go into the same shard": { + input: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, + block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, + block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, + }, + expected: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "2"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "2"}, + }, + }, + "blocks from the different ingesters should be sharded": { + input: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, + block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1"}, + block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2"}, + }, + expected: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "1"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "0"}, + }, + }, + "blocks without ingester ID should not be mangled": { + input: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "1"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "0"}, + }, + expected: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "1"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "0"}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + metas := map[ulid.ULID]*metadata.Meta{} + for id, lbls := range testData.input { + metas[id] = &metadata.Meta{Thanos: metadata.Thanos{Labels: lbls}} + } + + f := NewBlocksShardingFilter(3) + err := f.Filter(context.Background(), metas, nil) + require.NoError(t, err) + assert.Len(t, metas, len(testData.expected)) + + for expectedID, expectedLbls := range testData.expected { + assert.NotNil(t, metas[expectedID]) + assert.Equal(t, metas[expectedID].Thanos.Labels, expectedLbls) + } + }) + } +} diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index cac6034cfbc..a789e54b56a 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -40,6 +40,7 @@ type Config struct { // Compactors sharding. ShardingEnabled bool `yaml:"sharding_enabled"` ShardingRing RingConfig `yaml:"sharding_ring"` + ShardsPerTenant uint `yaml:"shards_per_tenant"` // No need to add options to customize the retry backoff, // given the defaults should be fine, but allow to override @@ -64,6 +65,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs") f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") + f.UintVar(&cfg.ShardsPerTenant, "compactor.shards-per-tenant", 1, "Number of shards a single tenant blocks should be grouped into (0 or 1 means per-tenant blocks sharding is disabled).") f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+ "If delete-delay is 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures, "+ @@ -346,6 +348,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { reg, []block.MetadataFilter{ // List of filters to apply (order matters). + NewBlocksShardingFilter(uint32(c.compactorCfg.ShardsPerTenant)), block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), ignoreDeletionMarkFilter, deduplicateBlocksFilter, diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index b37041a6838..e050e254839 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -805,6 +805,9 @@ func (i *Ingester) createTSDB(userID string) (*userTSDB, error) { { Name: cortex_tsdb.TenantIDExternalLabel, Value: userID, + }, { + Name: cortex_tsdb.IngesterIDExternalLabel, + Value: i.lifecycler.ID, }, } diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index 455d76acb64..8ae93a78590 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -31,8 +31,17 @@ const ( // BackendFilesystem is the value for the filesystem storge backend BackendFilesystem = "filesystem" - // TenantIDExternalLabel is the external label set when shipping blocks to the storage + // TenantIDExternalLabel is the external label containing the tenant ID, + // set when shipping blocks to the storage. TenantIDExternalLabel = "__org_id__" + + // IngesterIDExternalLabel is the external label containing the ingester ID, + // set when shipping blocks to the storage. + IngesterIDExternalLabel = "__ingester_id__" + + // ShardIDExternalLabel is the external label containing the shard ID, + // set by the compactor. + ShardIDExternalLabel = "__shard_id__" ) // Validation errors From 9aa37d9fffff9530a02f5570279eb5675bd5d4c2 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 15:39:49 +0200 Subject: [PATCH 2/9] Added concurrency config option Signed-off-by: Marco Pracucci --- docs/configuration/config-file-reference.md | 9 +++++++++ docs/operations/blocks-storage.md | 9 +++++++++ pkg/compactor/compactor.go | 17 ++++++++--------- 3 files changed, 26 insertions(+), 9 deletions(-) diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 24c976ea5b5..d7c251353d8 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -2993,6 +2993,15 @@ sharding_ring: # the ring. # CLI flag: -compactor.ring.heartbeat-timeout [heartbeat_timeout: | default = 1m] + +# Number of shards a single tenant blocks should be grouped into (0 or 1 means +# per-tenant blocks sharding is disabled). +# CLI flag: -compactor.per-tenant-num-shards +[per_tenant_num_shards: | default = 1] + +# Number of concurrent shards compacted for a single tenant. +# CLI flag: -compactor.per-tenant-shards-concurrency +[per_tenant_shards_concurrency: | default = 1] ``` ### `store_gateway_config` diff --git a/docs/operations/blocks-storage.md b/docs/operations/blocks-storage.md index 6071b09166d..e8239b54ec2 100644 --- a/docs/operations/blocks-storage.md +++ b/docs/operations/blocks-storage.md @@ -613,6 +613,15 @@ compactor: # within the ring. # CLI flag: -compactor.ring.heartbeat-timeout [heartbeat_timeout: | default = 1m] + + # Number of shards a single tenant blocks should be grouped into (0 or 1 means + # per-tenant blocks sharding is disabled). + # CLI flag: -compactor.per-tenant-num-shards + [per_tenant_num_shards: | default = 1] + + # Number of concurrent shards compacted for a single tenant. + # CLI flag: -compactor.per-tenant-shards-concurrency + [per_tenant_shards_concurrency: | default = 1] ``` ## Known issues diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index a789e54b56a..f57dd929968 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -38,9 +38,10 @@ type Config struct { DeletionDelay time.Duration `yaml:"deletion_delay"` // Compactors sharding. - ShardingEnabled bool `yaml:"sharding_enabled"` - ShardingRing RingConfig `yaml:"sharding_ring"` - ShardsPerTenant uint `yaml:"shards_per_tenant"` + ShardingEnabled bool `yaml:"sharding_enabled"` + ShardingRing RingConfig `yaml:"sharding_ring"` + PerTenantNumShards uint `yaml:"per_tenant_num_shards"` + PerTenantShardsConcurrency int `yaml:"per_tenant_shards_concurrency"` // No need to add options to customize the retry backoff, // given the defaults should be fine, but allow to override @@ -65,7 +66,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.CompactionInterval, "compactor.compaction-interval", time.Hour, "The frequency at which the compaction runs") f.IntVar(&cfg.CompactionRetries, "compactor.compaction-retries", 3, "How many times to retry a failed compaction during a single compaction interval") f.BoolVar(&cfg.ShardingEnabled, "compactor.sharding-enabled", false, "Shard tenants across multiple compactor instances. Sharding is required if you run multiple compactor instances, in order to coordinate compactions and avoid race conditions leading to the same tenant blocks simultaneously compacted by different instances.") - f.UintVar(&cfg.ShardsPerTenant, "compactor.shards-per-tenant", 1, "Number of shards a single tenant blocks should be grouped into (0 or 1 means per-tenant blocks sharding is disabled).") + f.UintVar(&cfg.PerTenantNumShards, "compactor.per-tenant-num-shards", 1, "Number of shards a single tenant blocks should be grouped into (0 or 1 means per-tenant blocks sharding is disabled).") + f.IntVar(&cfg.PerTenantShardsConcurrency, "compactor.per-tenant-shards-concurrency", 1, "Number of concurrent shards compacted for a single tenant.") f.DurationVar(&cfg.DeletionDelay, "compactor.deletion-delay", 12*time.Hour, "Time before a block marked for deletion is deleted from bucket. "+ "If not 0, blocks will be marked for deletion and compactor component will delete blocks marked for deletion from the bucket. "+ "If delete-delay is 0, blocks will be deleted straight away. Note that deleting blocks immediately can cause query failures, "+ @@ -348,7 +350,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { reg, []block.MetadataFilter{ // List of filters to apply (order matters). - NewBlocksShardingFilter(uint32(c.compactorCfg.ShardsPerTenant)), + NewBlocksShardingFilter(uint32(c.compactorCfg.PerTenantNumShards)), block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), ignoreDeletionMarkFilter, deduplicateBlocksFilter, @@ -381,10 +383,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { c.tsdbCompactor, path.Join(c.compactorCfg.DataDir, "compact"), bucket, - // No compaction concurrency. Due to how Cortex works we don't - // expect to have multiple block groups per tenant, so setting - // a value higher than 1 would be useless. - 1, + c.compactorCfg.PerTenantShardsConcurrency, ) if err != nil { return errors.Wrap(err, "failed to create bucket compactor") From 3a5666b1be0bb0b9dc309078677ab17b130e6184 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 15:45:03 +0200 Subject: [PATCH 3/9] Fixed filter Signed-off-by: Marco Pracucci --- pkg/compactor/blocks_sharding_filter.go | 11 ++-- pkg/compactor/blocks_sharding_filter_test.go | 59 +++++++++++++------- 2 files changed, 44 insertions(+), 26 deletions(-) diff --git a/pkg/compactor/blocks_sharding_filter.go b/pkg/compactor/blocks_sharding_filter.go index 52e03cba006..c9b2cf9e34c 100644 --- a/pkg/compactor/blocks_sharding_filter.go +++ b/pkg/compactor/blocks_sharding_filter.go @@ -23,12 +23,13 @@ func NewBlocksShardingFilter(shards uint32) *BlocksShardingFilter { } func (f *BlocksShardingFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error { - // Do nothing if sharding is disabled. - if f.shards <= 1 { - return nil - } - for _, m := range metas { + // Just remove the ingester ID label if sharding is disabled. + if f.shards <= 1 { + delete(m.Thanos.Labels, cortex_tsdb.IngesterIDExternalLabel) + continue + } + // Skip any block already containing the shard ID to avoid any manipulation // (ie. in case the sharding algorithm changes over the time). if _, ok := m.Thanos.Labels[cortex_tsdb.ShardIDExternalLabel]; ok { diff --git a/pkg/compactor/blocks_sharding_filter_test.go b/pkg/compactor/blocks_sharding_filter_test.go index 3a8166fa8a1..5f43324c1e1 100644 --- a/pkg/compactor/blocks_sharding_filter_test.go +++ b/pkg/compactor/blocks_sharding_filter_test.go @@ -72,43 +72,60 @@ func TestBlocksShardingFilter(t *testing.T) { block3 := ulid.MustNew(3, nil) tests := map[string]struct { - input map[ulid.ULID]map[string]string - expected map[ulid.ULID]map[string]string + numShards uint32 + input map[ulid.ULID]map[string]string + expected map[ulid.ULID]map[string]string }{ "blocks from the same ingester should go into the same shard": { + numShards: 3, input: map[ulid.ULID]map[string]string{ - block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, - block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, - block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, + block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, }, expected: map[ulid.ULID]map[string]string{ - block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, - block2: {cortex_tsdb.ShardIDExternalLabel: "2"}, - block3: {cortex_tsdb.ShardIDExternalLabel: "2"}, + block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, }, }, "blocks from the different ingesters should be sharded": { + numShards: 3, input: map[ulid.ULID]map[string]string{ - block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0"}, - block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1"}, - block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2"}, + block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, }, expected: map[ulid.ULID]map[string]string{ - block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, - block2: {cortex_tsdb.ShardIDExternalLabel: "1"}, - block3: {cortex_tsdb.ShardIDExternalLabel: "0"}, + block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, }, }, "blocks without ingester ID should not be mangled": { + numShards: 3, input: map[ulid.ULID]map[string]string{ - block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, - block2: {cortex_tsdb.ShardIDExternalLabel: "1"}, - block3: {cortex_tsdb.ShardIDExternalLabel: "0"}, + block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, }, expected: map[ulid.ULID]map[string]string{ - block1: {cortex_tsdb.ShardIDExternalLabel: "2"}, - block2: {cortex_tsdb.ShardIDExternalLabel: "1"}, - block3: {cortex_tsdb.ShardIDExternalLabel: "0"}, + block1: {cortex_tsdb.ShardIDExternalLabel: "2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.ShardIDExternalLabel: "1", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.ShardIDExternalLabel: "0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + }, + }, + "should remove the ingester ID external label if sharding is disabled": { + numShards: 1, + input: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.IngesterIDExternalLabel: "ingester-0", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.IngesterIDExternalLabel: "ingester-1", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.IngesterIDExternalLabel: "ingester-2", cortex_tsdb.TenantIDExternalLabel: "user-1"}, + }, + expected: map[ulid.ULID]map[string]string{ + block1: {cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block2: {cortex_tsdb.TenantIDExternalLabel: "user-1"}, + block3: {cortex_tsdb.TenantIDExternalLabel: "user-1"}, }, }, } @@ -120,7 +137,7 @@ func TestBlocksShardingFilter(t *testing.T) { metas[id] = &metadata.Meta{Thanos: metadata.Thanos{Labels: lbls}} } - f := NewBlocksShardingFilter(3) + f := NewBlocksShardingFilter(testData.numShards) err := f.Filter(context.Background(), metas, nil) require.NoError(t, err) assert.Len(t, metas, len(testData.expected)) From 046786800971f323981c168b513e65ad0ebf00e1 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 15:50:48 +0200 Subject: [PATCH 4/9] Updated CHANGELOG Signed-off-by: Marco Pracucci --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e93b33b126..d02e796531b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -82,6 +82,7 @@ * `cortex_storegateway_blocks_last_successful_sync_timestamp_seconds` * [ENHANCEMENT] Experimental TSDB: added the flag `-experimental.tsdb.wal-compression-enabled` to allow to enable TSDB WAL compression. #2585 * [ENHANCEMENT] Experimental TSDB: Querier and store-gateway components can now use so-called "caching bucket", which can currently cache fetched chunks into shared memcached server. #2572 +* [ENHANCEMENT] Experimental TSDB: added per-tenant blocks sharding support in the compactor, in order to parallelize blocks compaction for a single tenant in a single node. Sharding can be enabled via `-compactor.per-tenant-num-shards` while the parallelization can be controlled with `-compactor.per-tenant-shards-concurrency`. #2599 * [BUGFIX] Ruler: Ensure temporary rule files with special characters are properly mapped and cleaned up. #2506 * [BUGFIX] Fixes #2411, Ensure requests are properly routed to the prometheus api embedded in the query if `-server.path-prefix` is set. #2372 * [BUGFIX] Experimental TSDB: fixed chunk data corruption when querying back series using the experimental blocks storage. #2400 From 20ad436ed278c6f0539dad7de2b86cfba16369c9 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 15:55:20 +0200 Subject: [PATCH 5/9] Improved distribution test Signed-off-by: Marco Pracucci --- pkg/compactor/blocks_sharding_filter_test.go | 33 ++++++++++---------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/pkg/compactor/blocks_sharding_filter_test.go b/pkg/compactor/blocks_sharding_filter_test.go index 5f43324c1e1..0dd6adcdd9b 100644 --- a/pkg/compactor/blocks_sharding_filter_test.go +++ b/pkg/compactor/blocks_sharding_filter_test.go @@ -40,29 +40,30 @@ func TestHashIngesterID(t *testing.T) { func TestShardByIngesterID_DistributionForKubernetesStatefulSets(t *testing.T) { const ( - numIngesters = 50 numShards = 3 distributionThreshold = 0.8 ) - // Generate the ingester IDs. - ids := make([]string, numIngesters) - for i := 0; i < numIngesters; i++ { - ids[i] = fmt.Sprintf("ingester-%d", i) - } + for _, numIngesters := range []int{10, 30, 50, 100} { + // Generate the ingester IDs. + ids := make([]string, numIngesters) + for i := 0; i < numIngesters; i++ { + ids[i] = fmt.Sprintf("ingester-%d", i) + } - // Compute the shard for each ingester. - distribution := map[string][]string{} - for _, id := range ids { - shard := shardByIngesterID(id, numShards) - distribution[shard] = append(distribution[shard], id) - } + // Compute the shard for each ingester. + distribution := map[string][]string{} + for _, id := range ids { + shard := shardByIngesterID(id, numShards) + distribution[shard] = append(distribution[shard], id) + } - // Ensure the distribution is fair. - minSizePerShard := distributionThreshold * (float64(numIngesters) / float64(numShards)) + // Ensure the distribution is fair. + minSizePerShard := distributionThreshold * (float64(numIngesters) / float64(numShards)) - for _, ingesters := range distribution { - assert.GreaterOrEqual(t, len(ingesters), int(minSizePerShard)) + for _, ingesters := range distribution { + assert.GreaterOrEqual(t, len(ingesters), int(minSizePerShard)) + } } } From e9e9f89ebc96f227b2ce4127d60fe6c5b1440395 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 16:40:51 +0200 Subject: [PATCH 6/9] Fixed external labels removal at query time Signed-off-by: Marco Pracucci --- pkg/querier/block.go | 4 ++-- pkg/querier/block_test.go | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pkg/querier/block.go b/pkg/querier/block.go index e5b48c949dd..ed76ae6005f 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -208,8 +208,8 @@ func newBlockQuerierSeries(lbls []storepb.Label, chunks []storepb.AggrChunk) *bl b := labels.NewBuilder(nil) for _, l := range lbls { - // Ignore external label set by the shipper - if l.Name != tsdb.TenantIDExternalLabel { + // Ignore external label set by the ingester or compactor. + if l.Name != tsdb.TenantIDExternalLabel && l.Name != tsdb.IngesterIDExternalLabel && l.Name != tsdb.ShardIDExternalLabel { b.Set(l.Name, l.Value) } } diff --git a/pkg/querier/block_test.go b/pkg/querier/block_test.go index 9005bc0cc28..1ccfa13c4e8 100644 --- a/pkg/querier/block_test.go +++ b/pkg/querier/block_test.go @@ -34,10 +34,12 @@ func TestBlockQuerierSeries(t *testing.T) { expectedMetric: labels.Labels(nil), expectedErr: "no chunks", }, - "should remove the external label added by the shipper": { + "should remove the external label added by the ingester and compactor": { series: &storepb.Series{ Labels: []storepb.Label{ {Name: tsdb.TenantIDExternalLabel, Value: "test"}, + {Name: tsdb.IngesterIDExternalLabel, Value: "test"}, + {Name: tsdb.ShardIDExternalLabel, Value: "test"}, {Name: "foo", Value: "bar"}, }, Chunks: []storepb.AggrChunk{ @@ -133,7 +135,7 @@ func TestBlockQuerierSeriesSet(t *testing.T) { // second, with multiple chunks { - Labels: mkLabels("__name__", "second", tsdb.TenantIDExternalLabel, "to be removed"), + Labels: mkLabels("__name__", "second", tsdb.TenantIDExternalLabel, "to be removed", tsdb.IngesterIDExternalLabel, "to be removed", tsdb.ShardIDExternalLabel, "to be removed"), Chunks: []storepb.AggrChunk{ // unordered chunks createChunkWithSineSamples(now.Add(400*time.Second), now.Add(600*time.Second), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples, = 120000 in total) From b559cc324592ca36b494cda92aa679df42018109 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Fri, 15 May 2020 20:45:19 +0200 Subject: [PATCH 7/9] Fixed removal of external labels when querying back blocks from storage Signed-off-by: Marco Pracucci --- pkg/querier/block.go | 5 +---- pkg/querier/block_test.go | 9 ++------- pkg/storegateway/bucket_stores.go | 9 ++++++++- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/querier/block.go b/pkg/querier/block.go index ed76ae6005f..76bd49c7ff7 100644 --- a/pkg/querier/block.go +++ b/pkg/querier/block.go @@ -208,10 +208,7 @@ func newBlockQuerierSeries(lbls []storepb.Label, chunks []storepb.AggrChunk) *bl b := labels.NewBuilder(nil) for _, l := range lbls { - // Ignore external label set by the ingester or compactor. - if l.Name != tsdb.TenantIDExternalLabel && l.Name != tsdb.IngesterIDExternalLabel && l.Name != tsdb.ShardIDExternalLabel { - b.Set(l.Name, l.Value) - } + b.Set(l.Name, l.Value) } return &blockQuerierSeries{labels: b.Labels(), chunks: chunks} diff --git a/pkg/querier/block_test.go b/pkg/querier/block_test.go index 1ccfa13c4e8..e808a1fb967 100644 --- a/pkg/querier/block_test.go +++ b/pkg/querier/block_test.go @@ -12,8 +12,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/store/storepb" - - "github.com/cortexproject/cortex/pkg/storage/tsdb" ) func TestBlockQuerierSeries(t *testing.T) { @@ -34,12 +32,9 @@ func TestBlockQuerierSeries(t *testing.T) { expectedMetric: labels.Labels(nil), expectedErr: "no chunks", }, - "should remove the external label added by the ingester and compactor": { + "should return series on success": { series: &storepb.Series{ Labels: []storepb.Label{ - {Name: tsdb.TenantIDExternalLabel, Value: "test"}, - {Name: tsdb.IngesterIDExternalLabel, Value: "test"}, - {Name: tsdb.ShardIDExternalLabel, Value: "test"}, {Name: "foo", Value: "bar"}, }, Chunks: []storepb.AggrChunk{ @@ -135,7 +130,7 @@ func TestBlockQuerierSeriesSet(t *testing.T) { // second, with multiple chunks { - Labels: mkLabels("__name__", "second", tsdb.TenantIDExternalLabel, "to be removed", tsdb.IngesterIDExternalLabel, "to be removed", tsdb.ShardIDExternalLabel, "to be removed"), + Labels: mkLabels("__name__", "second"), Chunks: []storepb.AggrChunk{ // unordered chunks createChunkWithSineSamples(now.Add(400*time.Second), now.Add(600*time.Second), 5*time.Millisecond), // 200 / 0.005 (= 40000 samples, = 120000 in total) diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index d727f38bd14..1180844a01d 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -245,7 +245,14 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro // TODO(pracucci) can this cause troubles with the upcoming blocks sharding in the store-gateway? block.NewDeduplicateFilter(), }...), - nil, + []block.MetadataModifier{ + // Remove Cortex external labels so that they're not injected when querying blocks. + block.NewReplicaLabelRemover(userLogger, []string{ + tsdb.TenantIDExternalLabel, + tsdb.IngesterIDExternalLabel, + tsdb.ShardIDExternalLabel, + }), + }, ) if err != nil { return nil, err From bf51fd494c26ee1c9d9f2f2ce60531eb0aadd277 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 18 May 2020 09:27:35 +0200 Subject: [PATCH 8/9] Added unit test Signed-off-by: Marco Pracucci --- pkg/storegateway/gateway_test.go | 131 ++++++++++++++++++++++++++++++- 1 file changed, 129 insertions(+), 2 deletions(-) diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 1269db4f6ef..8a2121d67b7 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -6,8 +6,10 @@ import ( "io/ioutil" "os" "path" + "path/filepath" "sort" "strconv" + "strings" "testing" "time" @@ -16,8 +18,11 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/consul" @@ -524,6 +529,93 @@ func TestStoreGateway_RingLifecyclerShouldAutoForgetUnhealthyInstances(t *testin }) } +func TestStoreGateway_SeriesQueryingShouldRemoveExternalLabels(t *testing.T) { + ctx := context.Background() + logger := log.NewNopLogger() + userID := "user-1" + + storageDir, err := ioutil.TempDir(os.TempDir(), "") + require.NoError(t, err) + defer os.RemoveAll(storageDir) //nolint:errcheck + + // Generate 2 TSDB blocks with the same exact series (and data points). + numSeries := 2 + now := time.Now() + minT := now.Add(-1*time.Hour).Unix() * 1000 + maxT := now.Unix() * 1000 + step := (maxT - minT) / int64(numSeries) + require.NoError(t, mockTSDB(path.Join(storageDir, userID), numSeries, minT, maxT)) + require.NoError(t, mockTSDB(path.Join(storageDir, userID), numSeries, minT, maxT)) + + bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + // Find the created blocks (we expect 2). + var blockIDs []string + bucketClient.Iter(ctx, "user-1/", func(key string) error { + blockIDs = append(blockIDs, strings.TrimSuffix(strings.TrimPrefix(key, userID+"/"), "/")) + return nil + }) + require.Len(t, blockIDs, 2) + + // Inject different external labels for each block. + for idx, blockID := range blockIDs { + meta := metadata.Thanos{ + Labels: map[string]string{ + cortex_tsdb.TenantIDExternalLabel: userID, + cortex_tsdb.IngesterIDExternalLabel: fmt.Sprintf("ingester-%d", idx), + cortex_tsdb.ShardIDExternalLabel: fmt.Sprintf("shard-%d", idx), + }, + Source: metadata.TestSource, + } + + _, err := metadata.InjectThanos(logger, filepath.Join(storageDir, userID, blockID), meta, nil) + require.NoError(t, err) + } + + // Create a store-gateway used to query back the series from the blocks. + gatewayCfg := mockGatewayConfig() + gatewayCfg.ShardingEnabled = false + storageCfg, cleanup := mockStorageConfig(t) + defer cleanup() + + g, err := newStoreGateway(gatewayCfg, storageCfg, bucketClient, nil, mockLoggingLevel(), logger, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(ctx, g)) + defer services.StopAndAwaitTerminated(ctx, g) //nolint:errcheck + + // Query back all series. + req := &storepb.SeriesRequest{ + MinTime: minT, + MaxTime: maxT, + Matchers: []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "__name__", Value: ".*"}, + }, + } + + srv := NewBucketStoreSeriesServer(setUserIDToGRPCContext(ctx, userID)) + err = g.Series(req, srv) + require.NoError(t, err) + assert.Empty(t, srv.Warnings) + assert.Len(t, srv.SeriesSet, numSeries) + + for seriesID := 0; seriesID < numSeries; seriesID++ { + actual := srv.SeriesSet[seriesID] + + // Ensure Cortex external labels have been removed. + assert.Equal(t, []storepb.Label{{Name: "series_id", Value: strconv.Itoa(seriesID)}}, actual.Labels) + + // Ensure samples have been correctly queried (it's OK having duplicated samples + // at this stage, but should be correctly grouped together). + samples, err := readSamplesFromChunks(actual.Chunks) + require.NoError(t, err) + assert.Equal(t, []sample{ + {ts: minT + (step * int64(seriesID)), value: float64(seriesID)}, + {ts: minT + (step * int64(seriesID)), value: float64(seriesID)}, + }, samples) + } +} + func mockGatewayConfig() Config { cfg := Config{} flagext.DefaultValues(&cfg) @@ -551,8 +643,8 @@ func mockStorageConfig(t *testing.T) (cortex_tsdb.Config, func()) { return cfg, cleanup } -// mockTSDB create 1+ TSDB blocks storing numSeries of series with -// timestamp evenly distributed between minT and maxT. +// mockTSDB create 1+ TSDB blocks storing numSeries of series, each series +// with 1 sample and its timestamp evenly distributed between minT and maxT. func mockTSDB(dir string, numSeries int, minT, maxT int64) error { // Create a new TSDB on a temporary directory. The blocks // will be then snapshotted to the input dir. @@ -606,3 +698,38 @@ func generateSortedTokens(numTokens int) ring.Tokens { return ring.Tokens(tokens) } + +func readSamplesFromChunks(rawChunks []storepb.AggrChunk) ([]sample, error) { + var samples []sample + + for _, rawChunk := range rawChunks { + c, err := chunkenc.FromData(chunkenc.EncXOR, rawChunk.Raw.Data) + if err != nil { + return nil, err + } + + it := c.Iterator(nil) + for it.Next() { + if it.Err() != nil { + return nil, it.Err() + } + + ts, v := it.At() + samples = append(samples, sample{ + ts: ts, + value: v, + }) + } + + if it.Err() != nil { + return nil, it.Err() + } + } + + return samples, nil +} + +type sample struct { + ts int64 + value float64 +} From df62410e35cab57f19eeb47804ae11ce1c57e089 Mon Sep 17 00:00:00 2001 From: Marco Pracucci Date: Mon, 18 May 2020 09:43:25 +0200 Subject: [PATCH 9/9] Fixed linter Signed-off-by: Marco Pracucci --- pkg/storegateway/gateway_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 8a2121d67b7..3569b43ef59 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -552,10 +552,10 @@ func TestStoreGateway_SeriesQueryingShouldRemoveExternalLabels(t *testing.T) { // Find the created blocks (we expect 2). var blockIDs []string - bucketClient.Iter(ctx, "user-1/", func(key string) error { + require.NoError(t, bucketClient.Iter(ctx, "user-1/", func(key string) error { blockIDs = append(blockIDs, strings.TrimSuffix(strings.TrimPrefix(key, userID+"/"), "/")) return nil - }) + })) require.Len(t, blockIDs, 2) // Inject different external labels for each block.