diff --git a/go.mod b/go.mod index 4567fd3f33d..2f6e40f981d 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b github.com/cespare/xxhash v1.1.0 github.com/dustin/go-humanize v1.0.0 - github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 + github.com/efficientgo/core v1.0.0-rc.2 github.com/facette/natsort v0.0.0-20181210072756-2cd4dd1e2dcb github.com/felixge/fgprof v0.9.3 github.com/go-kit/log v0.2.1 @@ -49,7 +49,7 @@ require ( github.com/sony/gobreaker v0.5.0 github.com/spf13/afero v1.9.3 github.com/stretchr/testify v1.8.1 - github.com/thanos-community/promql-engine v0.0.0-20230111121531-c293f65f5389 + github.com/thanos-community/promql-engine v0.0.0-20230123143605-0ad3f3b2e4b4 github.com/thanos-io/objstore v0.0.0-20221205132204-5aafc0079f06 github.com/thanos-io/thanos v0.29.1-0.20230103123855-3327c510076a github.com/uber/jaeger-client-go v2.30.0+incompatible diff --git a/go.sum b/go.sum index d73a45131d2..856173871a5 100644 --- a/go.sum +++ b/go.sum @@ -510,8 +510,8 @@ github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= github.com/edsrzf/mmap-go v1.1.0 h1:6EUwBLQ/Mcr1EYLE4Tn1VdW1A4ckqCQWZBw8Hr0kjpQ= github.com/edsrzf/mmap-go v1.1.0/go.mod h1:19H/e8pUPLicwkyNgOykDXkJ9F0MHE+Z52B8EIth78Q= -github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 h1:rydBwnBoywKQMjWF0z8SriYtQ+uUcaFsxuijMjJr5PI= -github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4/go.mod h1:kQa0V74HNYMfuJH6jiPiwNdpWXl4xd/K4tzlrcvYDQI= +github.com/efficientgo/core v1.0.0-rc.2 h1:7j62qHLnrZqO3V3UA0AqOGd5d5aXV3AX6m/NZBHp78I= +github.com/efficientgo/core v1.0.0-rc.2/go.mod h1:FfGdkzWarkuzOlY04VY+bGfb1lWrjaL6x/GLcQ4vJps= github.com/efficientgo/e2e v0.13.1-0.20220923082810-8fa9daa8af8a h1:cnJajqeh/HjvJLhI3wPvWG9OQ4gU79+4pELRD5Pkih8= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= @@ -1507,8 +1507,8 @@ github.com/tchap/go-patricia v2.2.6+incompatible/go.mod h1:bmLyhP68RS6kStMGxByiQ github.com/tencentyun/cos-go-sdk-v5 v0.7.34 h1:xm+Pg+6m486y4eugRI7/E4WasbVmpY1hp9QBSRErgp8= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e h1:f1Zsv7OAU9iQhZwigp50Yl38W10g/vd5NC8Rdk1Jzng= github.com/thanos-community/galaxycache v0.0.0-20211122094458-3a32041a1f1e/go.mod h1:jXcofnrSln/cLI6/dhlBxPQZEEQHVPCcFaH75M+nSzM= -github.com/thanos-community/promql-engine v0.0.0-20230111121531-c293f65f5389 h1:ZER0Tf+2PnNEd4bcOJFDQ48dOraRyvu5tfYWySMS2S4= -github.com/thanos-community/promql-engine v0.0.0-20230111121531-c293f65f5389/go.mod h1:GJkKOtKfXos1xbTmHBnX3YTCov8enxdAS1woR6/h4CI= +github.com/thanos-community/promql-engine v0.0.0-20230123143605-0ad3f3b2e4b4 h1:54iqf5p40TFGTc2Dp791P1/ncO2sTqvXdMLXqNpC/Gc= +github.com/thanos-community/promql-engine v0.0.0-20230123143605-0ad3f3b2e4b4/go.mod h1:d52Wfzxs6L3xhc2snodyWvM2bZMMVn0XT2q4vsfBPVo= github.com/thanos-io/objstore v0.0.0-20221205132204-5aafc0079f06 h1:xUnLk2CwIoJyv6OB4MWC3aOH9TnneSgM5kgTsOmXIuI= github.com/thanos-io/objstore v0.0.0-20221205132204-5aafc0079f06/go.mod h1:gdo4vwwonBnheHB/TCwAOUtKJKrLhLtbBVTQR9rN/v0= github.com/thanos-io/thanos v0.29.1-0.20230103123855-3327c510076a h1:oN3VupYNkavPRvdXwq71p54SAFSbOGvL0qL7CeKFrJ0= diff --git a/vendor/github.com/efficientgo/core/testutil/testutil.go b/vendor/github.com/efficientgo/core/testutil/testutil.go index c74f368507b..c88191bc10d 100644 --- a/vendor/github.com/efficientgo/core/testutil/testutil.go +++ b/vendor/github.com/efficientgo/core/testutil/testutil.go @@ -14,8 +14,19 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/efficientgo/core/errors" "github.com/efficientgo/core/testutil/internal" + "github.com/google/go-cmp/cmp" ) +const limitOfElemChars = 1e3 + +func withLimitf(f string, v ...interface{}) string { + s := fmt.Sprintf(f, v...) + if len(s) > limitOfElemChars { + return s[:limitOfElemChars] + "...(output trimmed)" + } + return s +} + // Assert fails the test if the condition is false. func Assert(tb testing.TB, condition bool, v ...interface{}) { tb.Helper() @@ -28,7 +39,7 @@ func Assert(tb testing.TB, condition bool, v ...interface{}) { if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d: "+msg+"\033[39m\n\n", filepath.Base(file), line) + tb.Fatalf("\033[31m%s:%d: \"%s\"\033[39m\n\n", filepath.Base(file), line, withLimitf(msg)) } // Ok fails the test if an err is not nil. @@ -43,7 +54,7 @@ func Ok(tb testing.TB, err error, v ...interface{}) { if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d:"+msg+"\n\n unexpected error: %s\033[39m\n\n", filepath.Base(file), line, err.Error()) + tb.Fatalf("\033[31m%s:%d: \"%s\"\n\n unexpected error: %s\033[39m\n\n", filepath.Base(file), line, withLimitf(msg), withLimitf(err.Error())) } // NotOk fails the test if an err is nil. @@ -58,7 +69,7 @@ func NotOk(tb testing.TB, err error, v ...interface{}) { if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatalf("\033[31m%s:%d:"+msg+"\n\n expected error, got nothing \033[39m\n\n", filepath.Base(file), line) + tb.Fatalf("\033[31m%s:%d: \"%s\"\n\n expected error, got nothing \033[39m\n\n", filepath.Base(file), line, withLimitf(msg)) } // Equals fails the test if exp is not equal to act. @@ -67,13 +78,41 @@ func Equals(tb testing.TB, exp, act interface{}, v ...interface{}) { if reflect.DeepEqual(exp, act) { return } - _, file, line, _ := runtime.Caller(1) + fatalNotEqual(tb, exp, act, v...) +} + +func fatalNotEqual(tb testing.TB, exp, act interface{}, v ...interface{}) { + _, file, line, _ := runtime.Caller(2) var msg string if len(v) > 0 { msg = fmt.Sprintf(v[0].(string), v[1:]...) } - tb.Fatal(sprintfWithLimit("\033[31m%s:%d:"+msg+"\n\n\texp: %#v\n\n\tgot: %#v%s\033[39m\n\n", filepath.Base(file), line, exp, act, diff(exp, act))) + tb.Fatalf( + "\033[31m%s:%d: \"%s\"\n\n\texp: %s\n\n\tgot: %s%s\033[39m\n\n", + filepath.Base(file), line, withLimitf(msg), withLimitf("%#v", exp), withLimitf("%#v", act), withLimitf(diff(exp, act)), + ) +} + +type goCmp struct { + opts cmp.Options +} + +// WithGoCmp allows specifying options and using https://github.com/google/go-cmp +// for equality comparisons. The compatibility guarantee of this function's arguments +// are the same as go-cmp (no guarantee due to v0.x). +func WithGoCmp(opts ...cmp.Option) goCmp { + return goCmp{opts: opts} +} + +// Equals uses go-cmp for comparing equality between two structs, and can be used with +// various options defined in go-cmp/cmp and go-cmp/cmp/cmpopts. +func (o goCmp) Equals(tb testing.TB, exp, act interface{}, v ...interface{}) { + tb.Helper() + if cmp.Equal(exp, act, o.opts) { + return + } + fatalNotEqual(tb, exp, act, v...) } // FaultOrPanicToErr returns error if panic of fault was triggered during execution of function. @@ -98,7 +137,7 @@ func ContainsStringSlice(tb testing.TB, haystack, needle []string) { _, file, line, _ := runtime.Caller(1) if !contains(haystack, needle) { - tb.Fatalf(sprintfWithLimit("\033[31m%s:%d: %#v does not contain %#v\033[39m\n\n", filepath.Base(file), line, haystack, needle)) + tb.Fatalf("\033[31m%s:%d: %s does not contain %s\033[39m\n\n", filepath.Base(file), line, withLimitf("%#v", haystack), withLimitf("%#v", needle)) } } @@ -138,14 +177,6 @@ func contains(haystack, needle []string) bool { return false } -func sprintfWithLimit(act string, v ...interface{}) string { - s := fmt.Sprintf(act, v...) - if len(s) > 10000 { - return s[:10000] + "...(output trimmed)" - } - return s -} - func typeAndKind(v interface{}) (reflect.Type, reflect.Kind) { t := reflect.TypeOf(v) k := t.Kind() diff --git a/vendor/github.com/thanos-community/promql-engine/api/remote.go b/vendor/github.com/thanos-community/promql-engine/api/remote.go index 2f8c9f2c146..d93ade96046 100644 --- a/vendor/github.com/thanos-community/promql-engine/api/remote.go +++ b/vendor/github.com/thanos-community/promql-engine/api/remote.go @@ -6,6 +6,7 @@ package api import ( "time" + "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" ) @@ -14,6 +15,8 @@ type RemoteEndpoints interface { } type RemoteEngine interface { + MaxT() int64 + LabelSets() []labels.Labels NewInstantQuery(opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) } diff --git a/vendor/github.com/thanos-community/promql-engine/engine/engine.go b/vendor/github.com/thanos-community/promql-engine/engine/engine.go index 399c0c55c3b..55f25b4c28f 100644 --- a/vendor/github.com/thanos-community/promql-engine/engine/engine.go +++ b/vendor/github.com/thanos-community/promql-engine/engine/engine.go @@ -6,6 +6,7 @@ package engine import ( "context" + "github.com/prometheus/prometheus/model/labels" v1 "github.com/prometheus/prometheus/web/api/v1" "io" @@ -63,29 +64,41 @@ func (o Opts) getLogicalOptimizers() []logicalplan.Optimizer { return o.LogicalOptimizers } -type localEngine struct { - q storage.Queryable - engine *compatibilityEngine +type remoteEngine struct { + q storage.Queryable + engine *compatibilityEngine + labelSets []labels.Labels + maxt int64 } -func NewLocalEngine(opts Opts, q storage.Queryable) *localEngine { - return &localEngine{ - q: q, - engine: New(opts), +func NewRemoteEngine(opts Opts, q storage.Queryable, maxt int64, labelSets []labels.Labels) *remoteEngine { + return &remoteEngine{ + q: q, + labelSets: labelSets, + maxt: maxt, + engine: New(opts), } } -func (l localEngine) NewInstantQuery(opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { +func (l remoteEngine) MaxT() int64 { + return l.maxt +} + +func (l remoteEngine) LabelSets() []labels.Labels { + return l.labelSets +} + +func (l remoteEngine) NewInstantQuery(opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { return l.engine.NewInstantQuery(l.q, opts, qs, ts) } -func (l localEngine) NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { +func (l remoteEngine) NewRangeQuery(opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { return l.engine.NewRangeQuery(l.q, opts, qs, start, end, interval) } type distributedEngine struct { - endpoints api.RemoteEndpoints - localEngine *compatibilityEngine + endpoints api.RemoteEndpoints + remoteEngine *compatibilityEngine } func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) v1.QueryEngine { @@ -94,19 +107,19 @@ func NewDistributedEngine(opts Opts, endpoints api.RemoteEndpoints) v1.QueryEngi logicalplan.DistributedExecutionOptimizer{Endpoints: endpoints}, ) return &distributedEngine{ - endpoints: endpoints, - localEngine: New(opts), + endpoints: endpoints, + remoteEngine: New(opts), } } func (l distributedEngine) SetQueryLogger(log promql.QueryLogger) {} func (l distributedEngine) NewInstantQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, ts time.Time) (promql.Query, error) { - return l.localEngine.NewInstantQuery(q, opts, qs, ts) + return l.remoteEngine.NewInstantQuery(q, opts, qs, ts) } func (l distributedEngine) NewRangeQuery(q storage.Queryable, opts *promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error) { - return l.localEngine.NewRangeQuery(q, opts, qs, start, end, interval) + return l.remoteEngine.NewRangeQuery(q, opts, qs, start, end, interval) } func New(opts Opts) *compatibilityEngine { @@ -174,11 +187,12 @@ func (e *compatibilityEngine) NewInstantQuery(q storage.Queryable, opts *promql. } return &compatibilityQuery{ - Query: &Query{exec: exec}, - engine: e, - expr: expr, - ts: ts, - t: InstantQuery, + Query: &Query{exec: exec, opts: opts}, + engine: e, + expr: expr, + ts: ts, + t: InstantQuery, + resultSort: newResultSort(expr), }, nil } @@ -211,7 +225,7 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu } return &compatibilityQuery{ - Query: &Query{exec: exec}, + Query: &Query{exec: exec, opts: opts}, engine: e, expr: expr, t: RangeQuery, @@ -220,6 +234,7 @@ func (e *compatibilityEngine) NewRangeQuery(q storage.Queryable, opts *promql.Qu type Query struct { exec model.VectorOperator + opts *promql.QueryOpts } // Explain returns human-readable explanation of the created executor. @@ -232,12 +247,83 @@ func (q *Query) Profile() { // TODO(bwplotka): Return profile. } +type sortOrder bool + +const ( + sortOrderAsc sortOrder = false + sortOrderDesc sortOrder = true +) + +type resultSort struct { + sortByValues bool + sortOrder sortOrder + sortingLabels []string + groupBy bool +} + +func newResultSort(expr parser.Expr) resultSort { + aggr, ok := expr.(*parser.AggregateExpr) + if !ok { + return resultSort{} + } + + switch aggr.Op { + case parser.TOPK: + return resultSort{ + sortByValues: true, + sortingLabels: aggr.Grouping, + sortOrder: sortOrderDesc, + groupBy: !aggr.Without, + } + case parser.BOTTOMK: + return resultSort{ + sortByValues: true, + sortingLabels: aggr.Grouping, + sortOrder: sortOrderAsc, + groupBy: !aggr.Without, + } + default: + return resultSort{} + } +} + +func (s resultSort) comparer(samples *promql.Vector) func(i int, j int) bool { + return func(i int, j int) bool { + if !s.sortByValues { + return i < j + } + + var iLbls labels.Labels + var jLbls labels.Labels + iLb := labels.NewBuilder((*samples)[i].Metric) + jLb := labels.NewBuilder((*samples)[j].Metric) + if s.groupBy { + iLbls = iLb.Keep(s.sortingLabels...).Labels(nil) + jLbls = jLb.Keep(s.sortingLabels...).Labels(nil) + } else { + iLbls = iLb.Del(s.sortingLabels...).Labels(nil) + jLbls = jLb.Del(s.sortingLabels...).Labels(nil) + } + + lblsCmp := labels.Compare(iLbls, jLbls) + if lblsCmp != 0 { + return lblsCmp < 0 + } + + if s.sortOrder == sortOrderAsc { + return (*samples)[i].V < (*samples)[j].V + } + return (*samples)[i].V > (*samples)[j].V + } +} + type compatibilityQuery struct { *Query - engine *compatibilityEngine - expr parser.Expr - ts time.Time // Empty for range queries. - t QueryType + engine *compatibilityEngine + expr parser.Expr + ts time.Time // Empty for range queries. + t QueryType + resultSort resultSort cancel context.CancelFunc } @@ -351,6 +437,7 @@ loop: }, }) } + sort.Slice(vector, q.resultSort.comparer(&vector)) result = vector case parser.ValueTypeScalar: v := math.NaN() @@ -378,7 +465,14 @@ func newErrResult(r *promql.Result, err error) *promql.Result { func (q *compatibilityQuery) Statement() parser.Statement { return nil } -func (q *compatibilityQuery) Stats() *stats.Statistics { return &stats.Statistics{} } +// Stats always returns empty query stats for now to avoid panic. +func (q *compatibilityQuery) Stats() *stats.Statistics { + var enablePerStepStats bool + if q.opts != nil { + enablePerStepStats = q.opts.EnablePerStepStats + } + return &stats.Statistics{Timers: stats.NewQueryTimers(), Samples: stats.NewQuerySamples(enablePerStepStats)} +} func (q *compatibilityQuery) Close() { q.Cancel() } diff --git a/vendor/github.com/thanos-community/promql-engine/execution/exchange/coalesce.go b/vendor/github.com/thanos-community/promql-engine/execution/exchange/coalesce.go index 23b45ae86ef..8e5cc15cfbf 100644 --- a/vendor/github.com/thanos-community/promql-engine/execution/exchange/coalesce.go +++ b/vendor/github.com/thanos-community/promql-engine/execution/exchange/coalesce.go @@ -6,6 +6,7 @@ package exchange import ( "context" "sync" + "sync/atomic" "github.com/efficientgo/core/errors" "github.com/prometheus/prometheus/model/labels" @@ -25,34 +26,42 @@ func (c errorChan) getError() error { return nil } -type coalesceOperator struct { +// coalesce is a model.VectorOperator that merges input vectors from multiple downstream operators +// into a single output vector. +// coalesce guarantees that samples from different input vectors will be added to the output in the same order +// as the input vectors themselves are provided in NewCoalesce. +type coalesce struct { once sync.Once series []labels.Labels - pool *model.VectorPool - mu sync.Mutex - wg sync.WaitGroup - operators []model.VectorOperator + pool *model.VectorPool + wg sync.WaitGroup + operators []model.VectorOperator + + // inVectors is an internal per-step cache for references to input vectors. + inVectors [][]model.StepVector + // sampleOffsets holds per-operator offsets needed to map an input sample ID to an output sample ID. sampleOffsets []uint64 } func NewCoalesce(pool *model.VectorPool, operators ...model.VectorOperator) model.VectorOperator { - return &coalesceOperator{ + return &coalesce{ pool: pool, - operators: operators, sampleOffsets: make([]uint64, len(operators)), + operators: operators, + inVectors: make([][]model.StepVector, len(operators)), } } -func (c *coalesceOperator) Explain() (me string, next []model.VectorOperator) { - return "[*coalesceOperator]", c.operators +func (c *coalesce) Explain() (me string, next []model.VectorOperator) { + return "[*coalesce]", c.operators } -func (c *coalesceOperator) GetPool() *model.VectorPool { +func (c *coalesce) GetPool() *model.VectorPool { return c.pool } -func (c *coalesceOperator) Series(ctx context.Context) ([]labels.Labels, error) { +func (c *coalesce) Series(ctx context.Context) ([]labels.Labels, error) { var err error c.once.Do(func() { err = c.loadSeries(ctx) }) if err != nil { @@ -61,7 +70,7 @@ func (c *coalesceOperator) Series(ctx context.Context) ([]labels.Labels, error) return c.series, nil } -func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error) { +func (c *coalesce) Next(ctx context.Context) ([]model.StepVector, error) { select { case <-ctx.Done(): return nil, ctx.Err() @@ -74,7 +83,6 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error) return nil, err } - var out []model.StepVector = nil var errChan = make(errorChan, len(c.operators)) for idx, o := range c.operators { c.wg.Add(1) @@ -86,36 +94,14 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error) errChan <- err return } - if in == nil { - return - } + // Map input IDs to output IDs. for _, vector := range in { for i := range vector.SampleIDs { vector.SampleIDs[i] += c.sampleOffsets[opIdx] } } - - c.mu.Lock() - defer c.mu.Unlock() - - if len(in) > 0 && out == nil { - out = c.pool.GetVectorBatch() - for i := 0; i < len(in); i++ { - out = append(out, c.pool.GetStepVector(in[i].T)) - } - } - - for i := 0; i < len(in); i++ { - if len(in[i].Samples) > 0 { - out[i].T = in[i].T - } - - out[i].Samples = append(out[i].Samples, in[i].Samples...) - out[i].SampleIDs = append(out[i].SampleIDs, in[i].SampleIDs...) - o.GetPool().PutStepVector(in[i]) - } - o.GetPool().PutVectors(in) + c.inVectors[opIdx] = in }(idx, o) } c.wg.Wait() @@ -125,6 +111,24 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error) return nil, err } + var out []model.StepVector = nil + for opIdx, vector := range c.inVectors { + if len(vector) > 0 && out == nil { + out = c.pool.GetVectorBatch() + for i := 0; i < len(vector); i++ { + out = append(out, c.pool.GetStepVector(vector[i].T)) + } + } + + for i := 0; i < len(vector); i++ { + out[i].Samples = append(out[i].Samples, vector[i].Samples...) + out[i].SampleIDs = append(out[i].SampleIDs, vector[i].SampleIDs...) + c.operators[opIdx].GetPool().PutStepVector(vector[i]) + } + c.inVectors[opIdx] = nil + c.operators[opIdx].GetPool().PutVectors(vector) + } + if out == nil { return nil, nil } @@ -132,9 +136,8 @@ func (c *coalesceOperator) Next(ctx context.Context) ([]model.StepVector, error) return out, nil } -func (c *coalesceOperator) loadSeries(ctx context.Context) error { +func (c *coalesce) loadSeries(ctx context.Context) error { var wg sync.WaitGroup - var mu sync.Mutex var numSeries uint64 allSeries := make([][]labels.Labels, len(c.operators)) errChan := make(errorChan, len(c.operators)) @@ -161,9 +164,7 @@ func (c *coalesceOperator) loadSeries(ctx context.Context) error { } allSeries[i] = series - mu.Lock() - numSeries += uint64(len(series)) - mu.Unlock() + atomic.AddUint64(&numSeries, uint64(len(series))) }(i) } wg.Wait() @@ -172,13 +173,11 @@ func (c *coalesceOperator) loadSeries(ctx context.Context) error { return err } - var offset uint64 c.sampleOffsets = make([]uint64, len(c.operators)) c.series = make([]labels.Labels, 0, numSeries) for i, series := range allSeries { - c.sampleOffsets[i] = offset + c.sampleOffsets[i] = uint64(len(c.series)) c.series = append(c.series, series...) - offset += uint64(len(series)) } c.pool.SetStepSize(len(c.series)) diff --git a/vendor/github.com/thanos-community/promql-engine/execution/exchange/dedup.go b/vendor/github.com/thanos-community/promql-engine/execution/exchange/dedup.go new file mode 100644 index 00000000000..517aed72ab3 --- /dev/null +++ b/vendor/github.com/thanos-community/promql-engine/execution/exchange/dedup.go @@ -0,0 +1,141 @@ +// Copyright (c) The Thanos Community Authors. +// Licensed under the Apache License 2.0. + +package exchange + +import ( + "context" + "sync" + + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-community/promql-engine/execution/model" +) + +type dedupSample struct { + t int64 + v float64 +} + +// The dedupCache is an internal cache used to deduplicate samples inside a single step vector. +type dedupCache []dedupSample + +// dedupOperator is a model.VectorOperator that deduplicates samples with +// same IDs inside a single model.StepVector. +// Deduplication is done using a last-sample-wins strategy, which means that +// if multiple samples with the same ID are present in a StepVector, dedupOperator +// will keep the last sample in that vector. +type dedupOperator struct { + once sync.Once + series []labels.Labels + + pool *model.VectorPool + next model.VectorOperator + // outputIndex is a slice that is used as an index from input sample ID to output sample ID. + outputIndex []uint64 + dedupCache dedupCache +} + +func NewDedupOperator(pool *model.VectorPool, next model.VectorOperator) model.VectorOperator { + return &dedupOperator{ + next: next, + pool: pool, + } +} + +func (d *dedupOperator) Next(ctx context.Context) ([]model.StepVector, error) { + var err error + d.once.Do(func() { err = d.loadSeries(ctx) }) + if err != nil { + return nil, err + } + + in, err := d.next.Next(ctx) + if err != nil { + return nil, err + } + if in == nil { + return nil, nil + } + + result := d.pool.GetVectorBatch() + for _, vector := range in { + for i, inputSampleID := range vector.SampleIDs { + outputSampleID := d.outputIndex[inputSampleID] + d.dedupCache[outputSampleID].t = vector.T + d.dedupCache[outputSampleID].v = vector.Samples[i] + } + + out := d.pool.GetStepVector(vector.T) + for outputSampleID, sample := range d.dedupCache { + // To avoid clearing the dedup cache for each step vector, we use the `t` field + // to detect whether a sample for the current step should be mapped to the output. + // If the timestamp of the sample does not match the input vector timestamp, it means that + // the sample was added in a previous iteration and should be skipped. + if sample.t == vector.T { + out.SampleIDs = append(out.SampleIDs, uint64(outputSampleID)) + out.Samples = append(out.Samples, sample.v) + } + } + result = append(result, out) + } + + return result, nil +} + +func (d *dedupOperator) Series(ctx context.Context) ([]labels.Labels, error) { + var err error + d.once.Do(func() { err = d.loadSeries(ctx) }) + if err != nil { + return nil, err + } + return d.series, nil +} + +func (d *dedupOperator) GetPool() *model.VectorPool { + return d.pool +} + +func (d *dedupOperator) Explain() (me string, next []model.VectorOperator) { + return "[*dedup]", []model.VectorOperator{d.next} +} + +func (d *dedupOperator) loadSeries(ctx context.Context) error { + series, err := d.next.Series(ctx) + if err != nil { + return err + } + + outputIndex := make(map[uint64]uint64) + inputIndex := make([]uint64, len(series)) + hashBuf := make([]byte, 0, 128) + for inputSeriesID, inputSeries := range series { + hash := hashSeries(hashBuf, inputSeries) + + inputIndex[inputSeriesID] = hash + outputSeriesID, ok := outputIndex[hash] + if !ok { + outputSeriesID = uint64(len(d.series)) + d.series = append(d.series, inputSeries) + } + outputIndex[hash] = outputSeriesID + } + + d.outputIndex = make([]uint64, len(inputIndex)) + for inputSeriesID, hash := range inputIndex { + outputSeriesID := outputIndex[hash] + d.outputIndex[inputSeriesID] = outputSeriesID + } + d.dedupCache = make(dedupCache, len(outputIndex)) + for i := range d.dedupCache { + d.dedupCache[i].t = -1 + } + + return nil +} + +func hashSeries(hashBuf []byte, inputSeries labels.Labels) uint64 { + hashBuf = hashBuf[:0] + hash, _ := inputSeries.HashWithoutLabels(hashBuf) + return hash +} diff --git a/vendor/github.com/thanos-community/promql-engine/execution/execution.go b/vendor/github.com/thanos-community/promql-engine/execution/execution.go index c563d1bca2a..28ff04ca4d7 100644 --- a/vendor/github.com/thanos-community/promql-engine/execution/execution.go +++ b/vendor/github.com/thanos-community/promql-engine/execution/execution.go @@ -18,6 +18,7 @@ package execution import ( "runtime" + "sort" "time" "github.com/prometheus/prometheus/promql" @@ -236,7 +237,15 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O } return step_invariant.NewStepInvariantOperator(model.NewVectorPool(stepsBatch), next, e.Expr, opts, stepsBatch) - case logicalplan.Coalesce: + case logicalplan.Deduplicate: + // The Deduplicate operator will deduplicate samples using a last-sample-wins strategy. + // Sorting engines by MaxT ensures that samples produced due to + // staleness will be overwritten and corrected by samples coming from + // engines with a higher max time. + sort.Slice(e.Expressions, func(i, j int) bool { + return e.Expressions[i].Engine.MaxT() < e.Expressions[j].Engine.MaxT() + }) + operators := make([]model.VectorOperator, len(e.Expressions)) for i, expr := range e.Expressions { operator, err := newOperator(expr, storage, opts, hints) @@ -245,9 +254,11 @@ func newOperator(expr parser.Expr, storage *engstore.SelectorPool, opts *query.O } operators[i] = operator } - return exchange.NewCoalesce(model.NewVectorPool(stepsBatch), operators...), nil + coalesce := exchange.NewCoalesce(model.NewVectorPool(stepsBatch), operators...) + dedup := exchange.NewDedupOperator(model.NewVectorPool(stepsBatch), coalesce) + return exchange.NewConcurrent(dedup, 2), nil - case *logicalplan.RemoteExecution: + case logicalplan.RemoteExecution: qry, err := e.Engine.NewRangeQuery(&promql.QueryOpts{}, e.Query, opts.Start, opts.End, opts.Step) if err != nil { return nil, err diff --git a/vendor/github.com/thanos-community/promql-engine/logicalplan/distribute.go b/vendor/github.com/thanos-community/promql-engine/logicalplan/distribute.go index d332e7c5235..36e9af15120 100644 --- a/vendor/github.com/thanos-community/promql-engine/logicalplan/distribute.go +++ b/vendor/github.com/thanos-community/promql-engine/logicalplan/distribute.go @@ -5,28 +5,26 @@ package logicalplan import ( "fmt" + "sort" + "strings" "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-community/promql-engine/api" ) -type Coalesce struct { - Expressions parser.Expressions -} +type RemoteExecutions []RemoteExecution -func (r Coalesce) String() string { - return fmt.Sprintf("coalesce(%s)", r.Expressions) +func (rs RemoteExecutions) String() string { + parts := make([]string, len(rs)) + for i, r := range rs { + parts[i] = r.String() + } + return strings.Join(parts, ", ") } -func (r Coalesce) Pretty(level int) string { return r.String() } - -func (r Coalesce) PositionRange() parser.PositionRange { return parser.PositionRange{} } - -func (r Coalesce) Type() parser.ValueType { return parser.ValueTypeMatrix } - -func (r Coalesce) PromQLExpr() {} - +// RemoteExecution is a logical plan that describes a +// remote execution of a Query against the given PromQL Engine. type RemoteExecution struct { Engine api.RemoteEngine Query string @@ -44,6 +42,25 @@ func (r RemoteExecution) Type() parser.ValueType { return parser.ValueTypeMatrix func (r RemoteExecution) PromQLExpr() {} +// Deduplicate is a logical plan which deduplicates samples from multiple RemoteExecutions. +type Deduplicate struct { + Expressions RemoteExecutions +} + +func (r Deduplicate) String() string { + return fmt.Sprintf("dedup(%s)", r.Expressions.String()) +} + +func (r Deduplicate) Pretty(level int) string { return r.String() } + +func (r Deduplicate) PositionRange() parser.PositionRange { return parser.PositionRange{} } + +func (r Deduplicate) Type() parser.ValueType { return parser.ValueTypeMatrix } + +func (r Deduplicate) PromQLExpr() {} + +// distributiveAggregations are all PromQL aggregations which support +// distributed execution. var distributiveAggregations = map[parser.ItemType]struct{}{ parser.SUM: {}, parser.MIN: {}, @@ -62,6 +79,7 @@ type DistributedExecutionOptimizer struct { func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr) parser.Expr { engines := m.Endpoints.Engines() + traverseBottomUp(nil, &plan, func(parent, current *parser.Expr) (stop bool) { // If the current operation is not distributive, stop the traversal. if !isDistributive(current) { @@ -75,7 +93,9 @@ func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr) parser.Expr { if aggr.Op == parser.COUNT { localAggregation = parser.SUM } - subQueries := m.makeSubQueries(current, engines) + + remoteAggregation := newRemoteAggregation(aggr, engines) + subQueries := m.makeSubQueries(&remoteAggregation, engines) *current = &parser.AggregateExpr{ Op: localAggregation, Expr: subQueries, @@ -99,17 +119,47 @@ func (m DistributedExecutionOptimizer) Optimize(plan parser.Expr) parser.Expr { return plan } -func (m DistributedExecutionOptimizer) makeSubQueries(current *parser.Expr, engines []api.RemoteEngine) Coalesce { - remoteQueries := Coalesce{ - Expressions: make(parser.Expressions, len(engines)), +func newRemoteAggregation(rootAggregation *parser.AggregateExpr, engines []api.RemoteEngine) parser.Expr { + groupingSet := make(map[string]struct{}) + for _, lbl := range rootAggregation.Grouping { + groupingSet[lbl] = struct{}{} } + + for _, engine := range engines { + for _, lbls := range engine.LabelSets() { + for _, lbl := range lbls { + if rootAggregation.Without { + delete(groupingSet, lbl.Name) + } else { + groupingSet[lbl.Name] = struct{}{} + } + } + } + } + + groupingLabels := make([]string, 0, len(groupingSet)) + for lbl := range groupingSet { + groupingLabels = append(groupingLabels, lbl) + } + sort.Strings(groupingLabels) + + remoteAggregation := *rootAggregation + remoteAggregation.Grouping = groupingLabels + return &remoteAggregation +} + +func (m DistributedExecutionOptimizer) makeSubQueries(current *parser.Expr, engines []api.RemoteEngine) Deduplicate { + remoteQueries := make(RemoteExecutions, len(engines)) for i := 0; i < len(engines); i++ { - remoteQueries.Expressions[i] = &RemoteExecution{ + remoteQueries[i] = RemoteExecution{ Engine: engines[i], Query: (*current).String(), } } - return remoteQueries + + return Deduplicate{ + Expressions: remoteQueries, + } } func isDistributive(expr *parser.Expr) bool { diff --git a/vendor/github.com/thanos-community/promql-engine/logicalplan/plan.go b/vendor/github.com/thanos-community/promql-engine/logicalplan/plan.go index 8d8720d3013..682a86d299f 100644 --- a/vendor/github.com/thanos-community/promql-engine/logicalplan/plan.go +++ b/vendor/github.com/thanos-community/promql-engine/logicalplan/plan.go @@ -84,6 +84,8 @@ func traverse(expr *parser.Expr, transform func(*parser.Expr)) { func traverseBottomUp(parent *parser.Expr, current *parser.Expr, transform func(parent *parser.Expr, node *parser.Expr) bool) bool { switch node := (*current).(type) { + case *parser.NumberLiteral: + return false case *parser.StepInvariantExpr: return traverseBottomUp(current, &node.Expr, transform) case *parser.VectorSelector: @@ -96,8 +98,8 @@ func traverseBottomUp(parent *parser.Expr, current *parser.Expr, transform func( } return transform(parent, current) case *parser.Call: - for _, n := range node.Args { - if stop := traverseBottomUp(current, &n, transform); stop { + for i := range node.Args { + if stop := traverseBottomUp(current, &node.Args[i], transform); stop { return stop } } diff --git a/vendor/modules.txt b/vendor/modules.txt index c17d9e2ebc8..4c82195c0b9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -265,7 +265,7 @@ github.com/dustin/go-humanize # github.com/edsrzf/mmap-go v1.1.0 ## explicit; go 1.17 github.com/edsrzf/mmap-go -# github.com/efficientgo/core v1.0.0-rc.0.0.20221201130417-ba593f67d2a4 +# github.com/efficientgo/core v1.0.0-rc.2 ## explicit; go 1.17 github.com/efficientgo/core/errcapture github.com/efficientgo/core/errors @@ -792,7 +792,7 @@ github.com/stretchr/objx github.com/stretchr/testify/assert github.com/stretchr/testify/mock github.com/stretchr/testify/require -# github.com/thanos-community/promql-engine v0.0.0-20230111121531-c293f65f5389 +# github.com/thanos-community/promql-engine v0.0.0-20230123143605-0ad3f3b2e4b4 ## explicit; go 1.19 github.com/thanos-community/promql-engine/api github.com/thanos-community/promql-engine/engine