From fed5bdf8b95c8075c227a39b375d41802c4d58e4 Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 10 Jun 2021 09:53:32 -0700 Subject: [PATCH 1/7] Add planner interface and unit tests Signed-off-by: Albert --- pkg/compactor/planner.go | 58 +++++++++++++++++++++++++++++++++++ pkg/compactor/planner_test.go | 58 +++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+) create mode 100644 pkg/compactor/planner.go create mode 100644 pkg/compactor/planner_test.go diff --git a/pkg/compactor/planner.go b/pkg/compactor/planner.go new file mode 100644 index 00000000000..7fc07495c08 --- /dev/null +++ b/pkg/compactor/planner.go @@ -0,0 +1,58 @@ +package compactor + +import ( + "context" + "errors" + "fmt" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" +) + +var ( + errPlannerNotImplemented = errors.New("planner type not implemented") +) + +type Planner struct { + blocksPlanner compact.Planner + logger log.Logger +} + +func NewPlanner(logger log.Logger, ranges []int64, plannerType string) (*Planner, error) { + if logger == nil { + logger = log.NewNopLogger() + } + + var blocksPlanner compact.Planner + switch plannerType { + case "parallel": + blocksPlanner = NewParallelPlanner(logger, ranges) + case "default": + blocksPlanner = compact.NewTSDBBasedPlanner(logger, ranges) + default: + return nil, errPlannerNotImplemented + } + + p := &Planner{ + logger: logger, + blocksPlanner: blocksPlanner, + } + + return p, nil +} + +func (p *Planner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { + return p.blocksPlanner.Plan(ctx, metasByMinTime) +} + +func (p *Planner) PrintPlan(ctx context.Context, metasByMinTime []*metadata.Meta) { + toCompact, err := p.Plan(ctx, metasByMinTime) + + if err != nil { + return + } + + level.Info(p.logger).Log("msg", "Compaction plan generated: ", "plan", fmt.Sprintf("%v", toCompact)) +} diff --git a/pkg/compactor/planner_test.go b/pkg/compactor/planner_test.go new file mode 100644 index 00000000000..6c4d27f53ba --- /dev/null +++ b/pkg/compactor/planner_test.go @@ -0,0 +1,58 @@ +package compactor + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +type PlannerMock struct { + mock.Mock +} + +func (p *PlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { + args := p.Called(ctx, metasByMinTime) + return args.Get(0).([]*metadata.Meta), args.Error(1) +} + +func TestPlanner_ShouldReturnParallelPlanner(t *testing.T) { + var ranges []int64 + p, _ := NewPlanner(nil, ranges, "parallel") + + parallelPlanner := &ParallelPlanner{} + + assert.IsType(t, parallelPlanner, p.blocksPlanner) +} + +func TestPlanner_ShouldReturnDefaultPlanner(t *testing.T) { + var ranges []int64 + p, _ := NewPlanner(nil, ranges, "default") + + parallelPlanner := &ParallelPlanner{} + + assert.IsType(t, parallelPlanner, p.blocksPlanner) +} + +func TestPlanner_ShouldErrorOnNonExistentPlanner(t *testing.T) { + var ranges []int64 + _, err := NewPlanner(nil, ranges, "non-existent") + + assert.ErrorIs(t, err, errPlannerNotImplemented) +} + +func TestPlanner_PlanShouldCallBlocksPlannerPlan(t *testing.T) { + blockPlannerMock := &PlannerMock{} + blockPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) + + p := &Planner{ + blocksPlanner: blockPlannerMock, + } + + blockMetas := 
[]*metadata.Meta{} + p.Plan(context.Background(), blockMetas) + + blockPlannerMock.AssertCalled(t, "Plan", context.Background(), blockMetas) +} From 6d03ff5d8e0af08b1324d37be7068b97e0b2d181 Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 10 Jun 2021 10:08:00 -0700 Subject: [PATCH 2/7] Change compactor to use planner interface Signed-off-by: Albert --- pkg/compactor/compactor.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 51952688cb4..321f9e48ff2 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -62,7 +62,8 @@ var ( return nil, nil, err } - planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) + // planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) + planner, _ := NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), "default") return compactor, planner, nil } ) From a8604cd51888b36d087c31f0e654954059801633 Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 10 Jun 2021 10:11:31 -0700 Subject: [PATCH 3/7] Remove commented code Signed-off-by: Albert --- pkg/compactor/compactor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 321f9e48ff2..cd58888da92 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -62,7 +62,6 @@ var ( return nil, nil, err } - // planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) planner, _ := NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), "default") return compactor, planner, nil } From bab9b8cc663c05eba0259107183fec748e58b426 Mon Sep 17 00:00:00 2001 From: Albert Date: Fri, 18 Jun 2021 09:04:08 -0700 Subject: [PATCH 4/7] Add planner filter --- pkg/compactor/compactor.go | 52 +++- pkg/compactor/planner.go | 58 ----- pkg/compactor/planner_filter.go | 333 ++++++++++++++++++++++++++ pkg/compactor/planner_filter_test.go | 342 +++++++++++++++++++++++++++ pkg/compactor/planner_test.go | 58 ----- 5 files changed, 718 insertions(+), 125 deletions(-) delete mode 100644 pkg/compactor/planner.go create mode 100644 pkg/compactor/planner_filter.go create mode 100644 pkg/compactor/planner_filter_test.go delete mode 100644 pkg/compactor/planner_test.go diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index cd58888da92..1b41ff64552 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -62,7 +62,7 @@ var ( return nil, nil, err } - planner, _ := NewPlanner(logger, cfg.BlockRanges.ToMilliseconds(), "default") + planner := compact.NewTSDBBasedPlanner(logger, cfg.BlockRanges.ToMilliseconds()) return compactor, planner, nil } ) @@ -120,6 +120,9 @@ type Config struct { // Allow downstream projects to customise the blocks compactor. BlocksGrouperFactory BlocksGrouperFactory `yaml:"-"` BlocksCompactorFactory BlocksCompactorFactory `yaml:"-"` + + // Flag to enable planner filter + PlannerFilterEnabled bool `yaml:"planner_filter_enabled"` } // RegisterFlags registers the Compactor flags. @@ -146,6 +149,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { "If 0, blocks will be deleted straight away. 
Note that deleting blocks immediately can cause query failures.") f.DurationVar(&cfg.TenantCleanupDelay, "compactor.tenant-cleanup-delay", 6*time.Hour, "For tenants marked for deletion, this is time between deleting of last block, and doing final cleanup (marker files, debug files) of the tenant.") f.BoolVar(&cfg.BlockDeletionMarksMigrationEnabled, "compactor.block-deletion-marks-migration-enabled", true, "When enabled, at compactor startup the bucket will be scanned and all found deletion marks inside the block location will be copied to the markers global location too. This option can (and should) be safely disabled as soon as the compactor has successfully run at least once.") + f.BoolVar(&cfg.PlannerFilterEnabled, "compactor.planner-filter-enabled", false, "Filter and plan blocks within PlannerFilter instead of through Thanos planner and grouper.") f.Var(&cfg.EnabledTenants, "compactor.enabled-tenants", "Comma separated list of tenants that can be compacted. If specified, only these tenants will be compacted by compactor, otherwise all tenants can be compacted. Subject to sharding.") f.Var(&cfg.DisabledTenants, "compactor.disabled-tenants", "Comma separated list of tenants that cannot be compacted by this compactor. If specified, and compactor would normally pick given tenant for compaction (via -compactor.enabled-tenants or sharding), it will be ignored instead.") @@ -606,6 +610,43 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { time.Duration(c.compactorCfg.DeletionDelay.Seconds()/2)*time.Second, c.compactorCfg.MetaSyncConcurrency) + fetcherFilters := []block.MetadataFilter{ + // Remove the ingester ID because we don't shard blocks anymore, while still + // honoring the shard ID if sharding was done in the past. + NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}), + block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), + ignoreDeletionMarkFilter, + deduplicateBlocksFilter, + } + + // If config is set to use planner filter then generate plans and append it to the fetcherFilters + if c.compactorCfg.PlannerFilterEnabled { + level.Info(c.logger).Log("msg", "Compactor using planner filter") + + // Create a new planner filter + f, err := NewPlannerFilter( + ctx, + userID, + ulogger, + bucket, + fetcherFilters, + c.compactorCfg, + c.metaSyncDirForUser(userID), + ) + if err != nil { + return err + } + + // Generate all parallelizable plans + err = f.fetchBlocksAndGeneratePlans(ctx) + if err != nil { + return err + } + + // Add the planner filter to the fetcher's filters + fetcherFilters = append(fetcherFilters, f) + } + fetcher, err := block.NewMetaFetcher( ulogger, c.compactorCfg.MetaSyncConcurrency, @@ -613,14 +654,7 @@ func (c *Compactor) compactUser(ctx context.Context, userID string) error { c.metaSyncDirForUser(userID), reg, // List of filters to apply (order matters). - []block.MetadataFilter{ - // Remove the ingester ID because we don't shard blocks anymore, while still - // honoring the shard ID if sharding was done in the past. 
- NewLabelRemoverFilter([]string{cortex_tsdb.IngesterIDExternalLabel}), - block.NewConsistencyDelayMetaFilter(ulogger, c.compactorCfg.ConsistencyDelay, reg), - ignoreDeletionMarkFilter, - deduplicateBlocksFilter, - }, + fetcherFilters, nil, ) if err != nil { diff --git a/pkg/compactor/planner.go b/pkg/compactor/planner.go deleted file mode 100644 index 7fc07495c08..00000000000 --- a/pkg/compactor/planner.go +++ /dev/null @@ -1,58 +0,0 @@ -package compactor - -import ( - "context" - "errors" - "fmt" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" - "github.com/thanos-io/thanos/pkg/block/metadata" - "github.com/thanos-io/thanos/pkg/compact" -) - -var ( - errPlannerNotImplemented = errors.New("planner type not implemented") -) - -type Planner struct { - blocksPlanner compact.Planner - logger log.Logger -} - -func NewPlanner(logger log.Logger, ranges []int64, plannerType string) (*Planner, error) { - if logger == nil { - logger = log.NewNopLogger() - } - - var blocksPlanner compact.Planner - switch plannerType { - case "parallel": - blocksPlanner = NewParallelPlanner(logger, ranges) - case "default": - blocksPlanner = compact.NewTSDBBasedPlanner(logger, ranges) - default: - return nil, errPlannerNotImplemented - } - - p := &Planner{ - logger: logger, - blocksPlanner: blocksPlanner, - } - - return p, nil -} - -func (p *Planner) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { - return p.blocksPlanner.Plan(ctx, metasByMinTime) -} - -func (p *Planner) PrintPlan(ctx context.Context, metasByMinTime []*metadata.Meta) { - toCompact, err := p.Plan(ctx, metasByMinTime) - - if err != nil { - return - } - - level.Info(p.logger).Log("msg", "Compaction plan generated: ", "plan", fmt.Sprintf("%v", toCompact)) -} diff --git a/pkg/compactor/planner_filter.go b/pkg/compactor/planner_filter.go new file mode 100644 index 00000000000..1569c6a6b1d --- /dev/null +++ b/pkg/compactor/planner_filter.go @@ -0,0 +1,333 @@ +package compactor + +import ( + "context" + "fmt" + "sort" + "strings" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" + "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/compact" + "github.com/thanos-io/thanos/pkg/extprom" + "github.com/thanos-io/thanos/pkg/objstore" +) + +type PlannerFilter struct { + userID string + ulogger log.Logger + bucket objstore.InstrumentedBucket + fetcherFilters []block.MetadataFilter + compactorCfg Config + metaSyncDir string + + plans []*blocksGroup +} + +func NewPlannerFilter(ctx context.Context, userID string, ulogger log.Logger, bucket objstore.InstrumentedBucket, fetcherFilters []block.MetadataFilter, compactorCfg Config, metaSyncDir string) (*PlannerFilter, error) { + f := &PlannerFilter{ + userID: userID, + ulogger: ulogger, + bucket: bucket, + fetcherFilters: fetcherFilters, + compactorCfg: compactorCfg, + metaSyncDir: metaSyncDir, + } + + return f, nil +} + +// Gets the blocks of the user. +func (f *PlannerFilter) getUserBlocks(ctx context.Context) (map[ulid.ULID]*metadata.Meta, error) { + fetcher, err := block.NewMetaFetcher( + f.ulogger, + f.compactorCfg.MetaSyncConcurrency, + f.bucket, + f.metaSyncDir, + prometheus.NewRegistry(), + // List of filters to apply (order matters). 
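+ // These are the base metadata filters passed in by the compactor; the planner filter itself is appended only to the compactor's main fetcher, not to this internal one.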
+ f.fetcherFilters, + nil, + ) + if err != nil { + return nil, err + } + + metas, _, err := fetcher.Fetch(ctx) + if err != nil { + return nil, err + } + + return metas, nil +} + +// Fetches blocks and generates plans that can be parallized and saves them +func (f *PlannerFilter) fetchBlocksAndGeneratePlans(ctx context.Context) error { + // Get blocks + blocks, err := f.getUserBlocks(ctx) + if err != nil { + return err + } + + return f.generatePlans(ctx, blocks) +} + +// Generates plans that can be parallelized and saves them +func (f *PlannerFilter) generatePlans(ctx context.Context, blocks map[ulid.ULID]*metadata.Meta) error { + // First of all we have to group blocks using the Thanos default + // grouping (based on downsample resolution + external labels). + mainGroups := map[string][]*metadata.Meta{} + for _, b := range blocks { + key := compact.DefaultGroupKey(b.Thanos) + mainGroups[key] = append(mainGroups[key], b) + } + + var plans []*blocksGroup + + for _, mainBlocks := range mainGroups { + for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds()) { + // Nothing to do if we don't have at least 2 blocks. + if len(plan.blocks) < 2 { + continue + } + + plan.key = i + + level.Info(f.ulogger).Log("msg", "Found plan for user", "user", f.userID, "plan", plan.String()) + + plans = append(plans, &plan) + } + } + + // Ensure groups are sorted by smallest range, oldest min time first. The rationale + // is that we want to favor smaller ranges first (i.e. to deduplicate samples sooner + // than later) and older ones are more likely to be "complete" (no missing block still + // to be uploaded). + sort.SliceStable(plans, func(i, j int) bool { + iLength := plans[i].maxTime() - plans[i].minTime() + jLength := plans[j].maxTime() - plans[j].minTime() + + if iLength != jLength { + return iLength < jLength + } + if plans[i].minTime() != plans[j].minTime() { + return plans[i].minTime() < plans[j].minTime() + } + + // Guarantee stable sort for tests. + return plans[i].key < plans[j].key + }) + + f.plans = plans + + return nil +} + +// Filter removes the blocks for every single plan except one. +// Currently we are just using the first plan every single time. +// TODO: Filter plans by putting each plan on the ring and having the compactor select a plan (filter the rest) +func (f *PlannerFilter) Filter(_ context.Context, metas map[ulid.ULID]*metadata.Meta, _ *extprom.TxGaugeVec) error { + // Plans have to exist to be filtered; if no blocks need to be compacted then nothing needs to be filtered. + if len(f.plans) < 1 { + return nil + } + + // Delete blocks for each plan except the first one. + for _, p := range f.plans[1:] { + for _, b := range p.blocks { + delete(metas, b.BlockMeta.ULID) // Deleting a key that doesn't exist is a no-op, so this is safe. + } + } + + return nil +} + +// blocksGroup struct and functions copied and adjusted from https://github.com/cortexproject/cortex/pull/2616 +type blocksGroup struct { + rangeStart int64 // Included. + rangeEnd int64 // Excluded. + blocks []*metadata.Meta + key int +} + +// overlaps returns whether the group range overlaps with the input group.
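+// Ranges are half-open, i.e. [rangeStart, rangeEnd), so two groups that only touch at a boundary do not overlap.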
+func (g blocksGroup) overlaps(other blocksGroup) bool { + if g.rangeStart >= other.rangeEnd || other.rangeStart >= g.rangeEnd { + return false + } + + return true +} + +func (g blocksGroup) String() string { + out := strings.Builder{} + out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, blocks: ", g.rangeStart, g.rangeEnd)) + + for i, b := range g.blocks { + if i > 0 { + out.WriteString(", ") + } + + minT := time.Unix(0, b.MinTime*int64(time.Millisecond)).UTC() + maxT := time.Unix(0, b.MaxTime*int64(time.Millisecond)).UTC() + out.WriteString(fmt.Sprintf("%s (min time: %s, max time: %s)", b.ULID.String(), minT.String(), maxT.String())) + } + + return out.String() +} + +func (g blocksGroup) rangeLength() int64 { + return g.rangeEnd - g.rangeStart +} + +// minTime returns the MinTime across all blocks in the group. +func (g blocksGroup) minTime() int64 { + // Blocks are expected to be sorted by MinTime. + return g.blocks[0].MinTime +} + +// maxTime returns the MaxTime across all blocks in the group. +func (g blocksGroup) maxTime() int64 { + max := g.blocks[0].MaxTime + + for _, b := range g.blocks[1:] { + if b.MaxTime > max { + max = b.MaxTime + } + } + + return max +} + +// groupBlocksByCompactableRanges groups input blocks by compactable ranges, giving preference +// to smaller ranges. If a smaller range contains more than 1 block (and thus it should +// be compacted), the larger range block group is not generated until each of its +// smaller ranges have 1 block each at most. +func groupBlocksByCompactableRanges(blocks []*metadata.Meta, ranges []int64) []blocksGroup { + if len(blocks) == 0 { + return nil + } + + // Sort blocks by min time. + sortMetasByMinTime(blocks) + + var groups []blocksGroup + + for _, tr := range ranges { + nextGroup: + for _, group := range groupBlocksByRange(blocks, tr) { + // Exclude groups with a single block, because no compaction is required. + if len(group.blocks) < 2 { + continue + } + + // Ensure this group's range does not overlap with any group already scheduled + // for compaction by a smaller range, because we need to guarantee that smaller ranges + // are compacted first. + for _, c := range groups { + if group.overlaps(c) { + continue nextGroup + } + } + + groups = append(groups, group) + } + } + + // Ensure we don't compact the most recent blocks prematurely when another one of + // the same size still fits in the range. To do it, we consider valid a group only + // if it's before the most recent block or if it fully covers the range. + highestMinTime := blocks[len(blocks)-1].MinTime + + for idx := 0; idx < len(groups); { + group := groups[idx] + + // If the group covers a range before the most recent block, it's fine. + if group.rangeEnd <= highestMinTime { + idx++ + continue + } + + // If the group covers the full range, it's fine. + if group.maxTime()-group.minTime() == group.rangeLength() { + idx++ + continue + } + + // We hit into a group which would compact recent blocks prematurely, + // so we need to filter it out. + groups = append(groups[:idx], groups[idx+1:]...) + } + + return groups +} + +// groupBlocksByRange splits the blocks by the time range. The range sequence starts at 0. +// Input blocks are expected to be sorted by MinTime. +// +// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 +// it returns [0-10, 10-20], [50-60], [90-100]. 
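+// A block that crosses a range boundary (for example 25-35 when tr is 30) is skipped here and can only be picked up when grouping with a larger tr.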
+func groupBlocksByRange(blocks []*metadata.Meta, tr int64) []blocksGroup { + var ret []blocksGroup + + for i := 0; i < len(blocks); { + var ( + group blocksGroup + m = blocks[i] + ) + + group.rangeStart = getRangeStart(m, tr) + group.rangeEnd = group.rangeStart + tr + + // Skip blocks that don't fall into the range. This can happen via mis-alignment or + // by being the multiple of the intended range. + if m.MaxTime > group.rangeEnd { + i++ + continue + } + + // Add all blocks to the current group that are within [t0, t0+tr]. + for ; i < len(blocks); i++ { + // If the block does not start within this group, then we should break the iteration + // and move it to the next group. + if blocks[i].MinTime >= group.rangeEnd { + break + } + + // If the block doesn't fall into this group, but it started within this group then it + // means it spans across multiple ranges and we should skip it. + if blocks[i].MaxTime > group.rangeEnd { + continue + } + + group.blocks = append(group.blocks, blocks[i]) + } + + if len(group.blocks) > 0 { + ret = append(ret, group) + } + } + + return ret +} + +func getRangeStart(m *metadata.Meta, tr int64) int64 { + // Compute start of aligned time range of size tr closest to the current block's start. + // This code has been copied from TSDB. + if m.MinTime >= 0 { + return tr * (m.MinTime / tr) + } else { + return tr * ((m.MinTime - tr + 1) / tr) + } +} + +func sortMetasByMinTime(metas []*metadata.Meta) { + sort.Slice(metas, func(i, j int) bool { + return metas[i].BlockMeta.MinTime < metas[j].BlockMeta.MinTime + }) +} diff --git a/pkg/compactor/planner_filter_test.go b/pkg/compactor/planner_filter_test.go new file mode 100644 index 00000000000..fceb4f2e351 --- /dev/null +++ b/pkg/compactor/planner_filter_test.go @@ -0,0 +1,342 @@ +package compactor + +import ( + "testing" + + "github.com/prometheus/prometheus/tsdb" + "github.com/stretchr/testify/assert" + "github.com/thanos-io/thanos/pkg/block/metadata" +) + +func TestGroupBlocksByCompactableRanges(t *testing.T) { + tests := map[string]struct { + ranges []int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + ranges: []int64{20}, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: nil, + }, + "only 1 block for each range (single range)": { + ranges: []int64{20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "only 1 block for each range (multiple ranges)": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: nil, + }, + "input blocks can be compacted on the 1st range only": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, 
blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + }}, + }, + }, + "input blocks can be compacted on the 2nd range only": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 60, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }}, + }, + }, + "input blocks can be compacted on a mix of 1st and 2nd ranges, guaranteeing no overlaps and giving preference to smaller ranges": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + }}, + {rangeStart: 70, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "input blocks have already been compacted with the largest range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: nil, + }, + "input blocks match the largest range but can be compacted because overlapping": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should be NOT considered for 1st level 
compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should BE considered for 2nd level compaction": { + ranges: []int64{20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges. + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}}, + }}, + }, + }, + "a block with time range larger then the largest compaction range should NOT be considered for compaction": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 150}}, // This block is larger then the largest compaction range. + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a range containg the most recent block shouldn't be prematurely compacted if doesn't cover the full range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 12}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 13, MaxTime: 15}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByCompactableRanges(testData.blocks, testData.ranges)) + }) + } +} + +func TestGroupBlocksByRange(t *testing.T) { + tests := map[string]struct { + timeRange int64 + blocks []*metadata.Meta + expected []blocksGroup + }{ + "no input blocks": { + timeRange: 20, + blocks: nil, + expected: nil, + }, + "only 1 block in input": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + }, + }, + "only 1 block per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 
20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + }}, + }, + }, + "multiple blocks per range": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}}, + }}, + }, + }, + "a block with time range larger then the range should be excluded": { + timeRange: 20, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, // This block is larger then the range. + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + "blocks with different time ranges but all fitting within the input range": { + timeRange: 40, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + }}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + assert.Equal(t, testData.expected, groupBlocksByRange(testData.blocks, testData.timeRange)) + }) + } +} + +func TestBlocksGroup_overlaps(t *testing.T) { + tests := []struct { + first blocksGroup + second blocksGroup + expected bool + }{ + { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: false, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 19, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 21}, + second: blocksGroup{rangeStart: 20, rangeEnd: 30}, + expected: true, + }, { + first: blocksGroup{rangeStart: 10, rangeEnd: 20}, + second: blocksGroup{rangeStart: 12, rangeEnd: 18}, + expected: true, + }, + } + + for _, tc := range tests { + assert.Equal(t, tc.expected, tc.first.overlaps(tc.second)) + assert.Equal(t, tc.expected, tc.second.overlaps(tc.first)) + } +} diff --git a/pkg/compactor/planner_test.go b/pkg/compactor/planner_test.go deleted file mode 100644 index 6c4d27f53ba..00000000000 --- a/pkg/compactor/planner_test.go +++ /dev/null @@ -1,58 +0,0 @@ 
-package compactor - -import ( - "context" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/thanos-io/thanos/pkg/block/metadata" -) - -type PlannerMock struct { - mock.Mock -} - -func (p *PlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) { - args := p.Called(ctx, metasByMinTime) - return args.Get(0).([]*metadata.Meta), args.Error(1) -} - -func TestPlanner_ShouldReturnParallelPlanner(t *testing.T) { - var ranges []int64 - p, _ := NewPlanner(nil, ranges, "parallel") - - parallelPlanner := &ParallelPlanner{} - - assert.IsType(t, parallelPlanner, p.blocksPlanner) -} - -func TestPlanner_ShouldReturnDefaultPlanner(t *testing.T) { - var ranges []int64 - p, _ := NewPlanner(nil, ranges, "default") - - parallelPlanner := &ParallelPlanner{} - - assert.IsType(t, parallelPlanner, p.blocksPlanner) -} - -func TestPlanner_ShouldErrorOnNonExistentPlanner(t *testing.T) { - var ranges []int64 - _, err := NewPlanner(nil, ranges, "non-existent") - - assert.ErrorIs(t, err, errPlannerNotImplemented) -} - -func TestPlanner_PlanShouldCallBlocksPlannerPlan(t *testing.T) { - blockPlannerMock := &PlannerMock{} - blockPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil) - - p := &Planner{ - blocksPlanner: blockPlannerMock, - } - - blockMetas := []*metadata.Meta{} - p.Plan(context.Background(), blockMetas) - - blockPlannerMock.AssertCalled(t, "Plan", context.Background(), blockMetas) -} From da811ff2f94837eeb199a39c1aa69cb5b1a5ef8f Mon Sep 17 00:00:00 2001 From: Albert Date: Thu, 24 Jun 2021 10:11:53 -0700 Subject: [PATCH 5/7] Add planner filter unit tests Signed-off-by: Albert --- docs/blocks-storage/compactor.md | 5 ++ pkg/compactor/planner_filter.go | 20 ++--- pkg/compactor/planner_filter_test.go | 119 +++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 10 deletions(-) diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md index bcea40d0e30..df985f09e08 100644 --- a/docs/blocks-storage/compactor.md +++ b/docs/blocks-storage/compactor.md @@ -166,6 +166,11 @@ compactor: # CLI flag: -compactor.disabled-tenants [disabled_tenants: | default = ""] + # Enable planner filter which will filter groups of blocks within the Cortex + # compactor instead of using the Thanos to group blocks. + # CLI flag: -compactor.planner-filter-enabled + [planner_filter_enabled: | default = false] + # Shard tenants across multiple compactor instances. Sharding is required if # you run multiple compactor instances, in order to coordinate compactions and # avoid race conditions leading to the same tenant blocks simultaneously diff --git a/pkg/compactor/planner_filter.go b/pkg/compactor/planner_filter.go index 1569c6a6b1d..08ba071b58e 100644 --- a/pkg/compactor/planner_filter.go +++ b/pkg/compactor/planner_filter.go @@ -26,7 +26,7 @@ type PlannerFilter struct { compactorCfg Config metaSyncDir string - plans []*blocksGroup + plans []blocksGroup } func NewPlannerFilter(ctx context.Context, userID string, ulogger log.Logger, bucket objstore.InstrumentedBucket, fetcherFilters []block.MetadataFilter, compactorCfg Config, metaSyncDir string) (*PlannerFilter, error) { @@ -66,7 +66,7 @@ func (f *PlannerFilter) getUserBlocks(ctx context.Context) (map[ulid.ULID]*metad return metas, nil } -// Fetches blocks and generates plans that can be parallized and saves them +// Fetches blocks and generates plans that can be parallized and saves them in the PlannerFilter struct. 
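+// The saved plans are later applied by Filter, which keeps the blocks of the first plan and drops the blocks of every other plan.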
func (f *PlannerFilter) fetchBlocksAndGeneratePlans(ctx context.Context) error { // Get blocks blocks, err := f.getUserBlocks(ctx) @@ -87,20 +87,20 @@ func (f *PlannerFilter) generatePlans(ctx context.Context, blocks map[ulid.ULID] mainGroups[key] = append(mainGroups[key], b) } - var plans []*blocksGroup + var plans []blocksGroup - for _, mainBlocks := range mainGroups { - for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds()) { + for k, mainBlocks := range mainGroups { + for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds(), f.ulogger) { // Nothing to do if we don't have at least 2 blocks. if len(plan.blocks) < 2 { continue } - plan.key = i - level.Info(f.ulogger).Log("msg", "Found plan for user", "user", f.userID, "plan", plan.String()) - plans = append(plans, &plan) + plan.key = fmt.Sprintf("%v_%v", k, i) + + plans = append(plans, plan) } } @@ -152,7 +152,7 @@ type blocksGroup struct { rangeStart int64 // Included. rangeEnd int64 // Excluded. blocks []*metadata.Meta - key int + key string } // overlaps returns whether the group range overlaps with the input group. @@ -166,7 +166,7 @@ func (g blocksGroup) overlaps(other blocksGroup) bool { func (g blocksGroup) String() string { out := strings.Builder{} - out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, blocks: ", g.rangeStart, g.rangeEnd)) + out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, key %v, blocks: ", g.rangeStart, g.rangeEnd, g.key)) for i, b := range g.blocks { if i > 0 { diff --git a/pkg/compactor/planner_filter_test.go b/pkg/compactor/planner_filter_test.go index fceb4f2e351..abcb828a01a 100644 --- a/pkg/compactor/planner_filter_test.go +++ b/pkg/compactor/planner_filter_test.go @@ -1,13 +1,132 @@ package compactor import ( + "context" "testing" + "time" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" ) +func TestPlannerFilterPlanGeneration(t *testing.T) { + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + block3ulid := ulid.MustNew(3, nil) + block4ulid := ulid.MustNew(4, nil) + block5ulid := ulid.MustNew(5, nil) + block6ulid := ulid.MustNew(6, nil) + block7ulid := ulid.MustNew(7, nil) + block8ulid := ulid.MustNew(8, nil) + block9ulid := ulid.MustNew(9, nil) + block10ulid := ulid.MustNew(10, nil) + block11ulid := ulid.MustNew(11, nil) + + blocks := + map[ulid.ULID]*metadata.Meta{ + block1ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block2ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block3ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block4ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, + Thanos: 
metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block5ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block6ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block7ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block8ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block9ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "3"}}, + }, + block10ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block11ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + } + + tests := map[string]struct { + ranges cortex_tsdb.DurationList + blocks map[ulid.ULID]*metadata.Meta + expectedPlans []blocksGroup + }{ + "test basic planning": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block5ulid: blocks[block5ulid], block6ulid: blocks[block6ulid]}, + expectedPlans: []blocksGroup{ + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block6ulid], blocks[block5ulid]}, key: "0@14088339200549387484_0"}, + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"}, + {rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"}, + }, + }, + "test no compaction": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block7ulid: blocks[block7ulid], block8ulid: blocks[block8ulid], block9ulid: blocks[block9ulid]}, + expectedPlans: []blocksGroup{}, + }, + "test smallest range first": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block10ulid: blocks[block10ulid], block11ulid: blocks[block11ulid]}, + expectedPlans: []blocksGroup{ + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"}, + {rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"}, + {rangeStart: 14400000, rangeEnd: 28800000, blocks: []*metadata.Meta{blocks[block10ulid], blocks[block11ulid]}, key: "0@14088339200549387484_0"}, + }, 
+ }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + compactorCfg := Config{} + flagext.DefaultValues(&compactorCfg) + compactorCfg.BlockRanges = testData.ranges + f := &PlannerFilter{ + userID: "test-user", + compactorCfg: compactorCfg, + ulogger: log.NewNopLogger(), + } + err := f.generatePlans(context.Background(), testData.blocks) + require.NoError(t, err) + actualPlans := f.plans + require.Len(t, actualPlans, len(testData.expectedPlans)) + for i, expectedPlan := range testData.expectedPlans { + assert.Equal(t, expectedPlan, actualPlans[i]) + } + }) + } +} + func TestGroupBlocksByCompactableRanges(t *testing.T) { tests := map[string]struct { ranges []int64
blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 25, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 50}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 60}}, + }}, + }, + }, + "input blocks can be compacted on the 2nd range only": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + {rangeStart: 60, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + }}, + }, + }, + "input blocks can be compacted on a mix of 1st and 2nd ranges, guaranteeing no overlaps and giving preference to smaller ranges": { + ranges: []int64{10, 20}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 60, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }, + expected: []blocksGroup{ + {rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 10}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 10}}, + }}, + {rangeStart: 70, rangeEnd: 80, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 70, MaxTime: 80}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 75, MaxTime: 80}}, + }}, + {rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}}, + }}, + }, + }, + "input blocks have already been compacted with the largest range": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: nil, + }, + "input blocks match the largest range but can be compacted because overlapping": { + ranges: []int64{10, 20, 40}, + blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }, + expected: []blocksGroup{ + {rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{ + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + {BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}}, + }}, + }, + }, + "a block with time range crossing two 1st level ranges should be NOT considered for 1st level 
compaction": {
+			ranges: []int64{20, 40},
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges.
+				{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 40}},
+				}},
+			},
+		},
+		"a block with time range crossing two 1st level ranges should BE considered for 2nd level compaction": {
+			ranges: []int64{20, 40},
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}}, // This block spans across two 1st level ranges.
+				{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 30}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 40}},
+				}},
+			},
+		},
+		"a block with time range larger than the largest compaction range should NOT be considered for compaction": {
+			ranges: []int64{10, 20, 40},
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 30, MaxTime: 150}}, // This block is larger than the largest compaction range.
+				{BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 70}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 80, rangeEnd: 120, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 80, MaxTime: 120}},
+				}},
+			},
+		},
+		"a range containing the most recent block shouldn't be prematurely compacted if it doesn't cover the full range": {
+			ranges: []int64{10, 20, 40},
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 12}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 13, MaxTime: 15}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 0, rangeEnd: 10, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 5, MaxTime: 8}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 7, MaxTime: 9}},
+				}},
+			},
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			assert.Equal(t, testData.expected, groupBlocksByCompactableRanges(testData.blocks, testData.ranges))
+		})
+	}
+}
+
+func TestGroupBlocksByRange(t *testing.T) {
+	tests := map[string]struct {
+		timeRange int64
+		blocks    []*metadata.Meta
+		expected  []blocksGroup
+	}{
+		"no input blocks": {
+			timeRange: 20,
+			blocks:    nil,
+			expected:  nil,
+		},
+		"only 1 block in input": {
+			timeRange: 20,
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+				}},
+			},
+		},
+		"only 1 block per range": {
+			timeRange: 20,
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 0, rangeEnd:
20, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}},
+				}},
+				{rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}},
+				}},
+			},
+		},
+		"multiple blocks per range": {
+			timeRange: 20,
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 15}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+				}},
+				{rangeStart: 40, rangeEnd: 60, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 40, MaxTime: 60}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 50, MaxTime: 55}},
+				}},
+			},
+		},
+		"a block with time range larger than the range should be excluded": {
+			timeRange: 20,
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}}, // This block is larger than the range.
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 0, rangeEnd: 20, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+				}},
+				{rangeStart: 20, rangeEnd: 40, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}},
+				}},
+			},
+		},
+		"blocks with different time ranges but all fitting within the input range": {
+			timeRange: 40,
+			blocks: []*metadata.Meta{
+				{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+				{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}},
+			},
+			expected: []blocksGroup{
+				{rangeStart: 0, rangeEnd: 40, blocks: []*metadata.Meta{
+					{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 20}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 0, MaxTime: 40}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 10, MaxTime: 20}},
+					{BlockMeta: tsdb.BlockMeta{MinTime: 20, MaxTime: 30}},
+				}},
+			},
+		},
+	}
+
+	for testName, testData := range tests {
+		t.Run(testName, func(t *testing.T) {
+			assert.Equal(t, testData.expected, groupBlocksByRange(testData.blocks, testData.timeRange))
+		})
+	}
+}
+
+func TestBlocksGroup_overlaps(t *testing.T) {
+	tests := []struct {
+		first    blocksGroup
+		second   blocksGroup
+		expected bool
+	}{
+		{
+			first:    blocksGroup{rangeStart: 10, rangeEnd: 20},
+			second:   blocksGroup{rangeStart: 20, rangeEnd: 30},
+			expected: false,
+		}, {
+			first:    blocksGroup{rangeStart: 10, rangeEnd: 20},
+			second:   blocksGroup{rangeStart: 19, rangeEnd: 30},
+			expected: true,
+		}, {
+			first:    blocksGroup{rangeStart: 10, rangeEnd: 21},
+			second:   blocksGroup{rangeStart: 20, rangeEnd: 30},
+			expected: true,
+		}, {
+			first:    blocksGroup{rangeStart: 10, rangeEnd: 20},
+			second:   blocksGroup{rangeStart: 12, rangeEnd: 18},
+			expected: true,
+		},
+	}
+
+	for _, tc := range tests {
+		assert.Equal(t, tc.expected, tc.first.overlaps(tc.second))
+		assert.Equal(t, tc.expected, tc.second.overlaps(tc.first))
+	}
+}
diff --git a/pkg/compactor/planner_test.go b/pkg/compactor/planner_test.go
deleted file mode 100644
index 6c4d27f53ba..00000000000
--- a/pkg/compactor/planner_test.go
+++ /dev/null
@@ -1,58 +0,0 @@
-package compactor
-
-import (
-	"context"
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/mock"
-	"github.com/thanos-io/thanos/pkg/block/metadata"
-)
-
-type PlannerMock struct {
-	mock.Mock
-}
-
-func (p *PlannerMock) Plan(ctx context.Context, metasByMinTime []*metadata.Meta) ([]*metadata.Meta, error) {
-	args := p.Called(ctx, metasByMinTime)
-	return args.Get(0).([]*metadata.Meta), args.Error(1)
-}
-
-func TestPlanner_ShouldReturnParallelPlanner(t *testing.T) {
-	var ranges []int64
-	p, _ := NewPlanner(nil, ranges, "parallel")
-
-	parallelPlanner := &ParallelPlanner{}
-
-	assert.IsType(t, parallelPlanner, p.blocksPlanner)
-}
-
-func TestPlanner_ShouldReturnDefaultPlanner(t *testing.T) {
-	var ranges []int64
-	p, _ := NewPlanner(nil, ranges, "default")
-
-	parallelPlanner := &ParallelPlanner{}
-
-	assert.IsType(t, parallelPlanner, p.blocksPlanner)
-}
-
-func TestPlanner_ShouldErrorOnNonExistentPlanner(t *testing.T) {
-	var ranges []int64
-	_, err := NewPlanner(nil, ranges, "non-existent")
-
-	assert.ErrorIs(t, err, errPlannerNotImplemented)
-}
-
-func TestPlanner_PlanShouldCallBlocksPlannerPlan(t *testing.T) {
-	blockPlannerMock := &PlannerMock{}
-	blockPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, nil)
-
-	p := &Planner{
-		blocksPlanner: blockPlannerMock,
-	}
-
-	blockMetas := []*metadata.Meta{}
-	p.Plan(context.Background(), blockMetas)
-
-	blockPlannerMock.AssertCalled(t, "Plan", context.Background(), blockMetas)
-}

From 28e319a5b73034050817f864ccae4b2183268239 Mon Sep 17 00:00:00 2001
From: Albert
Date: Thu, 24 Jun 2021 10:11:53 -0700
Subject: [PATCH 7/7] Add planner filter unit tests

Signed-off-by: Albert
---
 docs/blocks-storage/compactor.md     |   5 ++
 pkg/compactor/planner_filter.go      |  20 ++---
 pkg/compactor/planner_filter_test.go | 119 +++++++++++++++++++++++++++
 3 files changed, 134 insertions(+), 10 deletions(-)

diff --git a/docs/blocks-storage/compactor.md b/docs/blocks-storage/compactor.md
index bcea40d0e30..df985f09e08 100644
--- a/docs/blocks-storage/compactor.md
+++ b/docs/blocks-storage/compactor.md
@@ -166,6 +166,11 @@ compactor:
   # CLI flag: -compactor.disabled-tenants
   [disabled_tenants: <string> | default = ""]
 
+  # Enable the planner filter, which filters groups of blocks within the Cortex
+  # compactor instead of using Thanos to group blocks.
+  # CLI flag: -compactor.planner-filter-enabled
+  [planner_filter_enabled: <boolean> | default = false]
+
   # Shard tenants across multiple compactor instances. Sharding is required if
   # you run multiple compactor instances, in order to coordinate compactions and
   # avoid race conditions leading to the same tenant blocks simultaneously
diff --git a/pkg/compactor/planner_filter.go b/pkg/compactor/planner_filter.go
index 1569c6a6b1d..08ba071b58e 100644
--- a/pkg/compactor/planner_filter.go
+++ b/pkg/compactor/planner_filter.go
@@ -26,7 +26,7 @@ type PlannerFilter struct {
 	compactorCfg Config
 	metaSyncDir  string
 
-	plans []*blocksGroup
+	plans []blocksGroup
 }
 
 func NewPlannerFilter(ctx context.Context, userID string, ulogger log.Logger, bucket objstore.InstrumentedBucket, fetcherFilters []block.MetadataFilter, compactorCfg Config, metaSyncDir string) (*PlannerFilter, error) {
@@ -66,7 +66,7 @@ func (f *PlannerFilter) getUserBlocks(ctx context.Context) (map[ulid.ULID]*metad
 	return metas, nil
 }
 
-// Fetches blocks and generates plans that can be parallelized and saves them
+// Fetches blocks and generates plans that can be parallelized and saves them in the PlannerFilter struct.
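+// Each plan is keyed "<Thanos group key>_<index>" (for example
+// "0@6043952821095826047_0"), and the saved plans are sorted smallest time
+// range first, oldest first.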
func (f *PlannerFilter) fetchBlocksAndGeneratePlans(ctx context.Context) error { // Get blocks blocks, err := f.getUserBlocks(ctx) @@ -87,20 +87,20 @@ func (f *PlannerFilter) generatePlans(ctx context.Context, blocks map[ulid.ULID] mainGroups[key] = append(mainGroups[key], b) } - var plans []*blocksGroup + var plans []blocksGroup - for _, mainBlocks := range mainGroups { - for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds()) { + for k, mainBlocks := range mainGroups { + for i, plan := range groupBlocksByCompactableRanges(mainBlocks, f.compactorCfg.BlockRanges.ToMilliseconds(), f.ulogger) { // Nothing to do if we don't have at least 2 blocks. if len(plan.blocks) < 2 { continue } - plan.key = i - level.Info(f.ulogger).Log("msg", "Found plan for user", "user", f.userID, "plan", plan.String()) - plans = append(plans, &plan) + plan.key = fmt.Sprintf("%v_%v", k, i) + + plans = append(plans, plan) } } @@ -152,7 +152,7 @@ type blocksGroup struct { rangeStart int64 // Included. rangeEnd int64 // Excluded. blocks []*metadata.Meta - key int + key string } // overlaps returns whether the group range overlaps with the input group. @@ -166,7 +166,7 @@ func (g blocksGroup) overlaps(other blocksGroup) bool { func (g blocksGroup) String() string { out := strings.Builder{} - out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, blocks: ", g.rangeStart, g.rangeEnd)) + out.WriteString(fmt.Sprintf("Group range start: %d, range end: %d, key %v, blocks: ", g.rangeStart, g.rangeEnd, g.key)) for i, b := range g.blocks { if i > 0 { diff --git a/pkg/compactor/planner_filter_test.go b/pkg/compactor/planner_filter_test.go index fceb4f2e351..abcb828a01a 100644 --- a/pkg/compactor/planner_filter_test.go +++ b/pkg/compactor/planner_filter_test.go @@ -1,13 +1,132 @@ package compactor import ( + "context" "testing" + "time" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/flagext" + "github.com/go-kit/kit/log" + "github.com/oklog/ulid" "github.com/prometheus/prometheus/tsdb" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/thanos-io/thanos/pkg/block/metadata" ) +func TestPlannerFilterPlanGeneration(t *testing.T) { + block1ulid := ulid.MustNew(1, nil) + block2ulid := ulid.MustNew(2, nil) + block3ulid := ulid.MustNew(3, nil) + block4ulid := ulid.MustNew(4, nil) + block5ulid := ulid.MustNew(5, nil) + block6ulid := ulid.MustNew(6, nil) + block7ulid := ulid.MustNew(7, nil) + block8ulid := ulid.MustNew(8, nil) + block9ulid := ulid.MustNew(9, nil) + block10ulid := ulid.MustNew(10, nil) + block11ulid := ulid.MustNew(11, nil) + + blocks := + map[ulid.ULID]*metadata.Meta{ + block1ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block1ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block2ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block2ulid, MinTime: 3 * time.Hour.Milliseconds(), MaxTime: 4 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block3ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block3ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block4ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block4ulid, MinTime: 2 * time.Hour.Milliseconds(), MaxTime: 3 * time.Hour.Milliseconds()}, + Thanos: 
metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block5ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block5ulid, MinTime: 1 * time.Hour.Milliseconds(), MaxTime: 2 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block6ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block6ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block7ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block7ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "1"}}, + }, + block8ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block8ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block9ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block9ulid, MinTime: 0 * time.Hour.Milliseconds(), MaxTime: 1 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "3"}}, + }, + block10ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block10ulid, MinTime: 4 * time.Hour.Milliseconds(), MaxTime: 6 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + block11ulid: { + BlockMeta: tsdb.BlockMeta{ULID: block11ulid, MinTime: 6 * time.Hour.Milliseconds(), MaxTime: 8 * time.Hour.Milliseconds()}, + Thanos: metadata.Thanos{Labels: map[string]string{"external": "2"}}, + }, + } + + tests := map[string]struct { + ranges cortex_tsdb.DurationList + blocks map[ulid.ULID]*metadata.Meta + expectedPlans []blocksGroup + }{ + "test basic planning": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block5ulid: blocks[block5ulid], block6ulid: blocks[block6ulid]}, + expectedPlans: []blocksGroup{ + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block6ulid], blocks[block5ulid]}, key: "0@14088339200549387484_0"}, + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"}, + {rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"}, + }, + }, + "test no compaction": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block7ulid: blocks[block7ulid], block8ulid: blocks[block8ulid], block9ulid: blocks[block9ulid]}, + expectedPlans: []blocksGroup{}, + }, + "test smallest range first": { + ranges: []time.Duration{2 * time.Hour, 4 * time.Hour}, + blocks: map[ulid.ULID]*metadata.Meta{block1ulid: blocks[block1ulid], block2ulid: blocks[block2ulid], block3ulid: blocks[block3ulid], block4ulid: blocks[block4ulid], block10ulid: blocks[block10ulid], block11ulid: blocks[block11ulid]}, + expectedPlans: []blocksGroup{ + {rangeStart: 0, rangeEnd: 7200000, blocks: []*metadata.Meta{blocks[block3ulid], blocks[block1ulid]}, key: "0@6043952821095826047_0"}, + {rangeStart: 7200000, rangeEnd: 14400000, blocks: []*metadata.Meta{blocks[block4ulid], blocks[block2ulid]}, key: "0@6043952821095826047_1"}, + {rangeStart: 14400000, rangeEnd: 28800000, blocks: []*metadata.Meta{blocks[block10ulid], blocks[block11ulid]}, key: "0@14088339200549387484_0"}, + }, 
+ }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + compactorCfg := Config{} + flagext.DefaultValues(&compactorCfg) + compactorCfg.BlockRanges = testData.ranges + f := &PlannerFilter{ + userID: "test-user", + compactorCfg: compactorCfg, + ulogger: log.NewNopLogger(), + } + err := f.generatePlans(context.Background(), testData.blocks) + require.NoError(t, err) + actualPlans := f.plans + require.Len(t, actualPlans, len(testData.expectedPlans)) + for i, expectedPlan := range testData.expectedPlans { + assert.Equal(t, expectedPlan, actualPlans[i]) + } + }) + } +} + func TestGroupBlocksByCompactableRanges(t *testing.T) { tests := map[string]struct { ranges []int64