From 1434ef6a87af171aed55e4105fae17e5198de3a9 Mon Sep 17 00:00:00 2001
From: Erlan Zholdubai uulu
Date: Thu, 28 Mar 2024 08:41:20 -0700
Subject: [PATCH 1/2] add context cancellation checks on merging GetLabel slices

Signed-off-by: Erlan Zholdubai uulu
---
 pkg/distributor/distributor.go | 10 ++++++++--
 pkg/util/strings.go            | 35 +++++++++++++++++++++++----------
 pkg/util/strings_test.go       |  5 ++++-
 3 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 3bb3256fe8..2e01992df4 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -973,7 +973,10 @@ func (d *Distributor) LabelValuesForLabelNameCommon(ctx context.Context, from, t
 	for i, resp := range resps {
 		values[i] = resp.([]string)
 	}
-	r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
+	r, err := util.MergeSlicesParallel(ctx, mergeSlicesParallelism, values...)
+	if err != nil {
+		return nil, err
+	}
 	span.SetTag("result_length", len(r))
 	return r, nil
 }
@@ -1043,7 +1046,10 @@ func (d *Distributor) LabelNamesCommon(ctx context.Context, from, to model.Time,
 	for i, resp := range resps {
 		values[i] = resp.([]string)
 	}
-	r := util.MergeSlicesParallel(mergeSlicesParallelism, values...)
+	r, err := util.MergeSlicesParallel(ctx, mergeSlicesParallelism, values...)
+	if err != nil {
+		return nil, err
+	}
 	span.SetTag("result_length", len(r))
 
 	return r, nil
diff --git a/pkg/util/strings.go b/pkg/util/strings.go
index 30a3283c53..c085452286 100644
--- a/pkg/util/strings.go
+++ b/pkg/util/strings.go
@@ -1,6 +1,7 @@
 package util
 
 import (
+	"context"
 	"sync"
 	"unsafe"
 
@@ -37,17 +38,18 @@
 
 // MergeSlicesParallel merge sorted slices in parallel
 // using the MergeSortedSlices function
-func MergeSlicesParallel(parallelism int, a ...[]string) []string {
+func MergeSlicesParallel(ctx context.Context, parallelism int, a ...[]string) ([]string, error) {
 	if parallelism <= 1 {
-		return MergeSortedSlices(a...)
+		return MergeSortedSlices(ctx, a...)
 	}
 	if len(a) == 0 {
-		return nil
+		return nil, nil
 	}
 	if len(a) == 1 {
-		return a[0]
+		return a[0], nil
 	}
 	c := make(chan []string, len(a))
+	errCh := make(chan error, 1)
 	wg := sync.WaitGroup{}
 	var r [][]string
 	p := min(parallelism, len(a)/2)
@@ -57,7 +59,13 @@
 		wg.Add(1)
 		go func(i int) {
 			m := min(len(a), i+batchSize)
-			c <- MergeSortedSlices(a[i:m]...)
+			r, e := MergeSortedSlices(ctx, a[i:m]...)
+			if e != nil {
+				errCh <- e
+				wg.Done()
+				return
+			}
+			c <- r
 			wg.Done()
 		}(i)
 	}
@@ -65,13 +73,17 @@
 
 	go func() {
 		wg.Wait()
 		close(c)
+		close(errCh)
 	}()
 
+	if err := <-errCh; err != nil {
+		return nil, err
+	}
 	for s := range c {
 		r = append(r, s)
 	}
-	return MergeSortedSlices(r...)
+	return MergeSortedSlices(ctx, r...)
 }
 
 func NewStringListIter(s []string) *StringListIter {
@@ -98,9 +110,9 @@ var MAX_STRING = string([]byte{0xff})
 
 // MergeSortedSlices merges a set of sorted string slices into a single ones
 // while removing all duplicates.
-func MergeSortedSlices(a ...[]string) []string {
+func MergeSortedSlices(ctx context.Context, a ...[]string) ([]string, error) {
 	if len(a) == 1 {
-		return a[0]
+		return a[0], nil
 	}
 	its := make([]*StringListIter, 0, len(a))
 	sumLengh := 0
@@ -111,16 +123,19 @@
 	lt := loser.New(its, MAX_STRING)
 
 	if sumLengh == 0 {
-		return []string{}
+		return []string{}, nil
 	}
 
 	r := make([]string, 0, sumLengh*2/10)
 	var current string
 	for lt.Next() {
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
 		if lt.At() != current {
 			current = lt.At()
 			r = append(r, current)
 		}
 	}
-	return r
+	return r, nil
 }
diff --git a/pkg/util/strings_test.go b/pkg/util/strings_test.go
index 05cf9d2096..75496c7c46 100644
--- a/pkg/util/strings_test.go
+++ b/pkg/util/strings_test.go
@@ -1,6 +1,7 @@
 package util
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"sort"
@@ -96,12 +97,14 @@ func BenchmarkMergeSlicesParallel(b *testing.B) {
 			b.ReportAllocs()
 			b.ResetTimer()
 			var r []string
+			var err error
 			for i := 0; i < b.N; i++ {
 				if p == usingMap {
 					r = sortUsingMap(input...)
 					require.NotEmpty(b, r)
 				} else {
-					r = MergeSlicesParallel(int(p), input...)
+					r, err = MergeSlicesParallel(context.Background(), int(p), input...)
+					require.NoError(b, err)
 					require.NotEmpty(b, r)
 				}
 			}

From d4a6a14c782c11ed5345be1cb8ea342dc2e4e8c8 Mon Sep 17 00:00:00 2001
From: Erlan Zholdubai uulu
Date: Tue, 9 Apr 2024 09:00:12 -0700
Subject: [PATCH 2/2] add context cancellation checks on merging GetLabel slices, update changelog

Signed-off-by: Erlan Zholdubai uulu
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f34a231c55..7db368c7c8 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -31,6 +31,7 @@
 * [ENHANCEMENT] Query: Added additional max query length check at Query Frontend and Ruler. Added `-querier.ignore-max-query-length` flag to disable max query length check at Querier. #5808
 * [ENHANCEMENT] Querier: Add context error check when converting Metrics to SeriesSet for GetSeries on distributorQuerier. #5827
 * [ENHANCEMENT] Ruler: Improve GetRules response time by refactoring mutexes and introducing a temporary rules cache in `ruler/manager.go`. #5805
+* [ENHANCEMENT] Querier: Add context error check when merging slices from ingesters for GetLabel operations. #5837
 * [BUGFIX] Distributor: Do not use label with empty values for sharding #5717
 * [BUGFIX] Query Frontend: queries with negative offset should check whether it is cacheable or not. #5719
 * [BUGFIX] Redis Cache: pass `cache_size` config correctly. #5734
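
Note for reviewers (illustrative, not part of the patches above): a minimal sketch of how the new context-aware signature is expected to behave once this change lands. The test name, the chosen parallelism, and the use of require.ErrorIs are assumptions for illustration only; the point is simply that a context cancelled before (or during) the merge should surface as an error from MergeSlicesParallel instead of the call running to completion.

package util

import (
	"context"
	"testing"

	"github.com/stretchr/testify/require"
)

// Hypothetical example, not included in this change: cancel the context up
// front and expect MergeSlicesParallel to propagate ctx.Err() rather than
// return a merged result.
func TestMergeSlicesParallelCancelledContext(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // ctx.Err() is now context.Canceled

	_, err := MergeSlicesParallel(ctx, 4, []string{"a", "b"}, []string{"b", "c"})
	require.ErrorIs(t, err, context.Canceled)
}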