diff --git a/CHANGELOG.md b/CHANGELOG.md
index c4e3778c7c..d75b191439 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 * [CHANGE] Compactor/Bucket Store: Added `-blocks-storage.bucket-store.block-discovery-strategy` to configure different block listing strategy. Reverted the current recursive block listing mechanism and use the strategy `Concurrent` as in 1.15. #5828
 * [CHANGE] Compactor: Don't halt compactor when overlapped source blocks detected. #5854
 * [CHANGE] S3 Bucket Client: Expose `-blocks-storage.s3.send-content-md5` flag and set default checksum algorithm to MD5. #5870
+* [CHANGE] Querier: Mark `querier.iterators` and `querier.batch-iterators` flags as deprecated. The querier now always uses batch iterators. #5868
 * [FEATURE] OTLP ingestion experimental. #5813
 * [FEATURE] Ingester: Add per-tenant new metric `cortex_ingester_tsdb_data_replay_duration_seconds`. #5477
 * [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 1aabad8522..3b0c998eeb 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -104,16 +104,6 @@ querier:
   # CLI flag: -querier.timeout
   [timeout: <duration> | default = 2m]
 
-  # Use iterators to execute query, as opposed to fully materialising the series
-  # in memory.
-  # CLI flag: -querier.iterators
-  [iterators: <boolean> | default = false]
-
-  # Use batch iterators to execute query, as opposed to fully materialising the
-  # series in memory. Takes precedent over the -querier.iterators flag.
-  # CLI flag: -querier.batch-iterators
-  [batch_iterators: <boolean> | default = true]
-
   # Use streaming RPCs for metadata APIs from ingester.
   # CLI flag: -querier.ingester-metadata-streaming
   [ingester_metadata_streaming: <boolean> | default = false]
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 78b600dd72..f0da1dbed5 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -3609,16 +3609,6 @@ The `querier_config` configures the Cortex querier.
 # CLI flag: -querier.timeout
 [timeout: <duration> | default = 2m]
 
-# Use iterators to execute query, as opposed to fully materialising the series
-# in memory.
-# CLI flag: -querier.iterators
-[iterators: <boolean> | default = false]
-
-# Use batch iterators to execute query, as opposed to fully materialising the
-# series in memory. Takes precedent over the -querier.iterators flag.
-# CLI flag: -querier.batch-iterators
-[batch_iterators: <boolean> | default = true]
-
 # Use streaming RPCs for metadata APIs from ingester.
 # CLI flag: -querier.ingester-metadata-streaming
 [ingester_metadata_streaming: <boolean> | default = false]
diff --git a/pkg/chunk/encoding/instrumentation.go b/pkg/chunk/encoding/instrumentation.go
deleted file mode 100644
index 241b88c48b..0000000000
--- a/pkg/chunk/encoding/instrumentation.go
+++ /dev/null
@@ -1,91 +0,0 @@
-// This file was taken from Prometheus (https://github.com/prometheus/prometheus).
-// The original license header is included below:
-//
-// Copyright 2014 The Prometheus Authors
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and -// limitations under the License. - -package encoding - -import "github.com/prometheus/client_golang/prometheus" - -// Usually, a separate file for instrumentation is frowned upon. Metrics should -// be close to where they are used. However, the metrics below are set all over -// the place, so we go for a separate instrumentation file in this case. -var ( - Ops = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunk_ops_total", - Help: "The total number of chunk operations by their type.", - }, - []string{OpTypeLabel}, - ) - DescOps = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "chunkdesc_ops_total", - Help: "The total number of chunk descriptor operations by their type.", - }, - []string{OpTypeLabel}, - ) - NumMemDescs = prometheus.NewGauge(prometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: subsystem, - Name: "memory_chunkdescs", - Help: "The current number of chunk descriptors in memory.", - }) -) - -const ( - namespace = "prometheus" - subsystem = "local_storage" - - // OpTypeLabel is the label name for chunk operation types. - OpTypeLabel = "type" - - // Op-types for ChunkOps. - - // CreateAndPin is the label value for create-and-pin chunk ops. - CreateAndPin = "create" // A Desc creation with refCount=1. - // PersistAndUnpin is the label value for persist chunk ops. - PersistAndUnpin = "persist" - // Pin is the label value for pin chunk ops (excludes pin on creation). - Pin = "pin" - // Unpin is the label value for unpin chunk ops (excludes the unpin on persisting). - Unpin = "unpin" - // Transcode is the label value for transcode chunk ops. - Transcode = "transcode" - // Drop is the label value for drop chunk ops. - Drop = "drop" - - // Op-types for ChunkOps and ChunkDescOps. - - // Evict is the label value for evict chunk desc ops. - Evict = "evict" - // Load is the label value for load chunk and chunk desc ops. - Load = "load" -) - -func init() { - prometheus.MustRegister(Ops) - prometheus.MustRegister(DescOps) - prometheus.MustRegister(NumMemDescs) -} - -// NumMemChunks is the total number of chunks in memory. This is a global -// counter, also used internally, so not implemented as metrics. Collected in -// MemorySeriesStorage. -// TODO(beorn7): Having this as an exported global variable is really bad. -var NumMemChunks int64 diff --git a/pkg/chunk/opts.go b/pkg/chunk/opts.go deleted file mode 100644 index 3384ecd7a4..0000000000 --- a/pkg/chunk/opts.go +++ /dev/null @@ -1,60 +0,0 @@ -package chunk - -import ( - "strings" - "unicode/utf8" -) - -// Bitmap used by func isRegexMetaCharacter to check whether a character needs to be escaped. -var regexMetaCharacterBytes [16]byte - -// isRegexMetaCharacter reports whether byte b needs to be escaped. -func isRegexMetaCharacter(b byte) bool { - return b < utf8.RuneSelf && regexMetaCharacterBytes[b%16]&(1<<(b/16)) != 0 -} - -func init() { - for _, b := range []byte(`.+*?()|[]{}^$`) { - regexMetaCharacterBytes[b%16] |= 1 << (b / 16) - } -} - -// FindSetMatches returns list of values that can be equality matched on. -// copied from Prometheus querier.go, removed check for Prometheus wrapper. 
-func FindSetMatches(pattern string) []string { - escaped := false - sets := []*strings.Builder{{}} - for i := 0; i < len(pattern); i++ { - if escaped { - switch { - case isRegexMetaCharacter(pattern[i]): - sets[len(sets)-1].WriteByte(pattern[i]) - case pattern[i] == '\\': - sets[len(sets)-1].WriteByte('\\') - default: - return nil - } - escaped = false - } else { - switch { - case isRegexMetaCharacter(pattern[i]): - if pattern[i] == '|' { - sets = append(sets, &strings.Builder{}) - } else { - return nil - } - case pattern[i] == '\\': - escaped = true - default: - sets[len(sets)-1].WriteByte(pattern[i]) - } - } - } - matches := make([]string, 0, len(sets)) - for _, s := range sets { - if s.Len() > 0 { - matches = append(matches, s.String()) - } - } - return matches -} diff --git a/pkg/chunk/opts_test.go b/pkg/chunk/opts_test.go deleted file mode 100644 index 01d05407e6..0000000000 --- a/pkg/chunk/opts_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package chunk - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -// Refer to https://github.com/prometheus/prometheus/issues/2651. -func TestFindSetMatches(t *testing.T) { - cases := []struct { - pattern string - exp []string - }{ - // Simple sets. - { - pattern: "foo|bar|baz", - exp: []string{ - "foo", - "bar", - "baz", - }, - }, - // Simple sets containing escaped characters. - { - pattern: "fo\\.o|bar\\?|\\^baz", - exp: []string{ - "fo.o", - "bar?", - "^baz", - }, - }, - // Simple sets containing special characters without escaping. - { - pattern: "fo.o|bar?|^baz", - exp: nil, - }, - { - pattern: "foo\\|bar\\|baz", - exp: []string{ - "foo|bar|baz", - }, - }, - } - - for _, c := range cases { - matches := FindSetMatches(c.pattern) - require.Equal(t, c.exp, matches) - } -} diff --git a/pkg/querier/distributor_queryable_test.go b/pkg/querier/distributor_queryable_test.go index 2d9704b1ce..7e2412083d 100644 --- a/pkg/querier/distributor_queryable_test.go +++ b/pkg/querier/distributor_queryable_test.go @@ -20,6 +20,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/prom1/storage/metric" + "github.com/cortexproject/cortex/pkg/querier/batch" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/chunkcompat" "github.com/cortexproject/cortex/pkg/util/validation" @@ -190,7 +191,7 @@ func TestIngesterStreaming(t *testing.T) { nil) ctx := user.InjectOrgID(context.Background(), "0") - queryable := newDistributorQueryable(d, true, mergeChunks, 0, true) + queryable := newDistributorQueryable(d, true, batch.NewChunkMergeIterator, 0, true) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) @@ -268,7 +269,7 @@ func TestIngesterStreamingMixedResults(t *testing.T) { nil) ctx := user.InjectOrgID(context.Background(), "0") - queryable := newDistributorQueryable(d, true, mergeChunks, 0, true) + queryable := newDistributorQueryable(d, true, batch.NewChunkMergeIterator, 0, true) querier, err := queryable.Querier(mint, maxt) require.NoError(t, err) diff --git a/pkg/querier/iterators/chunk_iterator.go b/pkg/querier/iterators/chunk_iterator.go deleted file mode 100644 index 0f8f7347e1..0000000000 --- a/pkg/querier/iterators/chunk_iterator.go +++ /dev/null @@ -1,64 +0,0 @@ -package iterators - -import ( - "github.com/prometheus/common/model" - - "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" -) - -type chunkIterator struct { - chunk.Chunk - it 
promchunk.Iterator - - // At() is called often in the heap code, so caching its result seems like - // a good idea. - cacheValid bool - cachedTime int64 - cachedValue float64 -} - -// Seek advances the iterator forward to the value at or after -// the given timestamp. -func (i *chunkIterator) Seek(t int64) bool { - i.cacheValid = false - - // We assume seeks only care about a specific window; if this chunk doesn't - // contain samples in that window, we can shortcut. - if int64(i.Through) < t { - return false - } - - return i.it.FindAtOrAfter(model.Time(t)) -} - -func (i *chunkIterator) AtTime() int64 { - if i.cacheValid { - return i.cachedTime - } - - v := i.it.Value() - i.cachedTime, i.cachedValue = int64(v.Timestamp), float64(v.Value) - i.cacheValid = true - return i.cachedTime -} - -func (i *chunkIterator) At() (int64, float64) { - if i.cacheValid { - return i.cachedTime, i.cachedValue - } - - v := i.it.Value() - i.cachedTime, i.cachedValue = int64(v.Timestamp), float64(v.Value) - i.cacheValid = true - return i.cachedTime, i.cachedValue -} - -func (i *chunkIterator) Next() bool { - i.cacheValid = false - return i.it.Scan() -} - -func (i *chunkIterator) Err() error { - return i.it.Err() -} diff --git a/pkg/querier/iterators/chunk_merge_iterator.go b/pkg/querier/iterators/chunk_merge_iterator.go deleted file mode 100644 index 295527a9c8..0000000000 --- a/pkg/querier/iterators/chunk_merge_iterator.go +++ /dev/null @@ -1,207 +0,0 @@ -package iterators - -import ( - "container/heap" - "sort" - - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/tsdb/chunkenc" - - "github.com/cortexproject/cortex/pkg/chunk" -) - -type chunkMergeIterator struct { - its []*nonOverlappingIterator - h seriesIteratorHeap - - currTime int64 - currValue float64 - currErr error -} - -// NewChunkMergeIterator creates a chunkenc.Iterator for a set of chunks. -func NewChunkMergeIterator(cs []chunk.Chunk, _, _ model.Time) chunkenc.Iterator { - its := buildIterators(cs) - c := &chunkMergeIterator{ - currTime: -1, - its: its, - h: make(seriesIteratorHeap, 0, len(its)), - } - - for _, iter := range c.its { - if iter.Next() { - c.h = append(c.h, NewCompatibleChunksIterator(iter)) - continue - } - - if err := iter.Err(); err != nil { - c.currErr = err - } - } - - heap.Init(&c.h) - return NewCompatibleChunksIterator(c) -} - -// Build a list of lists of non-overlapping chunk iterators. 
-func buildIterators(cs []chunk.Chunk) []*nonOverlappingIterator { - chunks := make([]*chunkIterator, len(cs)) - for i := range cs { - chunks[i] = &chunkIterator{ - Chunk: cs[i], - it: cs[i].Data.NewIterator(nil), - } - } - sort.Sort(byFrom(chunks)) - - chunkLists := [][]*chunkIterator{} -outer: - for _, chunk := range chunks { - for i, chunkList := range chunkLists { - if chunkList[len(chunkList)-1].Through.Before(chunk.From) { - chunkLists[i] = append(chunkLists[i], chunk) - continue outer - } - } - chunkLists = append(chunkLists, []*chunkIterator{chunk}) - } - - its := make([]*nonOverlappingIterator, 0, len(chunkLists)) - for _, chunkList := range chunkLists { - its = append(its, newNonOverlappingIterator(chunkList)) - } - return its -} - -func (c *chunkMergeIterator) Seek(t int64) bool { - c.h = c.h[:0] - - for _, iter := range c.its { - if iter.Seek(t) { - c.h = append(c.h, NewCompatibleChunksIterator(iter)) - continue - } - - if err := iter.Err(); err != nil { - c.currErr = err - return false - } - } - - heap.Init(&c.h) - - if len(c.h) > 0 { - c.currTime, c.currValue = c.h[0].At() - return true - } - - return false -} - -func (c *chunkMergeIterator) Next() bool { - if len(c.h) == 0 { - return false - } - - lastTime := c.currTime - for c.currTime == lastTime && len(c.h) > 0 { - c.currTime, c.currValue = c.h[0].At() - - if c.h[0].Next() != chunkenc.ValNone { - heap.Fix(&c.h, 0) - continue - } - - iter := heap.Pop(&c.h).(chunkenc.Iterator) - if err := iter.Err(); err != nil { - c.currErr = err - return false - } - } - - return c.currTime != lastTime -} - -func (c *chunkMergeIterator) At() (t int64, v float64) { - return c.currTime, c.currValue -} - -func (c *chunkMergeIterator) Err() error { - return c.currErr -} - -type extraIterator interface { - chunkenc.Iterator -} - -type seriesIteratorHeap []extraIterator - -func (h *seriesIteratorHeap) Len() int { return len(*h) } -func (h *seriesIteratorHeap) Swap(i, j int) { (*h)[i], (*h)[j] = (*h)[j], (*h)[i] } - -func (h *seriesIteratorHeap) Less(i, j int) bool { - iT, _ := (*h)[i].At() - jT, _ := (*h)[j].At() - return iT < jT -} - -func (h *seriesIteratorHeap) Push(x interface{}) { - *h = append(*h, x.(extraIterator)) -} - -func (h *seriesIteratorHeap) Pop() interface{} { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} - -type byFrom []*chunkIterator - -func (b byFrom) Len() int { return len(b) } -func (b byFrom) Swap(i, j int) { b[i], b[j] = b[j], b[i] } -func (b byFrom) Less(i, j int) bool { return b[i].From < b[j].From } - -type nonOverlappingIterator struct { - curr int - chunks []*chunkIterator -} - -// newNonOverlappingIterator returns a single iterator over an slice of sorted, -// non-overlapping iterators. 
-func newNonOverlappingIterator(chunks []*chunkIterator) *nonOverlappingIterator { - return &nonOverlappingIterator{ - chunks: chunks, - } -} - -func (it *nonOverlappingIterator) Seek(t int64) bool { - for ; it.curr < len(it.chunks); it.curr++ { - if it.chunks[it.curr].Seek(t) { - return true - } - } - - return false -} - -func (it *nonOverlappingIterator) Next() bool { - for it.curr < len(it.chunks) && !it.chunks[it.curr].Next() { - it.curr++ - } - - return it.curr < len(it.chunks) -} - -func (it *nonOverlappingIterator) AtTime() int64 { - return it.chunks[it.curr].AtTime() -} - -func (it *nonOverlappingIterator) At() (int64, float64) { - return it.chunks[it.curr].At() -} - -func (it *nonOverlappingIterator) Err() error { - return nil -} diff --git a/pkg/querier/iterators/chunk_merge_iterator_test.go b/pkg/querier/iterators/chunk_merge_iterator_test.go deleted file mode 100644 index bd9e98502a..0000000000 --- a/pkg/querier/iterators/chunk_merge_iterator_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package iterators - -import ( - "strconv" - "testing" - "time" - - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/labels" - "github.com/prometheus/prometheus/tsdb/chunkenc" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/cortexproject/cortex/pkg/chunk" - promchunk "github.com/cortexproject/cortex/pkg/chunk/encoding" -) - -func TestChunkMergeIterator(t *testing.T) { - t.Parallel() - for i, tc := range []struct { - chunks []chunk.Chunk - mint, maxt int64 - }{ - { - chunks: []chunk.Chunk{ - mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.PrometheusXorChunk), - }, - maxt: 100, - }, - - { - chunks: []chunk.Chunk{ - mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.PrometheusXorChunk), - mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.PrometheusXorChunk), - }, - maxt: 100, - }, - - { - chunks: []chunk.Chunk{ - mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.PrometheusXorChunk), - mkChunk(t, 50, 150, 1*time.Millisecond, promchunk.PrometheusXorChunk), - mkChunk(t, 100, 200, 1*time.Millisecond, promchunk.PrometheusXorChunk), - }, - maxt: 200, - }, - - { - chunks: []chunk.Chunk{ - mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.PrometheusXorChunk), - mkChunk(t, 100, 200, 1*time.Millisecond, promchunk.PrometheusXorChunk), - }, - maxt: 200, - }, - } { - tc := tc - t.Run(strconv.Itoa(i), func(t *testing.T) { - t.Parallel() - iter := NewChunkMergeIterator(tc.chunks, 0, 0) - for i := tc.mint; i < tc.maxt; i++ { - require.NotEqual(t, iter.Next(), chunkenc.ValNone) - ts, s := iter.At() - assert.Equal(t, i, ts) - assert.Equal(t, float64(i), s) - assert.NoError(t, iter.Err()) - } - assert.Equal(t, iter.Next(), chunkenc.ValNone) - }) - } -} - -func TestChunkMergeIteratorSeek(t *testing.T) { - t.Parallel() - iter := NewChunkMergeIterator([]chunk.Chunk{ - mkChunk(t, 0, 100, 1*time.Millisecond, promchunk.PrometheusXorChunk), - mkChunk(t, 50, 150, 1*time.Millisecond, promchunk.PrometheusXorChunk), - mkChunk(t, 100, 200, 1*time.Millisecond, promchunk.PrometheusXorChunk), - }, 0, 0) - - for i := int64(0); i < 10; i += 20 { - require.NotEqual(t, iter.Seek(i), chunkenc.ValNone) - ts, s := iter.At() - assert.Equal(t, i, ts) - assert.Equal(t, float64(i), s) - assert.NoError(t, iter.Err()) - - for j := i + 1; j < 200; j++ { - require.NotEqual(t, iter.Next(), chunkenc.ValNone) - ts, s := iter.At() - assert.Equal(t, j, ts) - assert.Equal(t, float64(j), s) - assert.NoError(t, iter.Err()) - } - assert.Equal(t, iter.Next(), chunkenc.ValNone) - } -} - 
-func mkChunk(t require.TestingT, mint, maxt model.Time, step time.Duration, encoding promchunk.Encoding) chunk.Chunk {
-	metric := labels.Labels{
-		{Name: model.MetricNameLabel, Value: "foo"},
-	}
-	pc, err := promchunk.NewForEncoding(encoding)
-	require.NoError(t, err)
-	for i := mint; i.Before(maxt); i = i.Add(step) {
-		npc, err := pc.Add(model.SamplePair{
-			Timestamp: i,
-			Value:     model.SampleValue(float64(i)),
-		})
-		require.NoError(t, err)
-		require.Nil(t, npc)
-	}
-	return chunk.NewChunk(metric, pc, mint, maxt)
-}
diff --git a/pkg/querier/matrix.go b/pkg/querier/matrix.go
deleted file mode 100644
index 6367bc7910..0000000000
--- a/pkg/querier/matrix.go
+++ /dev/null
@@ -1,25 +0,0 @@
-package querier
-
-import (
-	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/tsdb/chunkenc"
-
-	"github.com/cortexproject/cortex/pkg/chunk"
-	"github.com/cortexproject/cortex/pkg/querier/series"
-	"github.com/cortexproject/cortex/pkg/util"
-)
-
-func mergeChunks(chunks []chunk.Chunk, from, through model.Time) chunkenc.Iterator {
-	samples := make([][]model.SamplePair, 0, len(chunks))
-	for _, c := range chunks {
-		ss, err := c.Samples(from, through)
-		if err != nil {
-			return series.NewErrIterator(err)
-		}
-
-		samples = append(samples, ss)
-	}
-
-	merged := util.MergeNSampleSets(samples...)
-	return series.NewConcreteSeriesIterator(series.NewConcreteSeries(nil, merged))
-}
diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go
index 2946c2d96c..4ad9e0844b 100644
--- a/pkg/querier/querier.go
+++ b/pkg/querier/querier.go
@@ -27,7 +27,6 @@ import (
 	"github.com/cortexproject/cortex/pkg/chunk"
 	"github.com/cortexproject/cortex/pkg/ingester/client"
 	"github.com/cortexproject/cortex/pkg/querier/batch"
-	"github.com/cortexproject/cortex/pkg/querier/iterators"
 	"github.com/cortexproject/cortex/pkg/querier/lazyquery"
 	seriesset "github.com/cortexproject/cortex/pkg/querier/series"
 	querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
@@ -44,8 +43,6 @@ import (
 type Config struct {
 	MaxConcurrent int `yaml:"max_concurrent"`
 	Timeout time.Duration `yaml:"timeout"`
-	Iterators bool `yaml:"iterators"`
-	BatchIterators bool `yaml:"batch_iterators"`
 	IngesterStreaming bool `yaml:"ingester_streaming" doc:"hidden"`
 	IngesterMetadataStreaming bool `yaml:"ingester_metadata_streaming"`
 	MaxSamples int `yaml:"max_samples"`
@@ -102,12 +99,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
 	flagext.DeprecatedFlag(f, "querier.at-modifier-enabled", "This flag is no longer functional; at-modifier is always enabled now.", util_log.Logger)
 	//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
 	flagext.DeprecatedFlag(f, "querier.ingester-streaming", "Deprecated: Use streaming RPCs to query ingester. QueryStream is always enabled and the flag is not effective anymore.", util_log.Logger)
+	//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
+	flagext.DeprecatedFlag(f, "querier.iterators", "Deprecated: This flag is no longer functional; batch iterators are always used now.", util_log.Logger)
+	//lint:ignore faillint Need to pass the global logger like this for warning on deprecated methods
+	flagext.DeprecatedFlag(f, "querier.batch-iterators", "Deprecated: This flag is no longer functional; batch iterators are always used now.", util_log.Logger)
 	cfg.StoreGatewayClient.RegisterFlagsWithPrefix("querier.store-gateway-client", f)
 	f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
 	f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
-	f.BoolVar(&cfg.Iterators, "querier.iterators", false, "Use iterators to execute query, as opposed to fully materialising the series in memory.")
-	f.BoolVar(&cfg.BatchIterators, "querier.batch-iterators", true, "Use batch iterators to execute query, as opposed to fully materialising the series in memory. Takes precedent over the -querier.iterators flag.")
 	f.BoolVar(&cfg.IngesterMetadataStreaming, "querier.ingester-metadata-streaming", false, "Use streaming RPCs for metadata APIs from ingester.")
 	f.IntVar(&cfg.MaxSamples, "querier.max-samples", 50e6, "Maximum number of samples a single query can load into memory.")
 	f.DurationVar(&cfg.QueryIngestersWithin, "querier.query-ingesters-within", 0, "Maximum lookback beyond which queries are not sent to ingester. 0 means all queries are sent to ingester.")
@@ -152,13 +151,8 @@ func (cfg *Config) GetStoreGatewayAddresses() []string {
 	return strings.Split(cfg.StoreGatewayAddresses, ",")
 }
 
-func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
-	if cfg.BatchIterators {
-		return batch.NewChunkMergeIterator
-	} else if cfg.Iterators {
-		return iterators.NewChunkMergeIterator
-	}
-	return mergeChunks
+func getChunksIteratorFunction(_ Config) chunkIteratorFunc {
+	return batch.NewChunkMergeIterator
 }
 
 // New builds a queryable and promql engine.
diff --git a/pkg/querier/querier_test.go b/pkg/querier/querier_test.go
index 5e37087e3e..77a76b8e62 100644
--- a/pkg/querier/querier_test.go
+++ b/pkg/querier/querier_test.go
@@ -485,33 +485,29 @@ func TestQuerier(t *testing.T) {
 	for _, thanosEngine := range []bool{false, true} {
 		for _, query := range queries {
 			for _, encoding := range encodings {
-				for _, iterators := range []bool{false, true} {
-					iterators := iterators
-					t.Run(fmt.Sprintf("%s/%s/iterators=%t", query.query, encoding.name, iterators), func(t *testing.T) {
-						var queryEngine promql.QueryEngine
-						if thanosEngine {
-							queryEngine = engine.New(engine.Opts{
-								EngineOpts:        opts,
-								LogicalOptimizers: logicalplan.AllOptimizers,
-							})
-						} else {
-							queryEngine = promql.NewEngine(opts)
-						}
-						cfg.Iterators = iterators
-						// Disable active query tracker to avoid mmap error.
-						cfg.ActiveQueryTrackerDir = ""
-
-						chunkStore, through := makeMockChunkStore(t, chunks, encoding.e)
-						distributor := mockDistibutorFor(t, chunkStore.chunks)
-
-						overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil)
-						require.NoError(t, err)
+				t.Run(fmt.Sprintf("%s/%s", query.query, encoding.name), func(t *testing.T) {
+					var queryEngine promql.QueryEngine
+					if thanosEngine {
+						queryEngine = engine.New(engine.Opts{
+							EngineOpts:        opts,
+							LogicalOptimizers: logicalplan.AllOptimizers,
+						})
+					} else {
+						queryEngine = promql.NewEngine(opts)
+					}
+					// Disable active query tracker to avoid mmap error.
+ cfg.ActiveQueryTrackerDir = "" - queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} - queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) - testRangeQuery(t, queryable, queryEngine, through, query) - }) - } + chunkStore, through := makeMockChunkStore(t, chunks, encoding.e) + distributor := mockDistibutorFor(t, chunkStore.chunks) + + overrides, err := validation.NewOverrides(DefaultLimitsConfig(), nil) + require.NoError(t, err) + + queryables := []QueryableWithFilter{UseAlwaysQueryable(NewMockStoreQueryable(cfg, chunkStore)), UseAlwaysQueryable(db)} + queryable, _, _ := New(cfg, overrides, distributor, queryables, nil, log.NewNopLogger()) + testRangeQuery(t, queryable, queryEngine, through, query) + }) } } } diff --git a/pkg/querier/series/series_set.go b/pkg/querier/series/series_set.go index b8742ec0d7..7c5c9bb4b1 100644 --- a/pkg/querier/series/series_set.go +++ b/pkg/querier/series/series_set.go @@ -128,32 +128,6 @@ func (c *concreteSeriesIterator) Err() error { return nil } -// NewErrIterator instantiates an errIterator -func NewErrIterator(err error) chunkenc.Iterator { - return iterators.NewCompatibleChunksIterator(errIterator{err}) -} - -// errIterator implements chunkenc.Iterator, just returning an error. -type errIterator struct { - err error -} - -func (errIterator) Seek(int64) bool { - return false -} - -func (errIterator) Next() bool { - return false -} - -func (errIterator) At() (t int64, v float64) { - return 0, 0 -} - -func (e errIterator) Err() error { - return e.err -} - // MatrixToSeriesSet creates a storage.SeriesSet from a model.Matrix // Series will be sorted by labels if sortSeries is set. func MatrixToSeriesSet(sortSeries bool, m model.Matrix) storage.SeriesSet { @@ -175,8 +149,7 @@ func MetricsToSeriesSet(ctx context.Context, sortSeries bool, ms []metric.Metric return storage.ErrSeriesSet(ctx.Err()) } series = append(series, &ConcreteSeries{ - labels: metricToLabels(m.Metric), - samples: nil, + labels: metricToLabels(m.Metric), }) } return NewConcreteSeriesSet(sortSeries, series) diff --git a/pkg/util/merger.go b/pkg/util/merger.go index c5ac8dad18..1202a9f0ec 100644 --- a/pkg/util/merger.go +++ b/pkg/util/merger.go @@ -24,19 +24,3 @@ func MergeSampleSets(a, b []model.SamplePair) []model.SamplePair { result = append(result, b[j:]...) return result } - -// MergeNSampleSets merges and dedupes n sets of already sorted sample pairs. -func MergeNSampleSets(sampleSets ...[]model.SamplePair) []model.SamplePair { - l := len(sampleSets) - switch l { - case 0: - return []model.SamplePair{} - case 1: - return sampleSets[0] - } - - n := l / 2 - left := MergeNSampleSets(sampleSets[:n]...) - right := MergeNSampleSets(sampleSets[n:]...) 
- return MergeSampleSets(left, right) -} diff --git a/pkg/util/merger_test.go b/pkg/util/merger_test.go index ca7816a430..e53cf51e5d 100644 --- a/pkg/util/merger_test.go +++ b/pkg/util/merger_test.go @@ -50,32 +50,3 @@ func TestMergeSampleSets(t *testing.T) { require.Equal(t, c.expected, samples) } } - -func TestMergeNSampleSets(t *testing.T) { - now := model.Now() - sample1 := model.SamplePair{Timestamp: now, Value: 1} - sample2 := model.SamplePair{Timestamp: now.Add(1 * time.Second), Value: 2} - sample3 := model.SamplePair{Timestamp: now.Add(4 * time.Second), Value: 3} - sample4 := model.SamplePair{Timestamp: now.Add(8 * time.Second), Value: 7} - - for _, c := range []struct { - sampleSets [][]model.SamplePair - expected []model.SamplePair - }{ - { - sampleSets: [][]model.SamplePair{{}, {}, {}}, - expected: []model.SamplePair{}, - }, - { - sampleSets: [][]model.SamplePair{ - {sample1, sample2}, - {sample2}, - {sample1, sample3, sample4}, - }, - expected: []model.SamplePair{sample1, sample2, sample3, sample4}, - }, - } { - samples := MergeNSampleSets(c.sampleSets...) - require.Equal(t, c.expected, samples) - } -}
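
To make the runtime behavior of this change concrete, below is a minimal, self-contained sketch (hypothetical names, standard library only) of the no-op deprecated-flag pattern this PR applies to -querier.iterators and -querier.batch-iterators. Cortex's actual helper is flagext.DeprecatedFlag, as used in pkg/querier/querier.go above; this sketch only illustrates the idea that the flags still parse, so existing command lines keep working, but setting them logs a warning instead of changing behavior.

package main

import (
	"flag"
	"fmt"
	"os"
)

// deprecatedValue satisfies flag.Value; it accepts any input and only warns.
type deprecatedValue struct{ name, message string }

func (d deprecatedValue) String() string { return "" }

func (d deprecatedValue) Set(string) error {
	// Deliberately a no-op: the supplied value is discarded.
	fmt.Fprintf(os.Stderr, "warning: -%s is deprecated: %s\n", d.name, d.message)
	return nil
}

// IsBoolFlag lets the flag be passed without an explicit value (-flag).
func (d deprecatedValue) IsBoolFlag() bool { return true }

func main() {
	fs := flag.NewFlagSet("querier", flag.ExitOnError)
	msg := "this flag is no longer functional; batch iterators are always used now"

	// Keep the old flags registered so upgraded deployments don't fail to start.
	fs.Var(deprecatedValue{"querier.iterators", msg}, "querier.iterators", "Deprecated.")
	fs.Var(deprecatedValue{"querier.batch-iterators", msg}, "querier.batch-iterators", "Deprecated.")

	// An old command line still parses; each deprecated flag emits one warning.
	if err := fs.Parse([]string{"-querier.batch-iterators=true"}); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}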